srkukarni closed pull request #2947: Check for presence of namespace/tenant 
before creating function
URL: https://github.com/apache/pulsar/pull/2947
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
index c0d81e5c80..fb081e5bf7 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
@@ -29,16 +29,14 @@
 import java.lang.reflect.Method;
 import java.net.MalformedURLException;
 import java.net.URL;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 
 import org.apache.bookkeeper.test.PortManager;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.ServiceConfigurationUtils;
 import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.admin.Namespaces;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.Tenants;
 import org.apache.pulsar.client.api.Authentication;
@@ -78,6 +76,7 @@
     String workerId;
     WorkerServer workerServer;
     PulsarAdmin functionAdmin;
+    private List<String> namespaceList = new LinkedList<>();
     private final int ZOOKEEPER_PORT = PortManager.nextFreePort();
     private final int workerServicePort = PortManager.nextFreePort();
     private final int workerServicePortTls = PortManager.nextFreePort();
@@ -123,6 +122,9 @@ void setup(Method method) throws Exception {
         Set<String> admins = Sets.newHashSet("superUser");
         TenantInfo tenantInfo = new TenantInfo(admins, null);
         when(tenants.getTenantInfo(any())).thenReturn(tenantInfo);
+        Namespaces namespaces = mock(Namespaces.class);
+        when(admin.namespaces()).thenReturn(namespaces);
+        when(namespaces.getNamespaces(any())).thenReturn(namespaceList);
 
         // mock: once authentication passes, function should return response: 
function already exist
         FunctionMetaDataManager dataManager = 
mock(FunctionMetaDataManager.class);
@@ -205,6 +207,7 @@ public void testAuthorization() throws Exception {
         final String sinkTopic = "persistent://" + replNamespace + "/output";
         final String functionName = "PulsarSink-test";
         final String subscriptionName = "test-sub";
+        namespaceList.add(replNamespace);
 
         String jarFilePathUrl = String.format("%s:%s", 
org.apache.pulsar.common.functions.Utils.FILE,
                 
PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath());
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index d14d4f696d..af8c050b2a 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -153,6 +153,46 @@ public Response registerFunction(final String tenant, 
final String namespace, fi
             return getUnavailableResponse();
         }
 
+        if (tenant == null) {
+            return 
Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData("Tenant is not provided")).build();
+        }
+        if (namespace == null) {
+            return 
Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData("Namespace is not 
provided")).build();
+        }
+        if (componentName == null) {
+            return 
Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(componentType + " Name is not 
provided")).build();
+        }
+
+        try {
+            TenantInfo tenantInfo = 
worker().getBrokerAdmin().tenants().getTenantInfo(tenant);
+            String qualifedNamespace = tenant + "/" + namespace;
+            if 
(!worker().getBrokerAdmin().namespaces().getNamespaces(tenant).contains(qualifedNamespace))
 {
+                log.error("{}/{}/{} Namespace {} does not exist", tenant, 
namespace,
+                        componentName, namespace);
+                return 
Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
+                        .entity(new ErrorData("Namespace does not 
exist")).build();
+            }
+        } catch (PulsarAdminException.NotAuthorizedException e) {
+            log.error("{}/{}/{} Client [{}] is not admin and authorized to 
operate {} on tenant", tenant, namespace,
+                    componentName, clientRole, componentType);
+            return 
Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData("client is not authorize to perform 
operation")).build();
+
+        } catch (PulsarAdminException.NotFoundException e) {
+            log.error("{}/{}/{} Tenant {} does not exist", tenant, namespace,
+                    componentName, tenant);
+            return 
Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData("Tenant does not exist")).build();
+        } catch (PulsarAdminException e) {
+            log.error("{}/{}/{} Issues getting tenant data", tenant, namespace,
+                    componentName, e);
+            return 
Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(e.getMessage())).build();
+        }
+
         try {
             if (!isAuthorizedRole(tenant, clientRole)) {
                 log.error("{}/{}/{} Client [{}] is not admin and authorized to 
register {}", tenant, namespace,
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index c3995a83d9..e03d90d3f1 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -47,7 +47,12 @@
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.core.config.Configurator;
+import org.apache.pulsar.client.admin.Namespaces;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.Tenants;
 import org.apache.pulsar.common.policies.data.ErrorData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.functions.api.Context;
 import org.apache.pulsar.functions.api.Function;
@@ -109,6 +114,11 @@ public String process(String input, Context context) {
     private static final int parallelism = 1;
 
     private WorkerService mockedWorkerService;
+    private PulsarAdmin mockedPulsarAdmin;
+    private Tenants mockedTenants;
+    private Namespaces mockedNamespaces;
+    private TenantInfo mockedTenantInfo;
+    private List<String> namespaceList = new LinkedList<>();
     private FunctionMetaDataManager mockedManager;
     private FunctionRuntimeManager mockedFunctionRunTimeManager;
     private RuntimeFactory mockedRuntimeFactory;
@@ -118,14 +128,19 @@ public String process(String input, Context context) {
     private FormDataContentDisposition mockedFormData;
 
     @BeforeMethod
-    public void setup() {
+    public void setup() throws Exception {
         this.mockedManager = mock(FunctionMetaDataManager.class);
         this.mockedFunctionRunTimeManager = mock(FunctionRuntimeManager.class);
+        this.mockedTenantInfo = mock(TenantInfo.class);
         this.mockedRuntimeFactory = mock(RuntimeFactory.class);
         this.mockedInputStream = mock(InputStream.class);
         this.mockedNamespace = mock(Namespace.class);
         this.mockedFormData = mock(FormDataContentDisposition.class);
         when(mockedFormData.getFileName()).thenReturn("test");
+        this.mockedPulsarAdmin = mock(PulsarAdmin.class);
+        this.mockedTenants = mock(Tenants.class);
+        this.mockedNamespaces = mock(Namespaces.class);
+        namespaceList.add(tenant + "/" + namespace);
 
         this.mockedWorkerService = mock(WorkerService.class);
         
when(mockedWorkerService.getFunctionMetaDataManager()).thenReturn(mockedManager);
@@ -133,6 +148,11 @@ public void setup() {
         
when(mockedFunctionRunTimeManager.getRuntimeFactory()).thenReturn(mockedRuntimeFactory);
         
when(mockedWorkerService.getDlogNamespace()).thenReturn(mockedNamespace);
         when(mockedWorkerService.isInitialized()).thenReturn(true);
+        
when(mockedWorkerService.getBrokerAdmin()).thenReturn(mockedPulsarAdmin);
+        when(mockedPulsarAdmin.tenants()).thenReturn(mockedTenants);
+        when(mockedPulsarAdmin.namespaces()).thenReturn(mockedNamespaces);
+        when(mockedTenants.getTenantInfo(any())).thenReturn(mockedTenantInfo);
+        when(mockedNamespaces.getNamespaces(any())).thenReturn(namespaceList);
 
         // worker config
         WorkerConfig workerConfig = new WorkerConfig()
@@ -485,6 +505,22 @@ public void testRegisterFunctionSuccess() throws Exception 
{
         assertEquals(Status.OK.getStatusCode(), response.getStatus());
     }
 
+    @Test
+    public void testRegisterFunctionNonexistantNamespace() throws Exception {
+        this.namespaceList.clear();
+        Response response = registerDefaultFunction();
+        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
+        assertEquals(new ErrorData("Namespace does not exist").reason, 
((ErrorData) response.getEntity()).reason);
+    }
+
+    @Test
+    public void testRegisterFunctionNonexistantTenant() throws Exception {
+        
when(mockedTenants.getTenantInfo(any())).thenThrow(PulsarAdminException.NotFoundException.class);
+        Response response = registerDefaultFunction();
+        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
+        assertEquals(new ErrorData("Tenant does not exist").reason, 
((ErrorData) response.getEntity()).reason);
+    }
+
     @Test
     public void testRegisterFunctionFailure() throws Exception {
         mockStatic(Utils.class);
@@ -1207,6 +1243,7 @@ public void testRegisterFunctionWithConflictingFields() 
throws IOException {
         String actualTenant = "DIFFERENT_TENANT";
         String actualNamespace = "DIFFERENT_NAMESPACE";
         String actualName = "DIFFERENT_NAME";
+        this.namespaceList.add(actualTenant + "/" + actualNamespace);
 
         String fileLocation = 
FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath();
         String filePackageUrl = "file://" + fileLocation;
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
index 93a0a747c7..5e710e03d5 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
@@ -24,7 +24,12 @@
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.core.config.Configurator;
+import org.apache.pulsar.client.admin.Namespaces;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.Tenants;
 import org.apache.pulsar.common.policies.data.ErrorData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.proto.Function;
@@ -102,6 +107,11 @@ public IObjectFactory getObjectFactory() {
     private String INVALID_JAR_FILE_PATH;
 
     private WorkerService mockedWorkerService;
+    private PulsarAdmin mockedPulsarAdmin;
+    private Tenants mockedTenants;
+    private Namespaces mockedNamespaces;
+    private TenantInfo mockedTenantInfo;
+    private List<String> namespaceList = new LinkedList<>();
     private FunctionMetaDataManager mockedManager;
     private FunctionRuntimeManager mockedFunctionRunTimeManager;
     private RuntimeFactory mockedRuntimeFactory;
@@ -119,6 +129,11 @@ public void setup() throws Exception {
         this.mockedNamespace = mock(Namespace.class);
         this.mockedFormData = mock(FormDataContentDisposition.class);
         when(mockedFormData.getFileName()).thenReturn("test");
+        this.mockedTenantInfo = mock(TenantInfo.class);
+        this.mockedPulsarAdmin = mock(PulsarAdmin.class);
+        this.mockedTenants = mock(Tenants.class);
+        this.mockedNamespaces = mock(Namespaces.class);
+        namespaceList.add(tenant + "/" + namespace);
 
         this.mockedWorkerService = mock(WorkerService.class);
         
when(mockedWorkerService.getFunctionMetaDataManager()).thenReturn(mockedManager);
@@ -126,6 +141,11 @@ public void setup() throws Exception {
         
when(mockedFunctionRunTimeManager.getRuntimeFactory()).thenReturn(mockedRuntimeFactory);
         
when(mockedWorkerService.getDlogNamespace()).thenReturn(mockedNamespace);
         when(mockedWorkerService.isInitialized()).thenReturn(true);
+        
when(mockedWorkerService.getBrokerAdmin()).thenReturn(mockedPulsarAdmin);
+        when(mockedPulsarAdmin.tenants()).thenReturn(mockedTenants);
+        when(mockedPulsarAdmin.namespaces()).thenReturn(mockedNamespaces);
+        when(mockedTenants.getTenantInfo(any())).thenReturn(mockedTenantInfo);
+        when(mockedNamespaces.getNamespaces(any())).thenReturn(namespaceList);
 
         URL file = 
Thread.currentThread().getContextClassLoader().getResource(JAR_FILE_NAME);
         if (file == null)  {
@@ -431,6 +451,7 @@ public void testRegisterSinkConflictingFields() throws 
Exception {
         String actualTenant = "DIFFERENT_TENANT";
         String actualNamespace = "DIFFERENT_NAMESPACE";
         String actualName = "DIFFERENT_NAME";
+        this.namespaceList.add(actualTenant + "/" + actualNamespace);
 
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), 
eq(sink))).thenReturn(true);
         when(mockedManager.containsFunction(eq(actualTenant), 
eq(actualNamespace), eq(actualName))).thenReturn(false);
@@ -1064,4 +1085,20 @@ public void testOnlyGetSinks() throws Exception {
         assertEquals(Status.OK.getStatusCode(), response.getStatus());
         assertEquals(new Gson().toJson(functions), response.getEntity());
     }
+
+    @Test
+    public void testRegisterFunctionNonexistantNamespace() throws Exception {
+        this.namespaceList.clear();
+        Response response = registerDefaultSink();
+        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
+        assertEquals(new ErrorData("Namespace does not exist").reason, 
((ErrorData) response.getEntity()).reason);
+    }
+
+    @Test
+    public void testRegisterFunctionNonexistantTenant() throws Exception {
+        
when(mockedTenants.getTenantInfo(any())).thenThrow(PulsarAdminException.NotFoundException.class);
+        Response response = registerDefaultSink();
+        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
+        assertEquals(new ErrorData("Tenant does not exist").reason, 
((ErrorData) response.getEntity()).reason);
+    }
 }
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
index b7e5d130a0..0c420a3e2f 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
@@ -24,7 +24,12 @@
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.core.config.Configurator;
+import org.apache.pulsar.client.admin.Namespaces;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.Tenants;
 import org.apache.pulsar.common.policies.data.ErrorData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.proto.Function.*;
@@ -90,6 +95,11 @@ public IObjectFactory getObjectFactory() {
     private String INVALID_JAR_FILE_PATH;
 
     private WorkerService mockedWorkerService;
+    private PulsarAdmin mockedPulsarAdmin;
+    private Tenants mockedTenants;
+    private Namespaces mockedNamespaces;
+    private TenantInfo mockedTenantInfo;
+    private List<String> namespaceList = new LinkedList<>();
     private FunctionMetaDataManager mockedManager;
     private FunctionRuntimeManager mockedFunctionRunTimeManager;
     private RuntimeFactory mockedRuntimeFactory;
@@ -107,6 +117,11 @@ public void setup() throws Exception {
         this.mockedNamespace = mock(Namespace.class);
         this.mockedFormData = mock(FormDataContentDisposition.class);
         when(mockedFormData.getFileName()).thenReturn("test");
+        this.mockedTenantInfo = mock(TenantInfo.class);
+        this.mockedPulsarAdmin = mock(PulsarAdmin.class);
+        this.mockedTenants = mock(Tenants.class);
+        this.mockedNamespaces = mock(Namespaces.class);
+        namespaceList.add(tenant + "/" + namespace);
 
         this.mockedWorkerService = mock(WorkerService.class);
         
when(mockedWorkerService.getFunctionMetaDataManager()).thenReturn(mockedManager);
@@ -114,6 +129,11 @@ public void setup() throws Exception {
         
when(mockedFunctionRunTimeManager.getRuntimeFactory()).thenReturn(mockedRuntimeFactory);
         
when(mockedWorkerService.getDlogNamespace()).thenReturn(mockedNamespace);
         when(mockedWorkerService.isInitialized()).thenReturn(true);
+        
when(mockedWorkerService.getBrokerAdmin()).thenReturn(mockedPulsarAdmin);
+        when(mockedPulsarAdmin.tenants()).thenReturn(mockedTenants);
+        when(mockedPulsarAdmin.namespaces()).thenReturn(mockedNamespaces);
+        when(mockedTenants.getTenantInfo(any())).thenReturn(mockedTenantInfo);
+        when(mockedNamespaces.getNamespaces(any())).thenReturn(namespaceList);
 
         URL file = 
Thread.currentThread().getContextClassLoader().getResource(JAR_FILE_NAME);
         if (file == null)  {
@@ -402,6 +422,7 @@ public void testRegisterSourceConflictingFields() throws 
Exception {
         String actualTenant = "DIFFERENT_TENANT";
         String actualNamespace = "DIFFERENT_NAMESPACE";
         String actualName = "DIFFERENT_NAME";
+        this.namespaceList.add(actualTenant + "/" + actualNamespace);
 
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), 
eq(source))).thenReturn(true);
         when(mockedManager.containsFunction(eq(actualTenant), 
eq(actualNamespace), eq(actualName))).thenReturn(false);
@@ -1086,4 +1107,20 @@ public void testOnlyGetSources() throws Exception {
         assertEquals(Status.OK.getStatusCode(), response.getStatus());
         assertEquals(new Gson().toJson(functions), response.getEntity());
     }
+
+    @Test
+    public void testRegisterFunctionNonexistantNamespace() throws Exception {
+        this.namespaceList.clear();
+        Response response = registerDefaultSource();
+        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
+        assertEquals(new ErrorData("Namespace does not exist").reason, 
((ErrorData) response.getEntity()).reason);
+    }
+
+    @Test
+    public void testRegisterFunctionNonexistantTenant() throws Exception {
+        
when(mockedTenants.getTenantInfo(any())).thenThrow(PulsarAdminException.NotFoundException.class);
+        Response response = registerDefaultSource();
+        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
+        assertEquals(new ErrorData("Tenant does not exist").reason, 
((ErrorData) response.getEntity()).reason);
+    }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to