This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new ec170b0 Check for presence of namespace/tenant before creating function (#2947) ec170b0 is described below commit ec170b0270896b4dd834116b86e9786fa15eb81b Author: Sanjeev Kulkarni <sanjee...@gmail.com> AuthorDate: Wed Nov 7 13:41:48 2018 -0800 Check for presence of namespace/tenant before creating function (#2947) * Check for presence of namespace/tenant before creating function * Fixed namespace semantics --- .../apache/pulsar/io/PulsarFunctionTlsTest.java | 11 +++--- .../functions/worker/rest/api/FunctionsImpl.java | 40 ++++++++++++++++++++++ .../rest/api/v2/FunctionApiV2ResourceTest.java | 39 ++++++++++++++++++++- .../worker/rest/api/v2/SinkApiV2ResourceTest.java | 37 ++++++++++++++++++++ .../rest/api/v2/SourceApiV2ResourceTest.java | 37 ++++++++++++++++++++ 5 files changed, 159 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java index c0d81e5..fb081e5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java @@ -29,16 +29,14 @@ import java.io.File; import java.lang.reflect.Method; import java.net.MalformedURLException; import java.net.URL; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; +import java.util.*; import org.apache.bookkeeper.test.PortManager; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.ServiceConfigurationUtils; import org.apache.pulsar.broker.authentication.AuthenticationProviderTls; import org.apache.pulsar.broker.authentication.AuthenticationService; +import org.apache.pulsar.client.admin.Namespaces; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.Tenants; import org.apache.pulsar.client.api.Authentication; @@ -78,6 +76,7 @@ public class PulsarFunctionTlsTest { String workerId; WorkerServer workerServer; PulsarAdmin functionAdmin; + private List<String> namespaceList = new LinkedList<>(); private final int ZOOKEEPER_PORT = PortManager.nextFreePort(); private final int workerServicePort = PortManager.nextFreePort(); private final int workerServicePortTls = PortManager.nextFreePort(); @@ -123,6 +122,9 @@ public class PulsarFunctionTlsTest { Set<String> admins = Sets.newHashSet("superUser"); TenantInfo tenantInfo = new TenantInfo(admins, null); when(tenants.getTenantInfo(any())).thenReturn(tenantInfo); + Namespaces namespaces = mock(Namespaces.class); + when(admin.namespaces()).thenReturn(namespaces); + when(namespaces.getNamespaces(any())).thenReturn(namespaceList); // mock: once authentication passes, function should return response: function already exist FunctionMetaDataManager dataManager = mock(FunctionMetaDataManager.class); @@ -205,6 +207,7 @@ public class PulsarFunctionTlsTest { final String sinkTopic = "persistent://" + replNamespace + "/output"; final String functionName = "PulsarSink-test"; final String subscriptionName = "test-sub"; + namespaceList.add(replNamespace); String jarFilePathUrl = String.format("%s:%s", org.apache.pulsar.common.functions.Utils.FILE, PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath()); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java index d14d4f6..af8c050 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java @@ -153,6 +153,46 @@ public class FunctionsImpl { return getUnavailableResponse(); } + if (tenant == null) { + return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData("Tenant is not provided")).build(); + } + if (namespace == null) { + return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData("Namespace is not provided")).build(); + } + if (componentName == null) { + return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData(componentType + " Name is not provided")).build(); + } + + try { + TenantInfo tenantInfo = worker().getBrokerAdmin().tenants().getTenantInfo(tenant); + String qualifedNamespace = tenant + "/" + namespace; + if (!worker().getBrokerAdmin().namespaces().getNamespaces(tenant).contains(qualifedNamespace)) { + log.error("{}/{}/{} Namespace {} does not exist", tenant, namespace, + componentName, namespace); + return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData("Namespace does not exist")).build(); + } + } catch (PulsarAdminException.NotAuthorizedException e) { + log.error("{}/{}/{} Client [{}] is not admin and authorized to operate {} on tenant", tenant, namespace, + componentName, clientRole, componentType); + return Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData("client is not authorize to perform operation")).build(); + + } catch (PulsarAdminException.NotFoundException e) { + log.error("{}/{}/{} Tenant {} does not exist", tenant, namespace, + componentName, tenant); + return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData("Tenant does not exist")).build(); + } catch (PulsarAdminException e) { + log.error("{}/{}/{} Issues getting tenant data", tenant, namespace, + componentName, e); + return Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData(e.getMessage())).build(); + } + try { if (!isAuthorizedRole(tenant, clientRole)) { log.error("{}/{}/{} Client [{}] is not admin and authorized to register {}", tenant, namespace, diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java index c3995a8..e03d90d 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java @@ -47,7 +47,12 @@ import lombok.extern.slf4j.Slf4j; import org.apache.distributedlog.api.namespace.Namespace; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.core.config.Configurator; +import org.apache.pulsar.client.admin.Namespaces; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.admin.Tenants; import org.apache.pulsar.common.policies.data.ErrorData; +import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.functions.api.Context; import org.apache.pulsar.functions.api.Function; @@ -109,6 +114,11 @@ public class FunctionApiV2ResourceTest { private static final int parallelism = 1; private WorkerService mockedWorkerService; + private PulsarAdmin mockedPulsarAdmin; + private Tenants mockedTenants; + private Namespaces mockedNamespaces; + private TenantInfo mockedTenantInfo; + private List<String> namespaceList = new LinkedList<>(); private FunctionMetaDataManager mockedManager; private FunctionRuntimeManager mockedFunctionRunTimeManager; private RuntimeFactory mockedRuntimeFactory; @@ -118,14 +128,19 @@ public class FunctionApiV2ResourceTest { private FormDataContentDisposition mockedFormData; @BeforeMethod - public void setup() { + public void setup() throws Exception { this.mockedManager = mock(FunctionMetaDataManager.class); this.mockedFunctionRunTimeManager = mock(FunctionRuntimeManager.class); + this.mockedTenantInfo = mock(TenantInfo.class); this.mockedRuntimeFactory = mock(RuntimeFactory.class); this.mockedInputStream = mock(InputStream.class); this.mockedNamespace = mock(Namespace.class); this.mockedFormData = mock(FormDataContentDisposition.class); when(mockedFormData.getFileName()).thenReturn("test"); + this.mockedPulsarAdmin = mock(PulsarAdmin.class); + this.mockedTenants = mock(Tenants.class); + this.mockedNamespaces = mock(Namespaces.class); + namespaceList.add(tenant + "/" + namespace); this.mockedWorkerService = mock(WorkerService.class); when(mockedWorkerService.getFunctionMetaDataManager()).thenReturn(mockedManager); @@ -133,6 +148,11 @@ public class FunctionApiV2ResourceTest { when(mockedFunctionRunTimeManager.getRuntimeFactory()).thenReturn(mockedRuntimeFactory); when(mockedWorkerService.getDlogNamespace()).thenReturn(mockedNamespace); when(mockedWorkerService.isInitialized()).thenReturn(true); + when(mockedWorkerService.getBrokerAdmin()).thenReturn(mockedPulsarAdmin); + when(mockedPulsarAdmin.tenants()).thenReturn(mockedTenants); + when(mockedPulsarAdmin.namespaces()).thenReturn(mockedNamespaces); + when(mockedTenants.getTenantInfo(any())).thenReturn(mockedTenantInfo); + when(mockedNamespaces.getNamespaces(any())).thenReturn(namespaceList); // worker config WorkerConfig workerConfig = new WorkerConfig() @@ -486,6 +506,22 @@ public class FunctionApiV2ResourceTest { } @Test + public void testRegisterFunctionNonexistantNamespace() throws Exception { + this.namespaceList.clear(); + Response response = registerDefaultFunction(); + assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); + assertEquals(new ErrorData("Namespace does not exist").reason, ((ErrorData) response.getEntity()).reason); + } + + @Test + public void testRegisterFunctionNonexistantTenant() throws Exception { + when(mockedTenants.getTenantInfo(any())).thenThrow(PulsarAdminException.NotFoundException.class); + Response response = registerDefaultFunction(); + assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); + assertEquals(new ErrorData("Tenant does not exist").reason, ((ErrorData) response.getEntity()).reason); + } + + @Test public void testRegisterFunctionFailure() throws Exception { mockStatic(Utils.class); doNothing().when(Utils.class); @@ -1207,6 +1243,7 @@ public class FunctionApiV2ResourceTest { String actualTenant = "DIFFERENT_TENANT"; String actualNamespace = "DIFFERENT_NAMESPACE"; String actualName = "DIFFERENT_NAME"; + this.namespaceList.add(actualTenant + "/" + actualNamespace); String fileLocation = FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath(); String filePackageUrl = "file://" + fileLocation; diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java index 93a0a74..5e710e0 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java @@ -24,7 +24,12 @@ import lombok.extern.slf4j.Slf4j; import org.apache.distributedlog.api.namespace.Namespace; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.core.config.Configurator; +import org.apache.pulsar.client.admin.Namespaces; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.admin.Tenants; import org.apache.pulsar.common.policies.data.ErrorData; +import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.functions.api.utils.IdentityFunction; import org.apache.pulsar.functions.proto.Function; @@ -102,6 +107,11 @@ public class SinkApiV2ResourceTest { private String INVALID_JAR_FILE_PATH; private WorkerService mockedWorkerService; + private PulsarAdmin mockedPulsarAdmin; + private Tenants mockedTenants; + private Namespaces mockedNamespaces; + private TenantInfo mockedTenantInfo; + private List<String> namespaceList = new LinkedList<>(); private FunctionMetaDataManager mockedManager; private FunctionRuntimeManager mockedFunctionRunTimeManager; private RuntimeFactory mockedRuntimeFactory; @@ -119,6 +129,11 @@ public class SinkApiV2ResourceTest { this.mockedNamespace = mock(Namespace.class); this.mockedFormData = mock(FormDataContentDisposition.class); when(mockedFormData.getFileName()).thenReturn("test"); + this.mockedTenantInfo = mock(TenantInfo.class); + this.mockedPulsarAdmin = mock(PulsarAdmin.class); + this.mockedTenants = mock(Tenants.class); + this.mockedNamespaces = mock(Namespaces.class); + namespaceList.add(tenant + "/" + namespace); this.mockedWorkerService = mock(WorkerService.class); when(mockedWorkerService.getFunctionMetaDataManager()).thenReturn(mockedManager); @@ -126,6 +141,11 @@ public class SinkApiV2ResourceTest { when(mockedFunctionRunTimeManager.getRuntimeFactory()).thenReturn(mockedRuntimeFactory); when(mockedWorkerService.getDlogNamespace()).thenReturn(mockedNamespace); when(mockedWorkerService.isInitialized()).thenReturn(true); + when(mockedWorkerService.getBrokerAdmin()).thenReturn(mockedPulsarAdmin); + when(mockedPulsarAdmin.tenants()).thenReturn(mockedTenants); + when(mockedPulsarAdmin.namespaces()).thenReturn(mockedNamespaces); + when(mockedTenants.getTenantInfo(any())).thenReturn(mockedTenantInfo); + when(mockedNamespaces.getNamespaces(any())).thenReturn(namespaceList); URL file = Thread.currentThread().getContextClassLoader().getResource(JAR_FILE_NAME); if (file == null) { @@ -431,6 +451,7 @@ public class SinkApiV2ResourceTest { String actualTenant = "DIFFERENT_TENANT"; String actualNamespace = "DIFFERENT_NAMESPACE"; String actualName = "DIFFERENT_NAME"; + this.namespaceList.add(actualTenant + "/" + actualNamespace); when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true); when(mockedManager.containsFunction(eq(actualTenant), eq(actualNamespace), eq(actualName))).thenReturn(false); @@ -1064,4 +1085,20 @@ public class SinkApiV2ResourceTest { assertEquals(Status.OK.getStatusCode(), response.getStatus()); assertEquals(new Gson().toJson(functions), response.getEntity()); } + + @Test + public void testRegisterFunctionNonexistantNamespace() throws Exception { + this.namespaceList.clear(); + Response response = registerDefaultSink(); + assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); + assertEquals(new ErrorData("Namespace does not exist").reason, ((ErrorData) response.getEntity()).reason); + } + + @Test + public void testRegisterFunctionNonexistantTenant() throws Exception { + when(mockedTenants.getTenantInfo(any())).thenThrow(PulsarAdminException.NotFoundException.class); + Response response = registerDefaultSink(); + assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); + assertEquals(new ErrorData("Tenant does not exist").reason, ((ErrorData) response.getEntity()).reason); + } } diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java index b7e5d13..0c420a3 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java @@ -24,7 +24,12 @@ import lombok.extern.slf4j.Slf4j; import org.apache.distributedlog.api.namespace.Namespace; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.core.config.Configurator; +import org.apache.pulsar.client.admin.Namespaces; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.admin.Tenants; import org.apache.pulsar.common.policies.data.ErrorData; +import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.functions.api.utils.IdentityFunction; import org.apache.pulsar.functions.proto.Function.*; @@ -90,6 +95,11 @@ public class SourceApiV2ResourceTest { private String INVALID_JAR_FILE_PATH; private WorkerService mockedWorkerService; + private PulsarAdmin mockedPulsarAdmin; + private Tenants mockedTenants; + private Namespaces mockedNamespaces; + private TenantInfo mockedTenantInfo; + private List<String> namespaceList = new LinkedList<>(); private FunctionMetaDataManager mockedManager; private FunctionRuntimeManager mockedFunctionRunTimeManager; private RuntimeFactory mockedRuntimeFactory; @@ -107,6 +117,11 @@ public class SourceApiV2ResourceTest { this.mockedNamespace = mock(Namespace.class); this.mockedFormData = mock(FormDataContentDisposition.class); when(mockedFormData.getFileName()).thenReturn("test"); + this.mockedTenantInfo = mock(TenantInfo.class); + this.mockedPulsarAdmin = mock(PulsarAdmin.class); + this.mockedTenants = mock(Tenants.class); + this.mockedNamespaces = mock(Namespaces.class); + namespaceList.add(tenant + "/" + namespace); this.mockedWorkerService = mock(WorkerService.class); when(mockedWorkerService.getFunctionMetaDataManager()).thenReturn(mockedManager); @@ -114,6 +129,11 @@ public class SourceApiV2ResourceTest { when(mockedFunctionRunTimeManager.getRuntimeFactory()).thenReturn(mockedRuntimeFactory); when(mockedWorkerService.getDlogNamespace()).thenReturn(mockedNamespace); when(mockedWorkerService.isInitialized()).thenReturn(true); + when(mockedWorkerService.getBrokerAdmin()).thenReturn(mockedPulsarAdmin); + when(mockedPulsarAdmin.tenants()).thenReturn(mockedTenants); + when(mockedPulsarAdmin.namespaces()).thenReturn(mockedNamespaces); + when(mockedTenants.getTenantInfo(any())).thenReturn(mockedTenantInfo); + when(mockedNamespaces.getNamespaces(any())).thenReturn(namespaceList); URL file = Thread.currentThread().getContextClassLoader().getResource(JAR_FILE_NAME); if (file == null) { @@ -402,6 +422,7 @@ public class SourceApiV2ResourceTest { String actualTenant = "DIFFERENT_TENANT"; String actualNamespace = "DIFFERENT_NAMESPACE"; String actualName = "DIFFERENT_NAME"; + this.namespaceList.add(actualTenant + "/" + actualNamespace); when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true); when(mockedManager.containsFunction(eq(actualTenant), eq(actualNamespace), eq(actualName))).thenReturn(false); @@ -1086,4 +1107,20 @@ public class SourceApiV2ResourceTest { assertEquals(Status.OK.getStatusCode(), response.getStatus()); assertEquals(new Gson().toJson(functions), response.getEntity()); } + + @Test + public void testRegisterFunctionNonexistantNamespace() throws Exception { + this.namespaceList.clear(); + Response response = registerDefaultSource(); + assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); + assertEquals(new ErrorData("Namespace does not exist").reason, ((ErrorData) response.getEntity()).reason); + } + + @Test + public void testRegisterFunctionNonexistantTenant() throws Exception { + when(mockedTenants.getTenantInfo(any())).thenThrow(PulsarAdminException.NotFoundException.class); + Response response = registerDefaultSource(); + assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus()); + assertEquals(new ErrorData("Tenant does not exist").reason, ((ErrorData) response.getEntity()).reason); + } }