This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new aa40ad6   renaming sinks and sources api to be consistent with the 
rest of Pulsar (#4363)
aa40ad6 is described below

commit aa40ad6ddeb4208d439df2c9080fc73b1e6ffa3b
Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com>
AuthorDate: Sun May 26 21:48:16 2019 -0700

     renaming sinks and sources api to be consistent with the rest of Pulsar 
(#4363)
    
    * Rename sources and sinks CLI to be consistent with rest of Pulsar
    
    * renaming sinks and sources api to be consistent with the rest of Pulsar
    
    * use new interfaces in cmd
---
 pulsar-broker/pom.xml                              |   4 +-
 .../admin/impl/{SinkBase.java => SinksBase.java}   |  10 +-
 .../impl/{SourceBase.java => SourcesBase.java}     |  12 +-
 .../org/apache/pulsar/broker/admin/v3/Sink.java    |   8 +-
 .../broker/admin/v3/{Sink.java => Sinks.java}      |   8 +-
 .../org/apache/pulsar/broker/admin/v3/Source.java  |   8 +-
 .../broker/admin/v3/{Source.java => Sources.java}  |   8 +-
 .../apache/pulsar/client/admin/PulsarAdmin.java    |  38 ++-
 .../java/org/apache/pulsar/client/admin/Sink.java  | 329 +-------------------
 .../pulsar/client/admin/{Sink.java => Sinks.java}  |   2 +-
 .../org/apache/pulsar/client/admin/Source.java     | 330 +--------------------
 .../client/admin/{Source.java => Sources.java}     |   2 +-
 .../internal/{SinkImpl.java => SinksImpl.java}     |   5 +-
 .../internal/{SourceImpl.java => SourcesImpl.java} |   5 +-
 .../java/org/apache/pulsar/admin/cli/CmdSinks.java |  36 +--
 .../org/apache/pulsar/admin/cli/CmdSources.java    |  36 +--
 .../apache/pulsar/admin/cli/PulsarAdminTool.java   |  29 +-
 .../org/apache/pulsar/admin/cli/TestCmdSinks.java  |   8 +-
 .../apache/pulsar/admin/cli/TestCmdSources.java    |   8 +-
 .../pulsar/functions/worker/rest/Resources.java    |  12 +-
 .../rest/api/{SinkImpl.java => SinksImpl.java}     |   4 +-
 .../rest/api/{SourceImpl.java => SourcesImpl.java} |   4 +-
 ...V2Resource.java => FunctionsApiV2Resource.java} |   4 +-
 ...V3Resource.java => FunctionsApiV3Resource.java} |   4 +-
 .../worker/rest/api/v3/SinkApiV3Resource.java      | 224 +-------------
 ...kApiV3Resource.java => SinksApiV3Resource.java} |  16 +-
 .../worker/rest/api/v3/SourceApiV3Resource.java    | 228 +-------------
 ...piV3Resource.java => SourcesApiV3Resource.java} |  28 +-
 .../rest/api/v2/FunctionApiV2ResourceTest.java     |   2 +-
 .../rest/api/v3/FunctionApiV3ResourceTest.java     |   5 +-
 .../worker/rest/api/v3/SinkApiV3ResourceTest.java  |   8 +-
 .../rest/api/v3/SourceApiV3ResourceTest.java       |   8 +-
 32 files changed, 224 insertions(+), 1209 deletions(-)

diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index ac63f00..c9f7e36 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -404,7 +404,7 @@
                 </apiSource>
                 <apiSource>
                   <springmvc>false</springmvc>
-                  
<locations>org.apache.pulsar.broker.admin.v3.Source</locations>
+                  
<locations>org.apache.pulsar.broker.admin.v3.Sources</locations>
                   <schemes>http,https</schemes>
                   <basePath>/admin/v3</basePath>
                   <info>
@@ -421,7 +421,7 @@
                 </apiSource>
                 <apiSource>
                   <springmvc>false</springmvc>
-                  <locations>org.apache.pulsar.broker.admin.v3.Sink</locations>
+                  
<locations>org.apache.pulsar.broker.admin.v3.Sinks</locations>
                   <schemes>http,https</schemes>
                   <basePath>/admin/v3</basePath>
                   <info>
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java
similarity index 98%
rename from 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java
rename to 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java
index f15003c..50839d8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java
@@ -28,7 +28,7 @@ import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.common.policies.data.SinkStatus;
 import org.apache.pulsar.functions.worker.WorkerService;
-import org.apache.pulsar.functions.worker.rest.api.SinkImpl;
+import org.apache.pulsar.functions.worker.rest.api.SinksImpl;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 import org.glassfish.jersey.media.multipart.FormDataParam;
 
@@ -47,12 +47,12 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.function.Supplier;
 
