This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 67a4be681aee9a74da38c388da3721e1140cf9c8 Author: guodongyang <1174533...@qq.com> AuthorDate: Wed Mar 3 12:13:18 2021 +0800 [broker] change getWorkerService method to throw UnsupportedOperationException (#9738) Fix #9633 When we set `functionsWorkerEnabled=false` in `broker.conf`, the broker doesn't support the management functions of `functionWorker`. But when we try to apply those methods, the broker throws a NullPointerException instead of a more user-friendly error that explains the problem. (cherry picked from commit baceabd482481f553d412a6f6946bd7f11a3abe2) --- .../org/apache/pulsar/broker/PulsarService.java | 5 +- .../apache/pulsar/broker/PulsarServiceTest.java | 69 ++++++++++++++++++++++ 2 files changed, 72 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 5c79909..f7c789b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -868,8 +868,9 @@ public class PulsarService implements AutoCloseable { return this.nsService; } - public WorkerService getWorkerService() { - return functionWorkerService.orElse(null); + public WorkerService getWorkerService() throws UnsupportedOperationException { + return functionWorkerService.orElseThrow(() -> new UnsupportedOperationException("Pulsar Function Worker " + + "is not enabled, probably functionsWorkerEnabled is set to false")); } /** diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java new file mode 100644 index 0000000..0ff1ca1 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.testng.Assert.assertEquals; +import static org.testng.AssertJUnit.assertSame; +import java.util.Optional; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.functions.worker.WorkerConfig; +import org.apache.pulsar.functions.worker.WorkerService; +import org.testng.annotations.Test; + + +@Slf4j +public class PulsarServiceTest { + + @Test + public void testGetWorkerService() { + ServiceConfiguration configuration = new ServiceConfiguration(); + configuration.setZookeeperServers("localhost"); + configuration.setClusterName("clusterName"); + configuration.setFunctionsWorkerEnabled(true); + WorkerService expectedWorkerService = mock(WorkerService.class); + PulsarService pulsarService = spy(new PulsarService(configuration, new WorkerConfig(), + Optional.of(expectedWorkerService), (exitCode) -> {})); + + WorkerService actualWorkerService = pulsarService.getWorkerService(); + assertSame(expectedWorkerService, actualWorkerService); + } + + /** + * Verifies that the getWorkerService throws {@link UnsupportedOperationException} + * when functionsWorkerEnabled is set to false . 
+ */ + @Test + public void testGetWorkerServiceException() throws Exception { + ServiceConfiguration configuration = new ServiceConfiguration(); + configuration.setZookeeperServers("localhost"); + configuration.setClusterName("clusterName"); + configuration.setFunctionsWorkerEnabled(false); + PulsarService pulsarService = new PulsarService(configuration, new WorkerConfig(), + Optional.empty(), (exitCode) -> {}); + + String errorMessage = "Pulsar Function Worker is not enabled, probably functionsWorkerEnabled is set to false"; + try { + pulsarService.getWorkerService(); + } catch (UnsupportedOperationException e) { + assertEquals(e.getMessage(), errorMessage); + } + } +}