This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ec170b0  Check for presence of namespace/tenant before creating 
function (#2947)
ec170b0 is described below

commit ec170b0270896b4dd834116b86e9786fa15eb81b
Author: Sanjeev Kulkarni <sanjee...@gmail.com>
AuthorDate: Wed Nov 7 13:41:48 2018 -0800

    Check for presence of namespace/tenant before creating function (#2947)
    
    * Check for presence of namespace/tenant before creating function
    
    * Fixed namespace semantics
---
 .../apache/pulsar/io/PulsarFunctionTlsTest.java    | 11 +++---
 .../functions/worker/rest/api/FunctionsImpl.java   | 40 ++++++++++++++++++++++
 .../rest/api/v2/FunctionApiV2ResourceTest.java     | 39 ++++++++++++++++++++-
 .../worker/rest/api/v2/SinkApiV2ResourceTest.java  | 37 ++++++++++++++++++++
 .../rest/api/v2/SourceApiV2ResourceTest.java       | 37 ++++++++++++++++++++
 5 files changed, 159 insertions(+), 5 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
index c0d81e5..fb081e5 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
@@ -29,16 +29,14 @@ import java.io.File;
 import java.lang.reflect.Method;
 import java.net.MalformedURLException;
 import java.net.URL;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 
 import org.apache.bookkeeper.test.PortManager;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.ServiceConfigurationUtils;
 import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.admin.Namespaces;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.Tenants;
 import org.apache.pulsar.client.api.Authentication;
@@ -78,6 +76,7 @@ public class PulsarFunctionTlsTest {
     String workerId;
     WorkerServer workerServer;
     PulsarAdmin functionAdmin;
+    private List<String> namespaceList = new LinkedList<>();
     private final int ZOOKEEPER_PORT = PortManager.nextFreePort();
     private final int workerServicePort = PortManager.nextFreePort();
     private final int workerServicePortTls = PortManager.nextFreePort();
@@ -123,6 +122,9 @@ public class PulsarFunctionTlsTest {
         Set<String> admins = Sets.newHashSet("superUser");
         TenantInfo tenantInfo = new TenantInfo(admins, null);
         when(tenants.getTenantInfo(any())).thenReturn(tenantInfo);
+        Namespaces namespaces = mock(Namespaces.class);
+        when(admin.namespaces()).thenReturn(namespaces);
+        when(namespaces.getNamespaces(any())).thenReturn(namespaceList);
 
         // mock: once authentication passes, function should return response: 
function already exist
         FunctionMetaDataManager dataManager = 
mock(FunctionMetaDataManager.class);
@@ -205,6 +207,7 @@ public class PulsarFunctionTlsTest {
         final String sinkTopic = "persistent://" + replNamespace + "/output";
         final String functionName = "PulsarSink-test";
         final String subscriptionName = "test-sub";
+        namespaceList.add(replNamespace);
 
         String jarFilePathUrl = String.format("%s:%s", 
org.apache.pulsar.common.functions.Utils.FILE,
                 
PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath());
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index d14d4f6..af8c050 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -153,6 +153,46 @@ public class FunctionsImpl {
             return getUnavailableResponse();
         }
 
+        if (tenant == null) {
+            return 
Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData("Tenant is not provided")).build();
+        }
+        if (namespace == null) {
+            return 
Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData("Namespace is not 
provided")).build();
+        }
+        if (componentName == null) {
+            return 
Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(componentType + " Name is not 
provided")).build();
+        }
+
+        try {
+            TenantInfo tenantInfo = 
worker().getBrokerAdmin().tenants().getTenantInfo(tenant);
+            String qualifedNamespace = tenant + "/" + namespace;
+            if 
(!worker().getBrokerAdmin().namespaces().getNamespaces(tenant).contains(qualifedNamespace))
 {
+                log.error("{}/{}/{} Namespace {} does not exist", tenant, 
namespace,
+                        componentName, namespace);
+                return 
Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
+                        .entity(new ErrorData("Namespace does not 
exist")).build();
+            }
+        } catch (PulsarAdminException.NotAuthorizedException e) {
+            log.error("{}/{}/{} Client [{}] is not admin and authorized to 
operate {} on tenant", tenant, namespace,
+                    componentName, clientRole, componentType);
+            return 
Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData("client is not authorize to perform 
operation")).build();
+
+        } catch (PulsarAdminException.NotFoundException e) {
+            log.error("{}/{}/{} Tenant {} does not exist", tenant, namespace,
+                    componentName, tenant);
+            return 
Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData("Tenant does not exist")).build();
+        } catch (PulsarAdminException e) {
+            log.error("{}/{}/{} Issues getting tenant data", tenant, namespace,
+                    componentName, e);
+            return 
Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(e.getMessage())).build();
+        }
+
         try {
             if (!isAuthorizedRole(tenant, clientRole)) {
                 log.error("{}/{}/{} Client [{}] is not admin and authorized to 
register {}", tenant, namespace,
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index c3995a8..e03d90d 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -47,7 +47,12 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.core.config.Configurator;
+import org.apache.pulsar.client.admin.Namespaces;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.Tenants;
 import org.apache.pulsar.common.policies.data.ErrorData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.functions.api.Context;
 import org.apache.pulsar.functions.api.Function;
@@ -109,6 +114,11 @@ public class FunctionApiV2ResourceTest {
     private static final int parallelism = 1;
 
     private WorkerService mockedWorkerService;
+    private PulsarAdmin mockedPulsarAdmin;
+    private Tenants mockedTenants;
+    private Namespaces mockedNamespaces;
+    private TenantInfo mockedTenantInfo;
+    private List<String> namespaceList = new LinkedList<>();
     private FunctionMetaDataManager mockedManager;
     private FunctionRuntimeManager mockedFunctionRunTimeManager;
     private RuntimeFactory mockedRuntimeFactory;
@@ -118,14 +128,19 @@ public class FunctionApiV2ResourceTest {
     private FormDataContentDisposition mockedFormData;
 
     @BeforeMethod
-    public void setup() {
+    public void setup() throws Exception {
         this.mockedManager = mock(FunctionMetaDataManager.class);
         this.mockedFunctionRunTimeManager = mock(FunctionRuntimeManager.class);
+        this.mockedTenantInfo = mock(TenantInfo.class);
         this.mockedRuntimeFactory = mock(RuntimeFactory.class);
         this.mockedInputStream = mock(InputStream.class);
         this.mockedNamespace = mock(Namespace.class);
         this.mockedFormData = mock(FormDataContentDisposition.class);
         when(mockedFormData.getFileName()).thenReturn("test");
+        this.mockedPulsarAdmin = mock(PulsarAdmin.class);
+        this.mockedTenants = mock(Tenants.class);
+        this.mockedNamespaces = mock(Namespaces.class);
+        namespaceList.add(tenant + "/" + namespace);
 
         this.mockedWorkerService = mock(WorkerService.class);
         
when(mockedWorkerService.getFunctionMetaDataManager()).thenReturn(mockedManager);
@@ -133,6 +148,11 @@ public class FunctionApiV2ResourceTest {
         
when(mockedFunctionRunTimeManager.getRuntimeFactory()).thenReturn(mockedRuntimeFactory);
         
when(mockedWorkerService.getDlogNamespace()).thenReturn(mockedNamespace);
         when(mockedWorkerService.isInitialized()).thenReturn(true);
+        
when(mockedWorkerService.getBrokerAdmin()).thenReturn(mockedPulsarAdmin);
+        when(mockedPulsarAdmin.tenants()).thenReturn(mockedTenants);
+        when(mockedPulsarAdmin.namespaces()).thenReturn(mockedNamespaces);
+        when(mockedTenants.getTenantInfo(any())).thenReturn(mockedTenantInfo);
+        when(mockedNamespaces.getNamespaces(any())).thenReturn(namespaceList);
 
         // worker config
         WorkerConfig workerConfig = new WorkerConfig()
@@ -486,6 +506,22 @@ public class FunctionApiV2ResourceTest {
     }
 
     @Test
+    public void testRegisterFunctionNonexistantNamespace() throws Exception {
+        this.namespaceList.clear();
+        Response response = registerDefaultFunction();
+        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
+        assertEquals(new ErrorData("Namespace does not exist").reason, 
((ErrorData) response.getEntity()).reason);
+    }
+
+    @Test
+    public void testRegisterFunctionNonexistantTenant() throws Exception {
+        
when(mockedTenants.getTenantInfo(any())).thenThrow(PulsarAdminException.NotFoundException.class);
+        Response response = registerDefaultFunction();
+        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
+        assertEquals(new ErrorData("Tenant does not exist").reason, 
((ErrorData) response.getEntity()).reason);
+    }
+
+    @Test
     public void testRegisterFunctionFailure() throws Exception {
         mockStatic(Utils.class);
         doNothing().when(Utils.class);
@@ -1207,6 +1243,7 @@ public class FunctionApiV2ResourceTest {
         String actualTenant = "DIFFERENT_TENANT";
         String actualNamespace = "DIFFERENT_NAMESPACE";
         String actualName = "DIFFERENT_NAME";
+        this.namespaceList.add(actualTenant + "/" + actualNamespace);
 
         String fileLocation = 
FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath();
         String filePackageUrl = "file://" + fileLocation;
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
index 93a0a74..5e710e0 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
@@ -24,7 +24,12 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.core.config.Configurator;
+import org.apache.pulsar.client.admin.Namespaces;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.Tenants;
 import org.apache.pulsar.common.policies.data.ErrorData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.proto.Function;
@@ -102,6 +107,11 @@ public class SinkApiV2ResourceTest {
     private String INVALID_JAR_FILE_PATH;
 
     private WorkerService mockedWorkerService;
+    private PulsarAdmin mockedPulsarAdmin;
+    private Tenants mockedTenants;
+    private Namespaces mockedNamespaces;
+    private TenantInfo mockedTenantInfo;
+    private List<String> namespaceList = new LinkedList<>();
     private FunctionMetaDataManager mockedManager;
     private FunctionRuntimeManager mockedFunctionRunTimeManager;
     private RuntimeFactory mockedRuntimeFactory;
@@ -119,6 +129,11 @@ public class SinkApiV2ResourceTest {
         this.mockedNamespace = mock(Namespace.class);
         this.mockedFormData = mock(FormDataContentDisposition.class);
         when(mockedFormData.getFileName()).thenReturn("test");
+        this.mockedTenantInfo = mock(TenantInfo.class);
+        this.mockedPulsarAdmin = mock(PulsarAdmin.class);
+        this.mockedTenants = mock(Tenants.class);
+        this.mockedNamespaces = mock(Namespaces.class);
+        namespaceList.add(tenant + "/" + namespace);
 
         this.mockedWorkerService = mock(WorkerService.class);
         
when(mockedWorkerService.getFunctionMetaDataManager()).thenReturn(mockedManager);
@@ -126,6 +141,11 @@ public class SinkApiV2ResourceTest {
         
when(mockedFunctionRunTimeManager.getRuntimeFactory()).thenReturn(mockedRuntimeFactory);
         
when(mockedWorkerService.getDlogNamespace()).thenReturn(mockedNamespace);
         when(mockedWorkerService.isInitialized()).thenReturn(true);
+        
when(mockedWorkerService.getBrokerAdmin()).thenReturn(mockedPulsarAdmin);
+        when(mockedPulsarAdmin.tenants()).thenReturn(mockedTenants);
+        when(mockedPulsarAdmin.namespaces()).thenReturn(mockedNamespaces);
+        when(mockedTenants.getTenantInfo(any())).thenReturn(mockedTenantInfo);
+        when(mockedNamespaces.getNamespaces(any())).thenReturn(namespaceList);
 
         URL file = 
Thread.currentThread().getContextClassLoader().getResource(JAR_FILE_NAME);
         if (file == null)  {
@@ -431,6 +451,7 @@ public class SinkApiV2ResourceTest {
         String actualTenant = "DIFFERENT_TENANT";
         String actualNamespace = "DIFFERENT_NAMESPACE";
         String actualName = "DIFFERENT_NAME";
+        this.namespaceList.add(actualTenant + "/" + actualNamespace);
 
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), 
eq(sink))).thenReturn(true);
         when(mockedManager.containsFunction(eq(actualTenant), 
eq(actualNamespace), eq(actualName))).thenReturn(false);
@@ -1064,4 +1085,20 @@ public class SinkApiV2ResourceTest {
         assertEquals(Status.OK.getStatusCode(), response.getStatus());
         assertEquals(new Gson().toJson(functions), response.getEntity());
     }
+
+    @Test
+    public void testRegisterFunctionNonexistantNamespace() throws Exception {
+        this.namespaceList.clear();
+        Response response = registerDefaultSink();
+        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
+        assertEquals(new ErrorData("Namespace does not exist").reason, 
((ErrorData) response.getEntity()).reason);
+    }
+
+    @Test
+    public void testRegisterFunctionNonexistantTenant() throws Exception {
+        
when(mockedTenants.getTenantInfo(any())).thenThrow(PulsarAdminException.NotFoundException.class);
+        Response response = registerDefaultSink();
+        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
+        assertEquals(new ErrorData("Tenant does not exist").reason, 
((ErrorData) response.getEntity()).reason);
+    }
 }
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
index b7e5d13..0c420a3 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
@@ -24,7 +24,12 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.core.config.Configurator;
+import org.apache.pulsar.client.admin.Namespaces;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.Tenants;
 import org.apache.pulsar.common.policies.data.ErrorData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.proto.Function.*;
@@ -90,6 +95,11 @@ public class SourceApiV2ResourceTest {
     private String INVALID_JAR_FILE_PATH;
 
     private WorkerService mockedWorkerService;
+    private PulsarAdmin mockedPulsarAdmin;
+    private Tenants mockedTenants;
+    private Namespaces mockedNamespaces;
+    private TenantInfo mockedTenantInfo;
+    private List<String> namespaceList = new LinkedList<>();
     private FunctionMetaDataManager mockedManager;
     private FunctionRuntimeManager mockedFunctionRunTimeManager;
     private RuntimeFactory mockedRuntimeFactory;
@@ -107,6 +117,11 @@ public class SourceApiV2ResourceTest {
         this.mockedNamespace = mock(Namespace.class);
         this.mockedFormData = mock(FormDataContentDisposition.class);
         when(mockedFormData.getFileName()).thenReturn("test");
+        this.mockedTenantInfo = mock(TenantInfo.class);
+        this.mockedPulsarAdmin = mock(PulsarAdmin.class);
+        this.mockedTenants = mock(Tenants.class);
+        this.mockedNamespaces = mock(Namespaces.class);
+        namespaceList.add(tenant + "/" + namespace);
 
         this.mockedWorkerService = mock(WorkerService.class);
         
when(mockedWorkerService.getFunctionMetaDataManager()).thenReturn(mockedManager);
@@ -114,6 +129,11 @@ public class SourceApiV2ResourceTest {
         
when(mockedFunctionRunTimeManager.getRuntimeFactory()).thenReturn(mockedRuntimeFactory);
         
when(mockedWorkerService.getDlogNamespace()).thenReturn(mockedNamespace);
         when(mockedWorkerService.isInitialized()).thenReturn(true);
+        
when(mockedWorkerService.getBrokerAdmin()).thenReturn(mockedPulsarAdmin);
+        when(mockedPulsarAdmin.tenants()).thenReturn(mockedTenants);
+        when(mockedPulsarAdmin.namespaces()).thenReturn(mockedNamespaces);
+        when(mockedTenants.getTenantInfo(any())).thenReturn(mockedTenantInfo);
+        when(mockedNamespaces.getNamespaces(any())).thenReturn(namespaceList);
 
         URL file = 
Thread.currentThread().getContextClassLoader().getResource(JAR_FILE_NAME);
         if (file == null)  {
@@ -402,6 +422,7 @@ public class SourceApiV2ResourceTest {
         String actualTenant = "DIFFERENT_TENANT";
         String actualNamespace = "DIFFERENT_NAMESPACE";
         String actualName = "DIFFERENT_NAME";
+        this.namespaceList.add(actualTenant + "/" + actualNamespace);
 
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), 
eq(source))).thenReturn(true);
         when(mockedManager.containsFunction(eq(actualTenant), 
eq(actualNamespace), eq(actualName))).thenReturn(false);
@@ -1086,4 +1107,20 @@ public class SourceApiV2ResourceTest {
         assertEquals(Status.OK.getStatusCode(), response.getStatus());
         assertEquals(new Gson().toJson(functions), response.getEntity());
     }
+
+    @Test
+    public void testRegisterFunctionNonexistantNamespace() throws Exception {
+        this.namespaceList.clear();
+        Response response = registerDefaultSource();
+        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
+        assertEquals(new ErrorData("Namespace does not exist").reason, 
((ErrorData) response.getEntity()).reason);
+    }
+
+    @Test
+    public void testRegisterFunctionNonexistantTenant() throws Exception {
+        
when(mockedTenants.getTenantInfo(any())).thenThrow(PulsarAdminException.NotFoundException.class);
+        Response response = registerDefaultSource();
+        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
+        assertEquals(new ErrorData("Tenant does not exist").reason, 
((ErrorData) response.getEntity()).reason);
+    }
 }

Reply via email to