-public class SinkBase extends AdminResource implements Supplier<WorkerService> 
{
+public class SinksBase extends AdminResource implements 
Supplier<WorkerService> {
 
-    private final SinkImpl sink;
+    private final SinksImpl sink;
 
-    public SinkBase() {
-        this.sink = new SinkImpl(this);
+    public SinksBase() {
+        this.sink = new SinksImpl(this);
     }
 
     @Override
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java
similarity index 97%
rename from 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java
rename to 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java
index f55337e..e65eda7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java
@@ -28,7 +28,7 @@ import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.io.SourceConfig;
 import org.apache.pulsar.common.policies.data.SourceStatus;
 import org.apache.pulsar.functions.worker.WorkerService;
-import org.apache.pulsar.functions.worker.rest.api.SourceImpl;
+import org.apache.pulsar.functions.worker.rest.api.SourcesImpl;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 import org.glassfish.jersey.media.multipart.FormDataParam;
 
@@ -47,12 +47,12 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.function.Supplier;
 
-public class SourceBase extends AdminResource implements 
Supplier<WorkerService> {
+public class SourcesBase extends AdminResource implements 
Supplier<WorkerService> {
 
-    private final SourceImpl source;
+    private final SourcesImpl source;
 
-    public SourceBase() {
-        this.source = new SourceImpl(this);
+    public SourcesBase() {
+        this.source = new SourcesImpl(this);
     }
 
     @Override
@@ -188,6 +188,7 @@ public class SourceBase extends AdminResource implements 
Supplier<WorkerService>
             @ApiResponse(code = 400, message = "Invalid request"),
             @ApiResponse(code = 403, message = "The requester doesn't have 
admin permissions")
     })
+    @Consumes(MediaType.APPLICATION_JSON)
     @Path("/{tenant}/{namespace}")
     public List<String> listSources(final @PathParam("tenant") String tenant,
                                     final @PathParam("namespace") String 
namespace) {
@@ -291,6 +292,7 @@ public class SourceBase extends AdminResource implements 
Supplier<WorkerService>
             @ApiResponse(code = 400, message = "Invalid request"),
             @ApiResponse(code = 408, message = "Request timeout")
     })
+    @Produces(MediaType.APPLICATION_JSON)
     @Path("/builtinsources")
     public List<ConnectorDefinition> getSourceList() {
         List<ConnectorDefinition> connectorDefinitions = 
source.getListOfConnectors();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Sink.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Sink.java
index 9191d8c..667e88e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Sink.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Sink.java
@@ -19,7 +19,7 @@
 package org.apache.pulsar.broker.admin.v3;
 
 import io.swagger.annotations.Api;
-import org.apache.pulsar.broker.admin.impl.SinkBase;
+import org.apache.pulsar.broker.admin.impl.SinksBase;
 
 import javax.ws.rs.Consumes;
 import javax.ws.rs.Path;
@@ -30,5 +30,9 @@ import javax.ws.rs.core.MediaType;
 @Api(value = "/sink", description = "Sink admin apis", tags = "sink")
 @Produces(MediaType.APPLICATION_JSON)
 @Consumes(MediaType.APPLICATION_JSON)
-public class Sink extends SinkBase {
+@Deprecated
+/**
+ * @deprecated in favor of {@link Sinks}
+ */
+public class Sink extends SinksBase {
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Sink.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Sinks.java
similarity index 85%
copy from 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Sink.java
copy to pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Sinks.java
index 9191d8c..a693ea2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Sink.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Sinks.java
@@ -19,16 +19,16 @@
 package org.apache.pulsar.broker.admin.v3;
 
 import io.swagger.annotations.Api;
-import org.apache.pulsar.broker.admin.impl.SinkBase;
+import org.apache.pulsar.broker.admin.impl.SinksBase;
 
 import javax.ws.rs.Consumes;
 import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
 import javax.ws.rs.core.MediaType;
 
-@Path("/sink")
-@Api(value = "/sink", description = "Sink admin apis", tags = "sink")
+@Path("/sinks")
+@Api(value = "/sinks", description = "Sinks admin apis", tags = "sinks")
 @Produces(MediaType.APPLICATION_JSON)
 @Consumes(MediaType.APPLICATION_JSON)
-public class Sink extends SinkBase {
+public class Sinks extends SinksBase {
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Source.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Source.java
index b8acd19..f68d228 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Source.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Source.java
@@ -19,7 +19,7 @@
 package org.apache.pulsar.broker.admin.v3;
 
 import io.swagger.annotations.Api;
-import org.apache.pulsar.broker.admin.impl.SourceBase;
+import org.apache.pulsar.broker.admin.impl.SourcesBase;
 
 import javax.ws.rs.Consumes;
 import javax.ws.rs.Path;
@@ -30,5 +30,9 @@ import javax.ws.rs.core.MediaType;
 @Api(value = "/source", description = "Source admin apis", tags = "source")
 @Produces(MediaType.APPLICATION_JSON)
 @Consumes(MediaType.APPLICATION_JSON)
-public class Source extends SourceBase {
+@Deprecated
+/**
+ * @deprecated in favor of {@link Sources}
+ */
+public class Source extends SourcesBase {
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Source.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Sources.java
similarity index 84%
copy from 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Source.java
copy to 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Sources.java
index b8acd19..a36ab08 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Source.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Sources.java
@@ -19,16 +19,16 @@
 package org.apache.pulsar.broker.admin.v3;
 
 import io.swagger.annotations.Api;
-import org.apache.pulsar.broker.admin.impl.SourceBase;
+import org.apache.pulsar.broker.admin.impl.SourcesBase;
 
 import javax.ws.rs.Consumes;
 import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
 import javax.ws.rs.core.MediaType;
 
-@Path("/source")
-@Api(value = "/source", description = "Source admin apis", tags = "source")
+@Path("/sources")
+@Api(value = "/sources", description = "Sources admin apis", tags = "sources")
 @Produces(MediaType.APPLICATION_JSON)
 @Consumes(MediaType.APPLICATION_JSON)
-public class Source extends SourceBase {
+public class Sources extends SourcesBase {
 }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
index 9ab3d6d..2f182f7 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
@@ -40,8 +40,8 @@ import 
org.apache.pulsar.client.admin.internal.NonPersistentTopicsImpl;
 import org.apache.pulsar.client.admin.internal.PulsarAdminBuilderImpl;
 import org.apache.pulsar.client.admin.internal.ResourceQuotasImpl;
 import org.apache.pulsar.client.admin.internal.SchemasImpl;
-import org.apache.pulsar.client.admin.internal.SinkImpl;
-import org.apache.pulsar.client.admin.internal.SourceImpl;
+import org.apache.pulsar.client.admin.internal.SinksImpl;
+import org.apache.pulsar.client.admin.internal.SourcesImpl;
 import org.apache.pulsar.client.admin.internal.TenantsImpl;
 import org.apache.pulsar.client.admin.internal.TopicsImpl;
 import org.apache.pulsar.client.admin.internal.WorkerImpl;
@@ -87,8 +87,8 @@ public class PulsarAdmin implements Closeable {
     private final String serviceUrl;
     private final Lookup lookups;
     private final Functions functions;
-    private final Source source;
-    private final Sink sink;
+    private final Sources sources;
+    private final Sinks sinks;
     private final Worker worker;
     private final Schemas schemas;
     protected final WebTarget root;
@@ -193,8 +193,8 @@ public class PulsarAdmin implements Closeable {
         this.resourceQuotas = new ResourceQuotasImpl(root, auth);
         this.lookups = new LookupImpl(root, auth, useTls);
         this.functions = new FunctionsImpl(root, auth, httpAsyncClient);
-        this.source = new SourceImpl(root, auth, httpAsyncClient);
-        this.sink = new SinkImpl(root, auth, httpAsyncClient);
+        this.sources = new SourcesImpl(root, auth, httpAsyncClient);
+        this.sinks = new SinksImpl(root, auth, httpAsyncClient);
         this.worker = new WorkerImpl(root, auth);
         this.schemas = new SchemasImpl(root, auth);
         this.bookies = new BookiesImpl(root, auth);
@@ -340,23 +340,35 @@ public class PulsarAdmin implements Closeable {
     }
 
     /**
-     *
-     * @return the source management object
+     * @return the sources management object
+     * @deprecated in favor of {@link #sources()}
      */
+    @Deprecated
     public Source source() {
-        return source;
+        return (Source) sources;
+    }
+
+    public Sources sources() {
+        return sources;
     }
 
     /**
-     *
-     * @return the sink management object
+     * @return the sinks management object
+     * @deprecated in favor of {@link #sinks}
      */
+    @Deprecated
     public Sink sink() {
-        return sink;
+        return (Sink) sinks;
+    }
+
+    /**
+     * @return the sinks management object
+     */
+    public Sinks sinks() {
+        return sinks;
     }
 
     /**
-    *
     * @return the Worker stats
     */
    public Worker worker() {
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sink.java 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sink.java
index 8db53aa..c270a75 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sink.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sink.java
@@ -18,333 +18,10 @@
  */
 package org.apache.pulsar.client.admin;
 
-import 
org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException;
-import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
-import 
org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
-import org.apache.pulsar.common.functions.UpdateOptions;
-import org.apache.pulsar.common.io.ConnectorDefinition;
-import org.apache.pulsar.common.policies.data.SinkStatus;
-import org.apache.pulsar.common.io.SinkConfig;
-
-import java.util.List;
-
 /**
- * Admin interface for Sink management.
+ * @deprecated in favor of {@link Sinks}
  */
-public interface Sink {
-    /**
-     * Get the list of sinks.
-     * <p>
-     * Get the list of all the Pulsar Sinks.
-     * <p>
-     * Response Example:
-     *
-     * <pre>
-     * <code>["f1", "f2", "f3"]</code>
-     * </pre>
-     *
-     * @throws NotAuthorizedException
-     *             Don't have admin permission
-     * @throws PulsarAdminException
-     *             Unexpected error
-     */
-    List<String> listSinks(String tenant, String namespace) throws 
PulsarAdminException;
-
-    /**
-     * Get the configuration for the specified sink.
-     * <p>
-     * Response Example:
-     *
-     * <pre>
-     * <code>{ serviceUrl : "http://my-broker.example.com:8080/"; }</code>
-     * </pre>
-     *
-     * @param tenant
-     *            Tenant name
-     * @param namespace
-     *            Namespace name
-     * @param sink
-     *            Sink name
-     *
-     * @return the sink configuration
-     *
-     * @throws NotAuthorizedException
-     *             You don't have admin permission to get the configuration of 
the cluster
-     * @throws NotFoundException
-     *             Cluster doesn't exist
-     * @throws PulsarAdminException
-     *             Unexpected error
-     */
-    SinkConfig getSink(String tenant, String namespace, String sink) throws 
PulsarAdminException;
-
-    /**
-     * Create a new sink.
-     *
-     * @param sinkConfig
-     *            the sink configuration object
-     *
-     * @throws PulsarAdminException
-     *             Unexpected error
-     */
-    void createSink(SinkConfig sinkConfig, String fileName) throws 
PulsarAdminException;
-
-    /**
-     * <pre>
-     * Create a new sink by providing url from which fun-pkg can be 
downloaded. supported url: http/file
-     * eg:
-     * File: file:/dir/fileName.jar
-     * Http: http://www.repo.com/fileName.jar
-     * </pre>
-     *
-     * @param sinkConfig
-     *            the sink configuration object
-     * @param pkgUrl
-     *            url from which pkg can be downloaded
-     * @throws PulsarAdminException
-     */
-    void createSinkWithUrl(SinkConfig sinkConfig, String pkgUrl) throws 
PulsarAdminException;
-
-    /**
-     * Update the configuration for a sink.
-     * <p>
-     *
-     * @param sinkConfig
-     *            the sink configuration object
-     *
-     * @throws NotAuthorizedException
-     *             You don't have admin permission to create the cluster
-     * @throws NotFoundException
-     *             Cluster doesn't exist
-     * @throws PulsarAdminException
-     *             Unexpected error
-     */
-    void updateSink(SinkConfig sinkConfig, String fileName) throws 
PulsarAdminException;
-
-    /**
-     * Update the configuration for a sink.
-     * <p>
-     *
-     * @param sinkConfig
-     *            the sink configuration object
-     * @param updateOptions
-     *            options for the update operations
-     * @throws NotAuthorizedException
-     *             You don't have admin permission to create the cluster
-     * @throws NotFoundException
-     *             Cluster doesn't exist
-     * @throws PulsarAdminException
-     *             Unexpected error
-     */
-    void updateSink(SinkConfig sinkConfig, String fileName, UpdateOptions 
updateOptions) throws PulsarAdminException;
-
-    /**
-     * Update the configuration for a sink.
-     * <pre>
-     * Update a sink by providing url from which fun-pkg can be downloaded. 
supported url: http/file
-     * eg:
-     * File: file:/dir/fileName.jar
-     * Http: http://www.repo.com/fileName.jar
-     * </pre>
-     *
-     * @param sinkConfig
-     *            the sink configuration object
-     * @param pkgUrl
-     *            url from which pkg can be downloaded
-     * @throws NotAuthorizedException
-     *             You don't have admin permission to create the cluster
-     * @throws NotFoundException
-     *             Cluster doesn't exist
-     * @throws PulsarAdminException
-     *             Unexpected error
-     */
-    void updateSinkWithUrl(SinkConfig sinkConfig, String pkgUrl) throws 
PulsarAdminException;
-
-    /**
-     * Update the configuration for a sink.
-     * <pre>
-     * Update a sink by providing url from which fun-pkg can be downloaded. 
supported url: http/file
-     * eg:
-     * File: file:/dir/fileName.jar
-     * Http: http://www.repo.com/fileName.jar
-     * </pre>
-     *
-     * @param sinkConfig
-     *            the sink configuration object
-     * @param pkgUrl
-     *            url from which pkg can be downloaded
-     * @param updateOptions
-     *            options for the update operations
-     * @throws NotAuthorizedException
-     *             You don't have admin permission to create the cluster
-     * @throws NotFoundException
-     *             Cluster doesn't exist
-     * @throws PulsarAdminException
-     *             Unexpected error
-     */
-    void updateSinkWithUrl(SinkConfig sinkConfig, String pkgUrl, UpdateOptions 
updateOptions) throws PulsarAdminException;
-
-    /**
-     * Delete an existing sink
-     * <p>
-     * Delete a sink
-     *
-     * @param tenant
-     *            Tenant name
-     * @param namespace
-     *            Namespace name
-     * @param sink
-     *            Sink name
-     *
-     * @throws NotAuthorizedException
-     *             You don't have admin permission
-     * @throws NotFoundException
-     *             Cluster does not exist
-     * @throws PreconditionFailedException
-     *             Cluster is not empty
-     * @throws PulsarAdminException
-     *             Unexpected error
-     */
-    void deleteSink(String tenant, String namespace, String sink) throws 
PulsarAdminException;
-
-    /**
-     * Gets the current status of a sink.
-     *
-     * @param tenant
-     *            Tenant name
-     * @param namespace
-     *            Namespace name
-     * @param sink
-     *            Sink name
-     *
-     * @throws PulsarAdminException
-     *             Unexpected error
-     */
-    SinkStatus getSinkStatus(String tenant, String namespace, String sink) 
throws PulsarAdminException;
-
-    /**
-     * Gets the current status of a sink instance.
-     *
-     * @param tenant
-     *            Tenant name
-     * @param namespace
-     *            Namespace name
-     * @param sink
-     *            Sink name
-     * @param id
-     *            Sink instance-id
-     * @return
-     * @throws PulsarAdminException
-     */
-    SinkStatus.SinkInstanceStatus.SinkInstanceStatusData getSinkStatus(String 
tenant, String namespace, String sink, int id)
-            throws PulsarAdminException;
-
-    /**
-     * Restart sink instance
-     *
-     * @param tenant
-     *            Tenant name
-     * @param namespace
-     *            Namespace name
-     * @param sink
-     *            Sink name
-     *
-     * @param instanceId
-     *            Sink instanceId
-     *
-     * @throws PulsarAdminException
-     *             Unexpected error
-     */
-    void restartSink(String tenant, String namespace, String sink, int 
instanceId) throws PulsarAdminException;
-
-    /**
-     * Restart all sink instances
-     *
-     * @param tenant
-     *            Tenant name
-     * @param namespace
-     *            Namespace name
-     * @param sink
-     *            Sink name
-     *
-     * @throws PulsarAdminException
-     *             Unexpected error
-     */
-    void restartSink(String tenant, String namespace, String sink) throws 
PulsarAdminException;
-
-
-    /**
-     * Stop sink instance
-     *
-     * @param tenant
-     *            Tenant name
-     * @param namespace
-     *            Namespace name
-     * @param sink
-     *            Sink name
-     *
-     * @param instanceId
-     *            Sink instanceId
-     *
-     * @throws PulsarAdminException
-     *             Unexpected error
-     */
-    void stopSink(String tenant, String namespace, String sink, int 
instanceId) throws PulsarAdminException;
-
-    /**
-     * Stop all sink instances
-     *
-     * @param tenant
-     *            Tenant name
-     * @param namespace
-     *            Namespace name
-     * @param sink
-     *            Sink name
-     *
-     * @throws PulsarAdminException
-     *             Unexpected error
-     */
-    void stopSink(String tenant, String namespace, String sink) throws 
PulsarAdminException;
-
-    /**
-     * Start sink instance
-     *
-     * @param tenant
-     *            Tenant name
-     * @param namespace
-     *            Namespace name
-     * @param sink
-     *            Sink name
-     *
-     * @param instanceId
-     *            Sink instanceId
-     *
-     * @throws PulsarAdminException
-     *             Unexpected error
-     */
-    void startSink(String tenant, String namespace, String sink, int 
instanceId) throws PulsarAdminException;
-
-    /**
-     * Start all sink instances
-     *
-     * @param tenant
-     *            Tenant name
-     * @param namespace
-     *            Namespace name
-     * @param sink
-     *            Sink name
-     *
-     * @throws PulsarAdminException
-     *             Unexpected error
-     */
-    void startSink(String tenant, String namespace, String sink) throws 
PulsarAdminException;
-
+@Deprecated
+public interface Sink extends Sinks {
 
-    /**
-     * Fetches a list of supported Pulsar IO sinks currently running in 
cluster mode
-     *
-     * @throws PulsarAdminException
-     *             Unexpected error
-     *
-     */
-    List<ConnectorDefinition> getBuiltInSinks() throws PulsarAdminException;
 }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sink.java 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sinks.java
similarity index 99%
copy from 
pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sink.java
copy to 
pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sinks.java
index 8db53aa..afa44e0 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sink.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sinks.java
@@ -31,7 +31,7 @@ import java.util.List;
 /**
  * Admin interface for Sink management.
  */
-public interface Sink {
+public interface Sinks {
     /**
      * Get the list of sinks.
      * <p>
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Source.java 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Source.java
index 058364e..aba3251 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Source.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Source.java
@@ -18,333 +18,9 @@
  */
 package org.apache.pulsar.client.admin;
 
-import 
org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException;
-import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
-import 
org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
-import org.apache.pulsar.common.functions.UpdateOptions;
-import org.apache.pulsar.common.io.ConnectorDefinition;
-import org.apache.pulsar.common.io.SourceConfig;
-import org.apache.pulsar.common.policies.data.SourceStatus;
-
-import java.util.List;
-
 /**
- * Admin interface for Source management.
+ * @deprecated in favor of {@link Sources}
  */
-public interface Source {
-    /**
-     * Get the list of sources.
-     * <p>
-     * Get the list of all the Pulsar Sources.
-     * <p>
-     * Response Example:
-     *
-     * <pre>
-     * <code>["f1", "f2", "f3"]</code>
-     * </pre>
-     *
-     * @throws NotAuthorizedException
-     *             Don't have admin permission
-     * @throws PulsarAdminException
-     *             Unexpected error
-     */
-    List<String> listSources(String tenant, String namespace) throws 
PulsarAdminException;
-
-    /**
-     * Get the configuration for the specified source.
-     * <p>
-     * Response Example:
-     *
-     * <pre>
-     * <code>{ serviceUrl : "http://my-broker.example.com:8080/"; }</code>
-     * </pre>
-     *
-     * @param tenant
-     *            Tenant name
-     * @param namespace
-     *            Namespace name
-     * @param source
-     *            Source name
-     *
-     * @return the source configuration
-     *
-     * @throws NotAuthorizedException
-     *             You don't have admin permission to get the configuration of 
the cluster
-     * @throws NotFoundException
-     *             Cluster doesn't exist
-     * @throws PulsarAdminException
-     *             Unexpected error
-     */
-    SourceConfig getSource(String tenant, String namespace, String source) 
throws PulsarAdminException;
-
-    /**
-     * Create a new source.
-     *
-     * @param sourceConfig
-     *            the source configuration object
-     *
-     * @throws PulsarAdminException
-     *             Unexpected error
-     */
-    void createSource(SourceConfig sourceConfig, String fileName) throws 
PulsarAdminException;
-
-    /**
-     * <pre>
-     * Create a new source by providing url from which fun-pkg can be 
downloaded. supported url: http/file
-     * eg:
-     * File: file:/dir/fileName.jar
-     * Http: http://www.repo.com/fileName.jar
-     * </pre>
-     *
-     * @param sourceConfig
-     *            the source configuration object
-     * @param pkgUrl
-     *            url from which pkg can be downloaded
-     * @throws PulsarAdminException
-     */
-    void createSourceWithUrl(SourceConfig sourceConfig, String pkgUrl) throws 
PulsarAdminException;
-
-    /**
-     * Update the configuration for a source.
-     * <p>
-     *
-     * @param sourceConfig
-     *            the source configuration object
-     *
-     * @throws NotAuthorizedException
-     *             You don't have admin permission to create the cluster
-     * @throws NotFoundException
-     *             Cluster doesn't exist
-     * @throws PulsarAdminException
-     *             Unexpected error
-     */
-    void updateSource(SourceConfig sourceConfig, String fileName) throws 
PulsarAdminException;
-
-    /**
-     * Update the configuration for a source.
-     * <p>
-     *
-     * @param sourceConfig
-     *            the source configuration object
-     * @param updateOptions
-     *            options for the update operations
-     * @throws NotAuthorizedException
-     *             You don't have admin permission to create the cluster
-     * @throws NotFoundException
-     *             Cluster doesn't exist
-     * @throws PulsarAdminException
-     *             Unexpected error
-     */
-    void updateSource(SourceConfig sourceConfig, String fileName, 
UpdateOptions updateOptions) throws PulsarAdminException;
-
-    /**
-     * Update the configuration for a source.
-     * <pre>
-     * Update a source by providing url from which fun-pkg can be downloaded. 
supported url: http/file
-     * eg:
-     * File: file:/dir/fileName.jar
-     * Http: http://www.repo.com/fileName.jar
-     * </pre>
-     *
-     * @param sourceConfig
-     *            the source configuration object
-     * @param pkgUrl
-     *            url from which pkg can be downloaded
-     * @throws NotAuthorizedException
-     *             You don't have admin permission to create the cluster
-     * @throws NotFoundException
-     *             Cluster doesn't exist
-     * @throws PulsarAdminException
-     *             Unexpected error
-     */
-    void updateSourceWithUrl(SourceConfig sourceConfig, String pkgUrl) throws 
PulsarAdminException;
-
-    /**
-     * Update the configuration for a source.
-     * <pre>
-     * Update a source by providing url from which fun-pkg can be downloaded. 
supported url: http/file
-     * eg:
-     * File: file:/dir/fileName.jar
-     * Http: http://www.repo.com/fileName.jar
-     * </pre>
-     *
-     * @param sourceConfig
-     *            the source configuration object
-     * @param pkgUrl
-     *            url from which pkg can be downloaded
-     * @param updateOptions
-     *            options for the update operations
-     * @throws NotAuthorizedException
-     *             You don't have admin permission to create the cluster
-     * @throws NotFoundException
-     *             Cluster doesn't exist
-     * @throws PulsarAdminException
-     *             Unexpected error
-     */
-    void updateSourceWithUrl(SourceConfig sourceConfig, String pkgUrl, 
UpdateOptions updateOptions) throws PulsarAdminException;
-
-    /**
-     * Delete an existing source
-     * <p>
-     * Delete a source
-     *
-     * @param tenant
-     *            Tenant name
-     * @param namespace
-     *            Namespace name
-     * @param source
-     *            Source name
-     *
-     * @throws NotAuthorizedException
-     *             You don't have admin permission
-     * @throws NotFoundException
-     *             Cluster does not exist
-     * @throws PreconditionFailedException
-     *             Cluster is not empty
-     * @throws PulsarAdminException
-     *             Unexpected error
-     */
-    void deleteSource(String tenant, String namespace, String source) throws 
PulsarAdminException;
-
-    /**
-     * Gets the current status of a source.
-     *
-     * @param tenant
-     *            Tenant name
-     * @param namespace
-     *            Namespace name
-     * @param source
-     *            Source name
-     *
-     * @throws PulsarAdminException
-     *             Unexpected error
-     */
-    SourceStatus getSourceStatus(String tenant, String namespace, String 
source) throws PulsarAdminException;
-
-    /**
-     * Gets the current status of a source instance.
-     *
-     * @param tenant
-     *            Tenant name
-     * @param namespace
-     *            Namespace name
-     * @param source
-     *            Source name
-     * @param id
-     *            Source instance-id
-     * @return
-     * @throws PulsarAdminException
-     */
-    SourceStatus.SourceInstanceStatus.SourceInstanceStatusData 
getSourceStatus(String tenant, String namespace, String source, int id)
-            throws PulsarAdminException;
-
-    /**
-     * Restart source instance
-     *
-     * @param tenant
-     *            Tenant name
-     * @param namespace
-     *            Namespace name
-     * @param source
-     *            Source name
-     *
-     * @param instanceId
-     *            Source instanceId
-     *
-     * @throws PulsarAdminException
-     *             Unexpected error
-     */
-    void restartSource(String tenant, String namespace, String source, int 
instanceId) throws PulsarAdminException;
-
-    /**
-     * Restart all source instances
-     *
-     * @param tenant
-     *            Tenant name
-     * @param namespace
-     *            Namespace name
-     * @param source
-     *            Source name
-     *
-     * @throws PulsarAdminException
-     *             Unexpected error
-     */
-    void restartSource(String tenant, String namespace, String source) throws 
PulsarAdminException;
-
-
-    /**
-     * Stop source instance
-     *
-     * @param tenant
-     *            Tenant name
-     * @param namespace
-     *            Namespace name
-     * @param source
-     *            Source name
-     *
-     * @param instanceId
-     *            Source instanceId
-     *
-     * @throws PulsarAdminException
-     *             Unexpected error
-     */
-    void stopSource(String tenant, String namespace, String source, int 
instanceId) throws PulsarAdminException;
-
-    /**
-     * Stop all source instances
-     *
-     * @param tenant
-     *            Tenant name
-     * @param namespace
-     *            Namespace name
-     * @param source
-     *            Source name
-     *
-     * @throws PulsarAdminException
-     *             Unexpected error
-     */
-    void stopSource(String tenant, String namespace, String source) throws 
PulsarAdminException;
-
-    /**
-     * Start source instance
-     *
-     * @param tenant
-     *            Tenant name
-     * @param namespace
-     *            Namespace name
-     * @param source
-     *            Source name
-     *
-     * @param instanceId
-     *            Source instanceId
-     *
-     * @throws PulsarAdminException
-     *             Unexpected error
-     */
-    void startSource(String tenant, String namespace, String source, int 
instanceId) throws PulsarAdminException;
-
-    /**
-     * Start all source instances
-     *
-     * @param tenant
-     *            Tenant name
-     * @param namespace
-     *            Namespace name
-     * @param source
-     *            Source name
-     *
-     * @throws PulsarAdminException
-     *             Unexpected error
-     */
-    void startSource(String tenant, String namespace, String source) throws 
PulsarAdminException;
-
-
-    /**
-     * Fetches a list of supported Pulsar IO sources currently running in 
cluster mode
-     *
-     * @throws PulsarAdminException
-     *             Unexpected error
-     *
-     */
-    List<ConnectorDefinition> getBuiltInSources() throws PulsarAdminException;
+@Deprecated
+public interface Source extends Sources {
 }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Source.java 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sources.java
similarity index 99%
copy from 
pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Source.java
copy to 
pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sources.java
index 058364e..ef108e2 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Source.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sources.java
@@ -31,7 +31,7 @@ import java.util.List;
 /**
  * Admin interface for Source management.
  */
-public interface Source {
+public interface Sources {
     /**
      * Get the list of sources.
      * <p>
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java
similarity index 98%
rename from 
pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java
rename to 
pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java
index 3da82ce..77a25cc 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java
@@ -22,6 +22,7 @@ import com.google.gson.Gson;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.Sink;
+import org.apache.pulsar.client.admin.Sinks;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.common.functions.UpdateOptions;
 import org.apache.pulsar.common.io.ConnectorDefinition;
@@ -48,12 +49,12 @@ import static org.asynchttpclient.Dsl.post;
 import static org.asynchttpclient.Dsl.put;
 
 @Slf4j
-public class SinkImpl extends ComponentResource implements Sink {
+public class SinksImpl extends ComponentResource implements Sinks, Sink {
 
     private final WebTarget sink;
     private final AsyncHttpClient asyncHttpClient;
 
-    public SinkImpl(WebTarget web, Authentication auth, AsyncHttpClient 
asyncHttpClient) {
+    public SinksImpl(WebTarget web, Authentication auth, AsyncHttpClient 
asyncHttpClient) {
         super(auth);
         this.sink = web.path("/admin/v3/sink");
         this.asyncHttpClient = asyncHttpClient;
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java
similarity index 98%
rename from 
pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java
rename to 
pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java
index cd7b838..d8f57a3 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java
@@ -22,6 +22,7 @@ import com.google.gson.Gson;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.Source;
+import org.apache.pulsar.client.admin.Sources;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.common.functions.UpdateOptions;
 import org.apache.pulsar.common.io.ConnectorDefinition;
@@ -48,12 +49,12 @@ import static org.asynchttpclient.Dsl.post;
 import static org.asynchttpclient.Dsl.put;
 
 @Slf4j
-public class SourceImpl extends ComponentResource implements Source {
+public class SourcesImpl extends ComponentResource implements Sources, Source {
 
     private final WebTarget source;
     private final AsyncHttpClient asyncHttpClient;
 
-    public SourceImpl(WebTarget web, Authentication auth, AsyncHttpClient 
asyncHttpClient) {
+    public SourcesImpl(WebTarget web, Authentication auth, AsyncHttpClient 
asyncHttpClient) {
         super(auth);
         this.source = web.path("/admin/v3/source");
         this.asyncHttpClient = asyncHttpClient;
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 0f2cecb..b6029a9 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -70,7 +70,7 @@ public class CmdSinks extends CmdBase {
     private final LocalSinkRunner localSinkRunner;
 
     public CmdSinks(PulsarAdmin admin) {
-        super("sink", admin);
+        super("sinks", admin);
         createSink = new CreateSink();
         updateSink = new UpdateSink();
         deleteSink = new DeleteSink();
@@ -202,9 +202,9 @@ public class CmdSinks extends CmdBase {
         @Override
         void runCmd() throws Exception {
             if (Utils.isFunctionPackageUrlSupported(archive)) {
-                admin.sink().createSinkWithUrl(sinkConfig, 
sinkConfig.getArchive());
+                admin.sinks().createSinkWithUrl(sinkConfig, 
sinkConfig.getArchive());
             } else {
-                admin.sink().createSink(sinkConfig, sinkConfig.getArchive());
+                admin.sinks().createSink(sinkConfig, sinkConfig.getArchive());
             }
             print("Created successfully");
         }
@@ -221,9 +221,9 @@ public class CmdSinks extends CmdBase {
             UpdateOptions updateOptions = new UpdateOptions();
             updateOptions.setUpdateAuthData(updateAuthData);
             if (Utils.isFunctionPackageUrlSupported(archive)) {
-                admin.sink().updateSinkWithUrl(sinkConfig, 
sinkConfig.getArchive(), updateOptions);
+                admin.sinks().updateSinkWithUrl(sinkConfig, 
sinkConfig.getArchive(), updateOptions);
             } else {
-                admin.sink().updateSink(sinkConfig, sinkConfig.getArchive(), 
updateOptions);
+                admin.sinks().updateSink(sinkConfig, sinkConfig.getArchive(), 
updateOptions);
             }
             print("Updated successfully");
         }
@@ -465,7 +465,7 @@ public class CmdSinks extends CmdBase {
         protected String validateSinkType(String sinkType) throws IOException {
             Set<String> availableSinks;
             try {
-                availableSinks = 
admin.sink().getBuiltInSinks().stream().map(ConnectorDefinition::getName).collect(Collectors.toSet());
+                availableSinks = 
admin.sinks().getBuiltInSinks().stream().map(ConnectorDefinition::getName).collect(Collectors.toSet());
             } catch (PulsarAdminException e) {
                 throw new IOException(e);
             }
@@ -515,7 +515,7 @@ public class CmdSinks extends CmdBase {
 
         @Override
         void runCmd() throws Exception {
-            admin.sink().deleteSink(tenant, namespace, sinkName);
+            admin.sinks().deleteSink(tenant, namespace, sinkName);
             print("Deleted successfully");
         }
     }
@@ -525,7 +525,7 @@ public class CmdSinks extends CmdBase {
 
         @Override
         void runCmd() throws Exception {
-            SinkConfig sinkConfig = admin.sink().getSink(tenant, namespace, 
sinkName);
+            SinkConfig sinkConfig = admin.sinks().getSink(tenant, namespace, 
sinkName);
             Gson gson = new GsonBuilder().setPrettyPrinting().create();
             System.out.println(gson.toJson(sinkConfig));
         }
@@ -554,7 +554,7 @@ public class CmdSinks extends CmdBase {
 
         @Override
         void runCmd() throws Exception {
-            List<String> sinks = admin.sink().listSinks(tenant, namespace);
+            List<String> sinks = admin.sinks().listSinks(tenant, namespace);
             Gson gson = new GsonBuilder().setPrettyPrinting().create();
             System.out.println(gson.toJson(sinks));
         }
@@ -569,9 +569,9 @@ public class CmdSinks extends CmdBase {
         @Override
         void runCmd() throws Exception {
             if (isBlank(instanceId)) {
-                print(admin.sink().getSinkStatus(tenant, namespace, sinkName));
+                print(admin.sinks().getSinkStatus(tenant, namespace, 
sinkName));
             } else {
-                print(admin.sink().getSinkStatus(tenant, namespace, sinkName, 
Integer.parseInt(instanceId)));
+                print(admin.sinks().getSinkStatus(tenant, namespace, sinkName, 
Integer.parseInt(instanceId)));
             }
         }
     }
@@ -586,12 +586,12 @@ public class CmdSinks extends CmdBase {
         void runCmd() throws Exception {
             if (isNotBlank(instanceId)) {
                 try {
-                    admin.sink().restartSink(tenant, namespace, sinkName, 
Integer.parseInt(instanceId));
+                    admin.sinks().restartSink(tenant, namespace, sinkName, 
Integer.parseInt(instanceId));
                 } catch (NumberFormatException e) {
                     System.err.println("instance-id must be a number");
                 }
             } else {
-                admin.sink().restartSink(tenant, namespace, sinkName);
+                admin.sinks().restartSink(tenant, namespace, sinkName);
             }
             System.out.println("Restarted successfully");
         }
@@ -607,12 +607,12 @@ public class CmdSinks extends CmdBase {
         void runCmd() throws Exception {
             if (isNotBlank(instanceId)) {
                 try {
-                    admin.sink().stopSink(tenant, namespace, sinkName, 
Integer.parseInt(instanceId));
+                    admin.sinks().stopSink(tenant, namespace, sinkName, 
Integer.parseInt(instanceId));
                 } catch (NumberFormatException e) {
                     System.err.println("instance-id must be a number");
                 }
             } else {
-                admin.sink().stopSink(tenant, namespace, sinkName);
+                admin.sinks().stopSink(tenant, namespace, sinkName);
             }
             System.out.println("Stopped successfully");
         }
@@ -628,12 +628,12 @@ public class CmdSinks extends CmdBase {
         void runCmd() throws Exception {
             if (isNotBlank(instanceId)) {
                 try {
-                    admin.sink().startSink(tenant, namespace, sinkName, 
Integer.parseInt(instanceId));
+                    admin.sinks().startSink(tenant, namespace, sinkName, 
Integer.parseInt(instanceId));
                 } catch (NumberFormatException e) {
                     System.err.println("instance-id must be a number");
                 }
             } else {
-                admin.sink().startSink(tenant, namespace, sinkName);
+                admin.sinks().startSink(tenant, namespace, sinkName);
             }
             System.out.println("Started successfully");
         }
@@ -643,7 +643,7 @@ public class CmdSinks extends CmdBase {
     public class ListBuiltInSinks extends BaseCommand {
         @Override
         void runCmd() throws Exception {
-            admin.sink().getBuiltInSinks().stream().filter(x -> 
isNotBlank(x.getSinkClass()))
+            admin.sinks().getBuiltInSinks().stream().filter(x -> 
isNotBlank(x.getSinkClass()))
                     .forEach(connector -> {
                         System.out.println(connector.getName());
                         
System.out.println(WordUtils.wrap(connector.getDescription(), 80));
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index 782b0f6..535bb33 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -73,7 +73,7 @@ public class CmdSources extends CmdBase {
     private final LocalSourceRunner localSourceRunner;
 
     public CmdSources(PulsarAdmin admin) {
-        super("source", admin);
+        super("sources", admin);
         createSource = new CreateSource();
         updateSource = new UpdateSource();
         deleteSource = new DeleteSource();
@@ -206,9 +206,9 @@ public class CmdSources extends CmdBase {
         @Override
         void runCmd() throws Exception {
             if 
(Utils.isFunctionPackageUrlSupported(this.sourceConfig.getArchive())) {
-                admin.source().createSourceWithUrl(sourceConfig, 
sourceConfig.getArchive());
+                admin.sources().createSourceWithUrl(sourceConfig, 
sourceConfig.getArchive());
             } else {
-                admin.source().createSource(sourceConfig, 
sourceConfig.getArchive());
+                admin.sources().createSource(sourceConfig, 
sourceConfig.getArchive());
             }
             print("Created successfully");
         }
@@ -225,9 +225,9 @@ public class CmdSources extends CmdBase {
             UpdateOptions updateOptions = new UpdateOptions();
             updateOptions.setUpdateAuthData(updateAuthData);
             if 
(Utils.isFunctionPackageUrlSupported(sourceConfig.getArchive())) {
-                admin.source().updateSourceWithUrl(sourceConfig, 
sourceConfig.getArchive(), updateOptions);
+                admin.sources().updateSourceWithUrl(sourceConfig, 
sourceConfig.getArchive(), updateOptions);
             } else {
-                admin.source().updateSource(sourceConfig, 
sourceConfig.getArchive(), updateOptions);
+                admin.sources().updateSource(sourceConfig, 
sourceConfig.getArchive(), updateOptions);
             }
             print("Updated successfully");
         }
@@ -419,7 +419,7 @@ public class CmdSources extends CmdBase {
         protected String validateSourceType(String sourceType) throws 
IOException {
             Set<String> availableSources;
             try {
-                availableSources = 
admin.source().getBuiltInSources().stream().map(ConnectorDefinition::getName).collect(Collectors.toSet());
+                availableSources = 
admin.sources().getBuiltInSources().stream().map(ConnectorDefinition::getName).collect(Collectors.toSet());
             } catch (PulsarAdminException e) {
                 throw new IOException(e);
             }
@@ -469,7 +469,7 @@ public class CmdSources extends CmdBase {
 
         @Override
         void runCmd() throws Exception {
-            admin.source().deleteSource(tenant, namespace, sourceName);
+            admin.sources().deleteSource(tenant, namespace, sourceName);
             print("Delete source successfully");
         }
     }
@@ -479,7 +479,7 @@ public class CmdSources extends CmdBase {
 
         @Override
         void runCmd() throws Exception {
-            SourceConfig sourceConfig = admin.source().getSource(tenant, 
namespace, sourceName);
+            SourceConfig sourceConfig = admin.sources().getSource(tenant, 
namespace, sourceName);
             Gson gson = new GsonBuilder().setPrettyPrinting().create();
             System.out.println(gson.toJson(sourceConfig));
         }
@@ -508,7 +508,7 @@ public class CmdSources extends CmdBase {
 
         @Override
         void runCmd() throws Exception {
-            List<String> sources = admin.source().listSources(tenant, 
namespace);
+            List<String> sources = admin.sources().listSources(tenant, 
namespace);
             Gson gson = new GsonBuilder().setPrettyPrinting().create();
             System.out.println(gson.toJson(sources));
         }
@@ -523,9 +523,9 @@ public class CmdSources extends CmdBase {
         @Override
         void runCmd() throws Exception {
             if (isBlank(instanceId)) {
-                print(admin.source().getSourceStatus(tenant, namespace, 
sourceName));
+                print(admin.sources().getSourceStatus(tenant, namespace, 
sourceName));
             } else {
-                print(admin.source().getSourceStatus(tenant, namespace, 
sourceName, Integer.parseInt(instanceId)));
+                print(admin.sources().getSourceStatus(tenant, namespace, 
sourceName, Integer.parseInt(instanceId)));
             };
         }
     }
@@ -540,12 +540,12 @@ public class CmdSources extends CmdBase {
         void runCmd() throws Exception {
             if (isNotBlank(instanceId)) {
                 try {
-                    admin.source().restartSource(tenant, namespace, 
sourceName, Integer.parseInt(instanceId));
+                    admin.sources().restartSource(tenant, namespace, 
sourceName, Integer.parseInt(instanceId));
                 } catch (NumberFormatException e) {
                     System.err.println("instance-id must be a number");
                 }
             } else {
-                admin.source().restartSource(tenant, namespace, sourceName);
+                admin.sources().restartSource(tenant, namespace, sourceName);
             }
             System.out.println("Restarted successfully");
         }
@@ -561,12 +561,12 @@ public class CmdSources extends CmdBase {
         void runCmd() throws Exception {
             if (isNotBlank(instanceId)) {
                 try {
-                    admin.source().stopSource(tenant, namespace, sourceName, 
Integer.parseInt(instanceId));
+                    admin.sources().stopSource(tenant, namespace, sourceName, 
Integer.parseInt(instanceId));
                 } catch (NumberFormatException e) {
                     System.err.println("instance-id must be a number");
                 }
             } else {
-                admin.source().stopSource(tenant, namespace, sourceName);
+                admin.sources().stopSource(tenant, namespace, sourceName);
             }
             System.out.println("Stopped successfully");
         }
@@ -582,12 +582,12 @@ public class CmdSources extends CmdBase {
         void runCmd() throws Exception {
             if (isNotBlank(instanceId)) {
                 try {
-                    admin.source().startSource(tenant, namespace, sourceName, 
Integer.parseInt(instanceId));
+                    admin.sources().startSource(tenant, namespace, sourceName, 
Integer.parseInt(instanceId));
                 } catch (NumberFormatException e) {
                     System.err.println("instance-id must be a number");
                 }
             } else {
-                admin.source().startSource(tenant, namespace, sourceName);
+                admin.sources().startSource(tenant, namespace, sourceName);
             }
             System.out.println("Started successfully");
         }
@@ -597,7 +597,7 @@ public class CmdSources extends CmdBase {
     public class ListBuiltInSources extends BaseCommand {
         @Override
         void runCmd() throws Exception {
-            admin.source().getBuiltInSources().stream().filter(x -> 
!StringUtils.isEmpty(x.getSourceClass()))
+            admin.sources().getBuiltInSources().stream().filter(x -> 
!StringUtils.isEmpty(x.getSourceClass()))
                     .forEach(connector -> {
                         System.out.println(connector.getName());
                         
System.out.println(WordUtils.wrap(connector.getDescription(), 80));
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
index 705a386..c3a7fd9 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
@@ -108,6 +108,11 @@ public class PulsarAdminTool {
         commandMap.put("resource-quotas", CmdResourceQuotas.class);
         commandMap.put("functions", CmdFunctions.class);
         commandMap.put("functions-worker", CmdFunctionWorker.class);
+        commandMap.put("sources", CmdSources.class);
+        commandMap.put("sinks", CmdSinks.class);
+
+        // To remain backwards compatibility for "source" and "sink" commands
+        // TODO eventually remove this
         commandMap.put("source", CmdSources.class);
         commandMap.put("sink", CmdSinks.class);
     }
@@ -119,11 +124,16 @@ public class PulsarAdminTool {
             PulsarAdmin admin = adminFactory.apply(adminBuilder);
             for (Map.Entry<String, Class<?>> c : commandMap.entrySet()) {
                 if (admin != null) {
-                    // Other mode, all components are initialized.
-                    jcommander.addCommand(c.getKey(), 
c.getValue().getConstructor(PulsarAdmin.class).newInstance(admin));
-                } else if (c.getKey().equals("functions") || 
c.getKey().equals("source") || c.getKey().equals("sink")) {
-                    // In mode localrun, only some components are initialized, 
such as source, sink and functions
-                    jcommander.addCommand(c.getKey(), 
c.getValue().getConstructor(PulsarAdmin.class).newInstance(admin));
+                    // To remain backwards compatibility for "source" and 
"sink" commands
+                    // TODO eventually remove this
+                    if (c.getKey().equals("sources") || 
c.getKey().equals("source")) {
+                        jcommander.addCommand("sources", 
c.getValue().getConstructor(PulsarAdmin.class).newInstance(admin), "source");
+                    } else if (c.getKey().equals("sinks") || 
c.getKey().equals("sink")) {
+                        jcommander.addCommand("sinks", 
c.getValue().getConstructor(PulsarAdmin.class).newInstance(admin), "sink");
+                    } else {
+                        // Other mode, all components are initialized.
+                        jcommander.addCommand(c.getKey(), 
c.getValue().getConstructor(PulsarAdmin.class).newInstance(admin));
+                    }
                 }
             }
         } catch (Exception e) {
@@ -187,6 +197,15 @@ public class PulsarAdminTool {
         } else {
             setupCommands(adminFactory);
             String cmd = args[cmdPos];
+
+            // To remain backwards compatibility for "source" and "sink" 
commands
+            // TODO eventually remove this
+            if (cmd.equals("source")) {
+                cmd = "sources";
+            } else if (cmd.equals("sink")) {
+                cmd = "sinks";
+            }
+
             JCommander obj = jcommander.getCommands().get(cmd);
             CmdBase cmdObj = (CmdBase) obj.getObjects().get(0);
 
diff --git 
a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
 
b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
index 159edaf..2a48586 100644
--- 
a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
+++ 
b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
@@ -38,7 +38,7 @@ import java.util.*;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.pulsar.admin.cli.utils.CmdUtils;
-import org.apache.pulsar.client.admin.Sink;
+import org.apache.pulsar.client.admin.Sinks;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.functions.Resources;
@@ -94,7 +94,7 @@ public class TestCmdSinks {
     private static final String SINK_CONFIG_STRING = "{\"created_at\":\"Mon 
Jul 02 00:33:15 0000 2018\"}";
 
     private PulsarAdmin pulsarAdmin;
-    private Sink sink;
+    private Sinks sink;
     private CmdSinks cmdSinks;
     private CmdSinks.CreateSink createSink;
     private CmdSinks.UpdateSink updateSink;
@@ -105,8 +105,8 @@ public class TestCmdSinks {
     public void setup() throws Exception {
 
         pulsarAdmin = mock(PulsarAdmin.class);
-        sink = mock(Sink.class);
-        when(pulsarAdmin.sink()).thenReturn(sink);
+        sink = mock(Sinks.class);
+        when(pulsarAdmin.sinks()).thenReturn(sink);
 
         cmdSinks = spy(new CmdSinks(pulsarAdmin));
         createSink = spy(cmdSinks.getCreateSink());
diff --git 
a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
 
b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
index 19c6c59..da6e99f 100644
--- 
a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
+++ 
b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
@@ -36,7 +36,7 @@ import java.nio.file.Files;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.pulsar.admin.cli.utils.CmdUtils;
-import org.apache.pulsar.client.admin.Source;
+import org.apache.pulsar.client.admin.Sources;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.functions.Resources;
@@ -78,7 +78,7 @@ public class TestCmdSources {
     private static final String SINK_CONFIG_STRING = "{\"created_at\":\"Mon 
Jul 02 00:33:15 +0000 2018\"}";
 
     private PulsarAdmin pulsarAdmin;
-    private Source source;
+    private Sources source;
     private CmdSources CmdSources;
     private CmdSources.CreateSource createSource;
     private CmdSources.UpdateSource updateSource;
@@ -89,8 +89,8 @@ public class TestCmdSources {
     public void setup() throws Exception {
 
         pulsarAdmin = mock(PulsarAdmin.class);
-        source = mock(Source.class);
-        when(pulsarAdmin.source()).thenReturn(source);
+        source = mock(Sources.class);
+        when(pulsarAdmin.sources()).thenReturn(source);
 
         CmdSources = spy(new CmdSources(pulsarAdmin));
         createSource = spy(CmdSources.getCreateSource());
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java
index 4a53d3e..b2f3339 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java
@@ -23,12 +23,14 @@ import java.util.HashSet;
 import java.util.Set;
 
 import org.apache.pulsar.functions.worker.rest.api.FunctionsMetricsResource;
-import org.apache.pulsar.functions.worker.rest.api.v2.FunctionApiV2Resource;
+import org.apache.pulsar.functions.worker.rest.api.v2.FunctionsApiV2Resource;
 import org.apache.pulsar.functions.worker.rest.api.v2.WorkerApiV2Resource;
 import org.apache.pulsar.functions.worker.rest.api.v2.WorkerStatsApiV2Resource;
-import org.apache.pulsar.functions.worker.rest.api.v3.FunctionApiV3Resource;
+import org.apache.pulsar.functions.worker.rest.api.v3.FunctionsApiV3Resource;
 import org.apache.pulsar.functions.worker.rest.api.v3.SinkApiV3Resource;
+import org.apache.pulsar.functions.worker.rest.api.v3.SinksApiV3Resource;
 import org.apache.pulsar.functions.worker.rest.api.v3.SourceApiV3Resource;
+import org.apache.pulsar.functions.worker.rest.api.v3.SourcesApiV3Resource;
 import org.glassfish.jersey.media.multipart.MultiPartFeature;
 
 public final class Resources {
@@ -39,7 +41,7 @@ public final class Resources {
     public static Set<Class<?>> getApiV2Resources() {
         return new HashSet<>(
                 Arrays.asList(
-                        FunctionApiV2Resource.class,
+                        FunctionsApiV2Resource.class,
                         WorkerApiV2Resource.class,
                         WorkerStatsApiV2Resource.class,
                         MultiPartFeature.class
@@ -50,9 +52,11 @@ public final class Resources {
         return new HashSet<>(
                 Arrays.asList(
                         MultiPartFeature.class,
+                        SourcesApiV3Resource.class,
                         SourceApiV3Resource.class,
+                        SinksApiV3Resource.class,
                         SinkApiV3Resource.class,
-                        FunctionApiV3Resource.class
+                        FunctionsApiV3Resource.class
                 ));
     }
 
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
similarity index 99%
rename from 
pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
rename to 
pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
index e7b9ff1..fa07160 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
@@ -44,7 +44,7 @@ import java.util.function.Supplier;
 import static 
org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException;
 
 @Slf4j
-public class SinkImpl extends ComponentImpl {
+public class SinksImpl extends ComponentImpl {
 
     private class GetSinkStatus extends GetStatus<SinkStatus, 
SinkStatus.SinkInstanceStatus.SinkInstanceStatusData> {
 
@@ -206,7 +206,7 @@ public class SinkImpl extends ComponentImpl {
         return exceptionInformation;
     }
 
-    public SinkImpl(Supplier<WorkerService> workerServiceSupplier) {
+    public SinksImpl(Supplier<WorkerService> workerServiceSupplier) {
         super(workerServiceSupplier, 
Function.FunctionDetails.ComponentType.SINK);
     }
 
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
similarity index 99%
rename from 
pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
rename to 
pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
index a2b59ef..d35724e 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
@@ -44,7 +44,7 @@ import java.util.function.Supplier;
 import static 
org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException;
 
 @Slf4j
-public class SourceImpl extends ComponentImpl {
+public class SourcesImpl extends ComponentImpl {
     private class GetSourceStatus extends GetStatus<SourceStatus, 
SourceStatus.SourceInstanceStatus.SourceInstanceStatusData> {
 
         @Override
@@ -208,7 +208,7 @@ public class SourceImpl extends ComponentImpl {
         }
     }
 
-    public SourceImpl(Supplier<WorkerService> workerServiceSupplier) {
+    public SourcesImpl(Supplier<WorkerService> workerServiceSupplier) {
         super(workerServiceSupplier, 
Function.FunctionDetails.ComponentType.SOURCE);
     }
 
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionsApiV2Resource.java
similarity index 99%
rename from 
pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
rename to 
pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionsApiV2Resource.java
index c978cad..6486187 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionsApiV2Resource.java
@@ -47,11 +47,11 @@ import java.util.List;
 
 @Slf4j
 @Path("/functions")
-public class FunctionApiV2Resource extends FunctionApiResource {
+public class FunctionsApiV2Resource extends FunctionApiResource {
 
     protected final FunctionsImplV2 functions;
 
-    public FunctionApiV2Resource() {
+    public FunctionsApiV2Resource() {
         this.functions = new FunctionsImplV2(this);
     }
 
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3Resource.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java
similarity index 99%
rename from 
pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3Resource.java
rename to 
pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java
index 7c8136d..38f4c4b 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3Resource.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java
@@ -50,11 +50,11 @@ import java.util.List;
 
 @Slf4j
 @Path("/functions")
-public class FunctionApiV3Resource extends FunctionApiResource {
+public class FunctionsApiV3Resource extends FunctionApiResource {
 
     protected final FunctionsImpl functions;
 
-    public FunctionApiV3Resource() {
+    public FunctionsApiV3Resource() {
         this.functions = new FunctionsImpl(this);
     }
 
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3Resource.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3Resource.java
index b12c381..3731b92 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3Resource.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3Resource.java
@@ -18,220 +18,20 @@
  */
 package org.apache.pulsar.functions.worker.rest.api.v3;
 
-import io.swagger.annotations.ApiOperation;
-import io.swagger.annotations.ApiResponse;
-import io.swagger.annotations.ApiResponses;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang.StringUtils;
-import org.apache.pulsar.common.functions.UpdateOptions;
-import org.apache.pulsar.common.io.ConnectorDefinition;
-import org.apache.pulsar.common.io.SinkConfig;
-import org.apache.pulsar.common.policies.data.SinkStatus;
-import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
-import org.apache.pulsar.functions.worker.rest.api.SinkImpl;
-import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
-import org.glassfish.jersey.media.multipart.FormDataParam;
+import io.swagger.annotations.Api;
 
-import javax.ws.rs.*;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
 import javax.ws.rs.core.MediaType;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.List;
 
-@Slf4j
+@Api(value = "/sink", description = "Sink admin apis", tags = "sink")
+@Produces(MediaType.APPLICATION_JSON)
+@Consumes(MediaType.APPLICATION_JSON)
 @Path("/sink")
-public class SinkApiV3Resource extends FunctionApiResource {
-
-    protected final SinkImpl sink;
-
-    public SinkApiV3Resource() {
-        this.sink = new SinkImpl(this);
-    }
-
-    @POST
-    @Path("/{tenant}/{namespace}/{sinkName}")
-    @Consumes(MediaType.MULTIPART_FORM_DATA)
-    public void registerSink(final @PathParam("tenant") String tenant,
-                             final @PathParam("namespace") String namespace,
-                             final @PathParam("sinkName") String sinkName,
-                             final @FormDataParam("data") InputStream 
uploadedInputStream,
-                             final @FormDataParam("data") 
FormDataContentDisposition fileDetail,
-                             final @FormDataParam("url") String functionPkgUrl,
-                             final @FormDataParam("sinkConfig") String 
sinkConfigJson) {
-
-        sink.registerFunction(tenant, namespace, sinkName, 
uploadedInputStream, fileDetail,
-                functionPkgUrl, sinkConfigJson, clientAppId(), 
clientAuthData());
-    }
-
-    @PUT
-    @Path("/{tenant}/{namespace}/{sinkName}")
-    @Consumes(MediaType.MULTIPART_FORM_DATA)
-    public void updateSink(final @PathParam("tenant") String tenant,
-                           final @PathParam("namespace") String namespace,
-                           final @PathParam("sinkName") String sinkName,
-                           final @FormDataParam("data") InputStream 
uploadedInputStream,
-                           final @FormDataParam("data") 
FormDataContentDisposition fileDetail,
-                           final @FormDataParam("url") String functionPkgUrl,
-                           final @FormDataParam("sinkConfig") String 
sinkConfigJson,
-                           final @FormDataParam("updateOptions") UpdateOptions 
updateOptions) {
-
-        sink.updateFunction(tenant, namespace, sinkName, uploadedInputStream, 
fileDetail,
-                functionPkgUrl, sinkConfigJson, clientAppId(), 
clientAuthData(), updateOptions);
-    }
-
-    @DELETE
-    @Path("/{tenant}/{namespace}/{sinkName}")
-    public void deregisterSink(final @PathParam("tenant") String tenant,
-                               final @PathParam("namespace") String namespace,
-                               final @PathParam("sinkName") String sinkName) {
-        sink.deregisterFunction(tenant, namespace, sinkName, clientAppId(), 
clientAuthData());
-    }
-
-    @GET
-    @Path("/{tenant}/{namespace}/{sinkName}")
-    public SinkConfig getSinkInfo(final @PathParam("tenant") String tenant,
-                                  final @PathParam("namespace") String 
namespace,
-                                  final @PathParam("sinkName") String sinkName)
-            throws IOException {
-        return sink.getSinkInfo(tenant, namespace, sinkName);
-    }
-
-    @GET
-    @ApiOperation(
-            value = "Displays the status of a Pulsar Sink instance",
-            response = 
SinkStatus.SinkInstanceStatus.SinkInstanceStatusData.class
-    )
-    @ApiResponses(value = {
-            @ApiResponse(code = 400, message = "Invalid request"),
-            @ApiResponse(code = 403, message = "The requester doesn't have 
admin permissions"),
-            @ApiResponse(code = 404, message = "The sink doesn't exist")
-    })
-    @Produces(MediaType.APPLICATION_JSON)
-    @Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/status")
-    public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData 
getSinkInstanceStatus(
-            final @PathParam("tenant") String tenant,
-            final @PathParam("namespace") String namespace,
-            final @PathParam("sinkName") String sinkName,
-            final @PathParam("instanceId") String instanceId) throws 
IOException {
-        return sink.getSinkInstanceStatus(tenant, namespace, sinkName, 
instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
-    }
-
-    @GET
-    @ApiOperation(
-            value = "Displays the status of a Pulsar Sink running in cluster 
mode",
-            response = SinkStatus.class
-    )
-    @ApiResponses(value = {
-            @ApiResponse(code = 400, message = "Invalid request"),
-            @ApiResponse(code = 403, message = "The requester doesn't have 
admin permissions"),
-            @ApiResponse(code = 404, message = "The sink doesn't exist")
-    })
-    @Produces(MediaType.APPLICATION_JSON)
-    @Path("/{tenant}/{namespace}/{sinkName}/status")
-    public SinkStatus getSinkStatus(final @PathParam("tenant") String tenant,
-                                    final @PathParam("namespace") String 
namespace,
-                                    final @PathParam("sinkName") String 
sinkName) throws IOException {
-        return sink.getSinkStatus(tenant, namespace, sinkName, 
uri.getRequestUri(), clientAppId(), clientAuthData());
-    }
-
-    @GET
-    @Path("/{tenant}/{namespace}")
-    public List<String> listSink(final @PathParam("tenant") String tenant,
-                                 final @PathParam("namespace") String 
namespace) {
-        return sink.listFunctions(tenant, namespace, clientAppId(), 
clientAuthData());
-    }
-
-    @POST
-    @ApiOperation(value = "Restart sink instance", response = Void.class)
-    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid 
request"),
-    @ApiResponse(code = 404, message = "The function does not exist"),
-    @ApiResponse(code = 500, message = "Internal server error") })
-    @Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/restart")
-    @Consumes(MediaType.APPLICATION_JSON)
-    public void restartSink(final @PathParam("tenant") String tenant,
-                            final @PathParam("namespace") String namespace,
-                            final @PathParam("sinkName") String sinkName,
-                            final @PathParam("instanceId") String instanceId) {
-        sink.restartFunctionInstance(tenant, namespace, sinkName, instanceId, 
this.uri.getRequestUri(), clientAppId(), clientAuthData());
-    }
-
-    @POST
-    @ApiOperation(value = "Restart all sink instances", response = Void.class)
-    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid 
request"),
-    @ApiResponse(code = 404, message = "The function does not exist"), 
@ApiResponse(code = 500, message = "Internal server error") })
-    @Path("/{tenant}/{namespace}/{sinkName}/restart")
-    @Consumes(MediaType.APPLICATION_JSON)
-    public void restartSink(final @PathParam("tenant") String tenant,
-                            final @PathParam("namespace") String namespace,
-                            final @PathParam("sinkName") String sinkName) {
-        sink.restartFunctionInstances(tenant, namespace, sinkName, 
clientAppId(), clientAuthData());
-    }
-
-    @POST
-    @ApiOperation(value = "Stop sink instance", response = Void.class)
-    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid 
request"),
-            @ApiResponse(code = 404, message = "The function does not exist"),
-            @ApiResponse(code = 500, message = "Internal server error") })
-    @Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/stop")
-    @Consumes(MediaType.APPLICATION_JSON)
-    public void stopSink(final @PathParam("tenant") String tenant,
-                         final @PathParam("namespace") String namespace,
-                         final @PathParam("sinkName") String sinkName,
-                         final @PathParam("instanceId") String instanceId) {
-        sink.stopFunctionInstance(tenant, namespace, sinkName, instanceId, 
this.uri.getRequestUri(), clientAppId(), clientAuthData());
-    }
-
-    @POST
-    @ApiOperation(value = "Stop all sink instances", response = Void.class)
-    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid 
request"),
-    @ApiResponse(code = 404, message = "The function does not exist"),
-    @ApiResponse(code = 500, message = "Internal server error") })
-    @Path("/{tenant}/{namespace}/{sinkName}/stop")
-    @Consumes(MediaType.APPLICATION_JSON)
-    public void stopSink(final @PathParam("tenant") String tenant,
-                         final @PathParam("namespace") String namespace,
-                         final @PathParam("sinkName") String sinkName) {
-        sink.stopFunctionInstances(tenant, namespace, sinkName, clientAppId(), 
clientAuthData());
-    }
-
-    @POST
-    @ApiOperation(value = "Start sink instance", response = Void.class)
-    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid 
request"),
-            @ApiResponse(code = 404, message = "The function does not exist"),
-            @ApiResponse(code = 500, message = "Internal server error") })
-    @Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/start")
-    @Consumes(MediaType.APPLICATION_JSON)
-    public void startSink(final @PathParam("tenant") String tenant,
-                          final @PathParam("namespace") String namespace,
-                          final @PathParam("sinkName") String sinkName,
-                          final @PathParam("instanceId") String instanceId) {
-        sink.startFunctionInstance(tenant, namespace, sinkName, instanceId, 
this.uri.getRequestUri(), clientAppId(), clientAuthData());
-    }
-
-    @POST
-    @ApiOperation(value = "Start all sink instances", response = Void.class)
-    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid 
request"),
-            @ApiResponse(code = 404, message = "The function does not exist"),
-            @ApiResponse(code = 500, message = "Internal server error") })
-    @Path("/{tenant}/{namespace}/{sinkName}/start")
-    @Consumes(MediaType.APPLICATION_JSON)
-    public void startSink(final @PathParam("tenant") String tenant,
-                          final @PathParam("namespace") String namespace,
-                          final @PathParam("sinkName") String sinkName) {
-        sink.startFunctionInstances(tenant, namespace, sinkName, 
clientAppId(), clientAuthData());
-    }
-
-    @GET
-    @Path("/builtinsinks")
-    public List<ConnectorDefinition> getSinkList() {
-        List<ConnectorDefinition> connectorDefinitions = 
sink.getListOfConnectors();
-        List<ConnectorDefinition> retVal = new ArrayList<>();
-        for (ConnectorDefinition connectorDefinition : connectorDefinitions) {
-            if (!StringUtils.isEmpty(connectorDefinition.getSinkClass())) {
-                retVal.add(connectorDefinition);
-            }
-        }
-        return retVal;
-    }
+@Deprecated
+/**
+ * @deprecated in favor of {@link SinksApiV3Resource}
+ */
+public class SinkApiV3Resource extends SinksApiV3Resource {
 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3Resource.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinksApiV3Resource.java
similarity index 96%
copy from 
pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3Resource.java
copy to 
pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinksApiV3Resource.java
index b12c381..e699544 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3Resource.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinksApiV3Resource.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.functions.worker.rest.api.v3;
 
+import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
@@ -28,7 +29,7 @@ import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.common.policies.data.SinkStatus;
 import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
-import org.apache.pulsar.functions.worker.rest.api.SinkImpl;
+import org.apache.pulsar.functions.worker.rest.api.SinksImpl;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 import org.glassfish.jersey.media.multipart.FormDataParam;
 
@@ -40,13 +41,16 @@ import java.util.ArrayList;
 import java.util.List;
 
 @Slf4j
-@Path("/sink")
-public class SinkApiV3Resource extends FunctionApiResource {
+@Api(value = "/sinks", description = "Sinks admin apis", tags = "sinks")
+@Produces(MediaType.APPLICATION_JSON)
+@Consumes(MediaType.APPLICATION_JSON)
+@Path("/sinks")
+public class SinksApiV3Resource extends FunctionApiResource {
 
-    protected final SinkImpl sink;
+    protected final SinksImpl sink;
 
-    public SinkApiV3Resource() {
-        this.sink = new SinkImpl(this);
+    public SinksApiV3Resource() {
+        this.sink = new SinksImpl(this);
     }
 
     @POST
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3Resource.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3Resource.java
index 96d2082..65d68f9 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3Resource.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3Resource.java
@@ -18,224 +18,20 @@
  */
 package org.apache.pulsar.functions.worker.rest.api.v3;
 
-import io.swagger.annotations.ApiOperation;
-import io.swagger.annotations.ApiResponse;
-import io.swagger.annotations.ApiResponses;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang.StringUtils;
-import org.apache.pulsar.common.functions.UpdateOptions;
-import org.apache.pulsar.common.io.ConnectorDefinition;
-import org.apache.pulsar.common.io.SourceConfig;
-import org.apache.pulsar.common.policies.data.SourceStatus;
-import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
-import org.apache.pulsar.functions.worker.rest.api.SourceImpl;
-import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
-import org.glassfish.jersey.media.multipart.FormDataParam;
+import io.swagger.annotations.Api;
 
-import javax.ws.rs.*;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
 import javax.ws.rs.core.MediaType;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.List;
 
-@Slf4j
 @Path("/source")
-public class SourceApiV3Resource extends FunctionApiResource {
-
-    protected final SourceImpl source;
-
-    public SourceApiV3Resource() {
-        this.source = new SourceImpl(this);
-    }
-
-    @POST
-    @Path("/{tenant}/{namespace}/{sourceName}")
-    @Consumes(MediaType.MULTIPART_FORM_DATA)
-    public void registerSource(final @PathParam("tenant") String tenant,
-                                   final @PathParam("namespace") String 
namespace,
-                                   final @PathParam("sourceName") String 
sourceName,
-                                   final @FormDataParam("data") InputStream 
uploadedInputStream,
-                                   final @FormDataParam("data") 
FormDataContentDisposition fileDetail,
-                                   final @FormDataParam("url") String 
functionPkgUrl,
-                                   final @FormDataParam("sourceConfig") String 
sourceConfigJson) {
-
-        source.registerFunction(tenant, namespace, sourceName, 
uploadedInputStream, fileDetail,
-                functionPkgUrl, sourceConfigJson, clientAppId(), 
clientAuthData());
-
-    }
-
-    @PUT
-    @Path("/{tenant}/{namespace}/{sourceName}")
-    @Consumes(MediaType.MULTIPART_FORM_DATA)
-    public void updateSource(final @PathParam("tenant") String tenant,
-                             final @PathParam("namespace") String namespace,
-                             final @PathParam("sourceName") String sourceName,
-                             final @FormDataParam("data") InputStream 
uploadedInputStream,
-                             final @FormDataParam("data") 
FormDataContentDisposition fileDetail,
-                             final @FormDataParam("url") String functionPkgUrl,
-                             final @FormDataParam("sourceConfig") String 
sourceConfigJson,
-                             final @FormDataParam("updateOptions") 
UpdateOptions updateOptions) {
-
-        source.updateFunction(tenant, namespace, sourceName, 
uploadedInputStream, fileDetail,
-                functionPkgUrl, sourceConfigJson, clientAppId(), 
clientAuthData(), updateOptions);
-    }
-
-
-    @DELETE
-    @Path("/{tenant}/{namespace}/{sourceName}")
-    public void deregisterSource(final @PathParam("tenant") String tenant,
-                                 final @PathParam("namespace") String 
namespace,
-                                 final @PathParam("sourceName") String 
sourceName) {
-        source.deregisterFunction(tenant, namespace, sourceName, 
clientAppId(), clientAuthData());
-    }
-
-    @GET
-    @Path("/{tenant}/{namespace}/{sourceName}")
-    public SourceConfig getSourceInfo(final @PathParam("tenant") String tenant,
-                                      final @PathParam("namespace") String 
namespace,
-                                      final @PathParam("sourceName") String 
sourceName)
-            throws IOException {
-        return source.getSourceInfo(tenant, namespace, sourceName);
-    }
-
-    @GET
-    @ApiOperation(
-            value = "Displays the status of a Pulsar Source instance",
-            response = 
SourceStatus.SourceInstanceStatus.SourceInstanceStatusData.class
-    )
-    @ApiResponses(value = {
-            @ApiResponse(code = 400, message = "Invalid request"),
-            @ApiResponse(code = 403, message = "The requester doesn't have 
admin permissions"),
-            @ApiResponse(code = 404, message = "The source doesn't exist")
-    })
-    @Produces(MediaType.APPLICATION_JSON)
-    @Path("/{tenant}/{namespace}/{sourceName}/{instanceId}/status")
-    public SourceStatus.SourceInstanceStatus.SourceInstanceStatusData 
getSourceInstanceStatus(
-            final @PathParam("tenant") String tenant,
-            final @PathParam("namespace") String namespace,
-            final @PathParam("sourceName") String sourceName,
-            final @PathParam("instanceId") String instanceId) throws 
IOException {
-        return source.getSourceInstanceStatus(
-            tenant, namespace, sourceName, instanceId, uri.getRequestUri(), 
clientAppId(), clientAuthData());
-    }
-
-    @GET
-    @ApiOperation(
-            value = "Displays the status of a Pulsar Source running in cluster 
mode",
-            response = SourceStatus.class
-    )
-    @ApiResponses(value = {
-            @ApiResponse(code = 400, message = "Invalid request"),
-            @ApiResponse(code = 403, message = "The requester doesn't have 
admin permissions"),
-            @ApiResponse(code = 404, message = "The source doesn't exist")
-    })
-    @Produces(MediaType.APPLICATION_JSON)
-    @Path("/{tenant}/{namespace}/{sourceName}/status")
-    public SourceStatus getSourceStatus(final @PathParam("tenant") String 
tenant,
-                                    final @PathParam("namespace") String 
namespace,
-                                    final @PathParam("sourceName") String 
sourceName) throws IOException {
-        return source.getSourceStatus(tenant, namespace, sourceName, 
uri.getRequestUri(), clientAppId(), clientAuthData());
-    }
-
-    @GET
-    @Path("/{tenant}/{namespace}")
-    public List<String> listSources(final @PathParam("tenant") String tenant,
-                                    final @PathParam("namespace") String 
namespace) {
-        return source.listFunctions(tenant, namespace, clientAppId(), 
clientAuthData());
-    }
-
-    @POST
-    @ApiOperation(value = "Restart source instance", response = Void.class)
-    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid 
request"),
-            @ApiResponse(code = 404, message = "The function does not exist"),
-            @ApiResponse(code = 500, message = "Internal server error") })
-    @Path("/{tenant}/{namespace}/{sourceName}/{instanceId}/restart")
-    @Consumes(MediaType.APPLICATION_JSON)
-    public void restartSource(final @PathParam("tenant") String tenant,
-                              final @PathParam("namespace") String namespace,
-                              final @PathParam("sourceName") String sourceName,
-                              final @PathParam("instanceId") String 
instanceId) {
-        source.restartFunctionInstance(tenant, namespace, sourceName, 
instanceId, this.uri.getRequestUri(), clientAppId(), clientAuthData());
-    }
-
-    @POST
-    @ApiOperation(value = "Restart all source instances", response = 
Void.class)
-    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid 
request"),
-    @ApiResponse(code = 404, message = "The function does not exist"),
-    @ApiResponse(code = 500, message = "Internal server error") })
-    @Path("/{tenant}/{namespace}/{sourceName}/restart")
-    @Consumes(MediaType.APPLICATION_JSON)
-    public void restartSource(final @PathParam("tenant") String tenant,
-                              final @PathParam("namespace") String namespace,
-                              final @PathParam("sourceName") String 
sourceName) {
-        source.restartFunctionInstances(tenant, namespace, sourceName, 
clientAppId(), clientAuthData());
-    }
-
-    @POST
-    @ApiOperation(value = "Stop source instance", response = Void.class)
-    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid 
request"),
-            @ApiResponse(code = 404, message = "The function does not exist"),
-            @ApiResponse(code = 500, message = "Internal server error") })
-    @Path("/{tenant}/{namespace}/{sourceName}/{instanceId}/stop")
-    @Consumes(MediaType.APPLICATION_JSON)
-    public void stopSource(final @PathParam("tenant") String tenant,
-                           final @PathParam("namespace") String namespace,
-                           final @PathParam("sourceName") String sourceName,
-                           final @PathParam("instanceId") String instanceId) {
-        source.stopFunctionInstance(tenant, namespace, sourceName, instanceId, 
this.uri.getRequestUri(), clientAppId(), clientAuthData());
-    }
-
-    @POST
-    @ApiOperation(value = "Stop all source instances", response = Void.class)
-    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid 
request"),
-            @ApiResponse(code = 404, message = "The function does not exist"),
-            @ApiResponse(code = 500, message = "Internal server error") })
-    @Path("/{tenant}/{namespace}/{sourceName}/stop")
-    @Consumes(MediaType.APPLICATION_JSON)
-    public void stopSource(final @PathParam("tenant") String tenant,
-                           final @PathParam("namespace") String namespace,
-                           final @PathParam("sourceName") String sourceName) {
-        source.stopFunctionInstances(tenant, namespace, sourceName, 
clientAppId(), clientAuthData());
-    }
-
-    @POST
-    @ApiOperation(value = "Start source instance", response = Void.class)
-    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid 
request"),
-            @ApiResponse(code = 404, message = "The function does not exist"),
-            @ApiResponse(code = 500, message = "Internal server error") })
-    @Path("/{tenant}/{namespace}/{sourceName}/{instanceId}/start")
-    @Consumes(MediaType.APPLICATION_JSON)
-    public void startSource(final @PathParam("tenant") String tenant,
-                            final @PathParam("namespace") String namespace,
-                            final @PathParam("sourceName") String sourceName,
-                            final @PathParam("instanceId") String instanceId) {
-        source.startFunctionInstance(tenant, namespace, sourceName, 
instanceId, this.uri.getRequestUri(), clientAppId(), clientAuthData());
-    }
-
-    @POST
-    @ApiOperation(value = "Start all source instances", response = Void.class)
-    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid 
request"),
-            @ApiResponse(code = 404, message = "The function does not exist"),
-            @ApiResponse(code = 500, message = "Internal server error") })
-    @Path("/{tenant}/{namespace}/{sourceName}/start")
-    @Consumes(MediaType.APPLICATION_JSON)
-    public void startSource(final @PathParam("tenant") String tenant,
-                            final @PathParam("namespace") String namespace,
-                            final @PathParam("sourceName") String sourceName) {
-        source.startFunctionInstances(tenant, namespace, sourceName, 
clientAppId(), clientAuthData());
-    }
-
-    @GET
-    @Path("/builtinsources")
-    public List<ConnectorDefinition> getSourceList() {
-        List<ConnectorDefinition> connectorDefinitions = 
source.getListOfConnectors();
-        List<ConnectorDefinition> retval = new ArrayList<>();
-        for (ConnectorDefinition connectorDefinition : connectorDefinitions) {
-            if (!StringUtils.isEmpty(connectorDefinition.getSourceClass())) {
-                retval.add(connectorDefinition);
-            }
-        }
-        return retval;
-    }
+@Api(value = "/source", description = "Source admin apis", tags = "source")
+@Produces(MediaType.APPLICATION_JSON)
+@Consumes(MediaType.APPLICATION_JSON)
+@Deprecated
+/**
+ * @deprecated in favor of {@link SourcesApiV3Resource}
+ */
+public class SourceApiV3Resource extends SourcesApiV3Resource {
 }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3Resource.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourcesApiV3Resource.java
similarity index 92%
copy from 
pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3Resource.java
copy to 
pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourcesApiV3Resource.java
index 96d2082..de6df1a 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3Resource.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourcesApiV3Resource.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.functions.worker.rest.api.v3;
 
+import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
@@ -28,7 +29,7 @@ import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.io.SourceConfig;
 import org.apache.pulsar.common.policies.data.SourceStatus;
 import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
-import org.apache.pulsar.functions.worker.rest.api.SourceImpl;
+import org.apache.pulsar.functions.worker.rest.api.SourcesImpl;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 import org.glassfish.jersey.media.multipart.FormDataParam;
 
@@ -40,13 +41,16 @@ import java.util.ArrayList;
 import java.util.List;
 
 @Slf4j
-@Path("/source")
-public class SourceApiV3Resource extends FunctionApiResource {
+@Api(value = "/sources", description = "Sources admin apis", tags = "sources")
+@Produces(MediaType.APPLICATION_JSON)
+@Consumes(MediaType.APPLICATION_JSON)
+@Path("/sources")
+public class SourcesApiV3Resource extends FunctionApiResource {
 
-    protected final SourceImpl source;
+    protected final SourcesImpl source;
 
-    public SourceApiV3Resource() {
-        this.source = new SourceImpl(this);
+    public SourcesApiV3Resource() {
+        this.source = new SourcesImpl(this);
     }
 
     @POST
@@ -91,6 +95,7 @@ public class SourceApiV3Resource extends FunctionApiResource {
     }
 
     @GET
+    @Produces(MediaType.APPLICATION_JSON)
     @Path("/{tenant}/{namespace}/{sourceName}")
     public SourceConfig getSourceInfo(final @PathParam("tenant") String tenant,
                                       final @PathParam("namespace") String 
namespace,
@@ -139,6 +144,7 @@ public class SourceApiV3Resource extends 
FunctionApiResource {
     }
 
     @GET
+    @Produces(MediaType.APPLICATION_JSON)
     @Path("/{tenant}/{namespace}")
     public List<String> listSources(final @PathParam("tenant") String tenant,
                                     final @PathParam("namespace") String 
namespace) {
@@ -227,6 +233,16 @@ public class SourceApiV3Resource extends 
FunctionApiResource {
     }
 
     @GET
+    @ApiOperation(
+            value = "Fetches a list of supported Pulsar IO source connectors 
currently running in cluster mode",
+            response = List.class
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have 
admin permissions"),
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 408, message = "Request timeout")
+    })
+    @Produces(MediaType.APPLICATION_JSON)
     @Path("/builtinsources")
     public List<ConnectorDefinition> getSourceList() {
         List<ConnectorDefinition> connectorDefinitions = 
source.getListOfConnectors();
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index b05b0cd..4405c21 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -92,7 +92,7 @@ import static 
org.powermock.api.mockito.PowerMockito.mockStatic;
 import static org.testng.Assert.assertEquals;
 
 /**
- * Unit test of {@link FunctionApiV2Resource}.
+ * Unit test of {@link FunctionsApiV2Resource}.
  */
 @PrepareForTest({WorkerUtils.class, InstanceUtils.class})
 @PowerMockIgnore({ "javax.management.*", "javax.ws.*", 
"org.apache.logging.log4j.*", "org.apache.pulsar.functions.api.*" })
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
index c364906..434cfc2 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
@@ -52,7 +52,7 @@ import org.apache.pulsar.functions.worker.WorkerUtils;
 import org.apache.pulsar.functions.worker.request.RequestResult;
 import org.apache.pulsar.functions.worker.rest.RestException;
 import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
-import org.apache.pulsar.functions.worker.rest.api.v2.FunctionApiV2Resource;
+import org.apache.pulsar.functions.worker.rest.api.v2.FunctionsApiV2Resource;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
@@ -76,7 +76,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.function.Supplier;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
@@ -90,7 +89,7 @@ import static 
org.powermock.api.mockito.PowerMockito.mockStatic;
 import static org.testng.Assert.assertEquals;
 
 /**
- * Unit test of {@link FunctionApiV2Resource}.
+ * Unit test of {@link FunctionsApiV2Resource}.
  */
 @PrepareForTest({WorkerUtils.class, InstanceUtils.class})
 @PowerMockIgnore({ "javax.management.*", "javax.ws.*", 
"org.apache.logging.log4j.*", "org.apache.pulsar.functions.api.*" })
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
index 2988e3a..3b3548a 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
@@ -49,7 +49,7 @@ import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.WorkerUtils;
 import org.apache.pulsar.functions.worker.request.RequestResult;
 import org.apache.pulsar.functions.worker.rest.RestException;
-import org.apache.pulsar.functions.worker.rest.api.SinkImpl;
+import org.apache.pulsar.functions.worker.rest.api.SinksImpl;
 import org.apache.pulsar.io.cassandra.CassandraStringSink;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 import org.powermock.api.mockito.PowerMockito;
@@ -88,7 +88,7 @@ import static 
org.powermock.api.mockito.PowerMockito.mockStatic;
 import static org.testng.Assert.assertEquals;
 
 /**
- * Unit test of {@link SinkApiV3Resource}.
+ * Unit test of {@link SinksApiV3Resource}.
  */
 @PrepareForTest({WorkerUtils.class, SinkConfigUtils.class, 
ConnectorUtils.class, FunctionCommon.class, InstanceUtils.class})
 @PowerMockIgnore({ "javax.management.*", "javax.ws.*", 
"org.apache.logging.log4j.*", "org.apache.pulsar.io.*", "java.io.*" })
@@ -125,7 +125,7 @@ public class SinkApiV3ResourceTest {
     private FunctionRuntimeManager mockedFunctionRunTimeManager;
     private RuntimeFactory mockedRuntimeFactory;
     private Namespace mockedNamespace;
-    private SinkImpl resource;
+    private SinksImpl resource;
     private InputStream mockedInputStream;
     private FormDataContentDisposition mockedFormData;
     private FunctionMetaData mockedFunctionMetaData;
@@ -175,7 +175,7 @@ public class SinkApiV3ResourceTest {
             .setPulsarServiceUrl("pulsar://localhost:6650/");
         when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);
 
-        this.resource = spy(new SinkImpl(() -> mockedWorkerService));
+        this.resource = spy(new SinksImpl(() -> mockedWorkerService));
         mockStatic(InstanceUtils.class);
         
PowerMockito.when(InstanceUtils.calculateSubjectType(any())).thenReturn(FunctionDetails.ComponentType.SINK);
     }
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
index 0381af7..cd55369 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
@@ -52,7 +52,7 @@ import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.WorkerUtils;
 import org.apache.pulsar.functions.worker.request.RequestResult;
 import org.apache.pulsar.functions.worker.rest.RestException;
-import org.apache.pulsar.functions.worker.rest.api.SourceImpl;
+import org.apache.pulsar.functions.worker.rest.api.SourcesImpl;
 import org.apache.pulsar.io.twitter.TwitterFireHose;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 import org.powermock.api.mockito.PowerMockito;
@@ -87,7 +87,7 @@ import static 
org.powermock.api.mockito.PowerMockito.mockStatic;
 import static org.testng.Assert.assertEquals;
 
 /**
- * Unit test of {@link SourceApiV3Resource}.
+ * Unit test of {@link SourcesApiV3Resource}.
  */
 @PrepareForTest({WorkerUtils.class, ConnectorUtils.class, 
FunctionCommon.class, InstanceUtils.class})
 @PowerMockIgnore({ "javax.management.*", "javax.ws.*", 
"org.apache.logging.log4j.*", "org.apache.pulsar.io.*" })
@@ -121,7 +121,7 @@ public class SourceApiV3ResourceTest {
     private FunctionRuntimeManager mockedFunctionRunTimeManager;
     private RuntimeFactory mockedRuntimeFactory;
     private Namespace mockedNamespace;
-    private SourceImpl resource;
+    private SourcesImpl resource;
     private InputStream mockedInputStream;
     private FormDataContentDisposition mockedFormData;
     private FunctionMetaData mockedFunctionMetaData;
@@ -170,7 +170,7 @@ public class SourceApiV3ResourceTest {
             .setPulsarServiceUrl("pulsar://localhost:6650/");
         when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);
 
-        this.resource = spy(new SourceImpl(() -> mockedWorkerService));
+        this.resource = spy(new SourcesImpl(() -> mockedWorkerService));
         mockStatic(InstanceUtils.class);
         
PowerMockito.when(InstanceUtils.calculateSubjectType(any())).thenReturn(FunctionDetails.ComponentType.SOURCE);
     }

Reply via email to