This is an automated email from the ASF dual-hosted git repository. jerrypeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new fb182c0 Remove function files stored in BK when function is de-registered (#7052) fb182c0 is described below commit fb182c04c0137347ffefdd00d0e5d5bf27d979f6 Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com> AuthorDate: Wed May 27 13:58:14 2020 -0700 Remove function files stored in BK when function is de-registered (#7052) * remove function files stored in BK when function is de-registered Co-authored-by: Jerry Peng <jer...@splunk.com> --- .../worker/PulsarFunctionPublishTest.java | 149 +++++++++++++++++++-- .../pulsar/functions/worker/WorkerUtils.java | 11 +- .../functions/worker/rest/api/ComponentImpl.java | 41 +++--- 3 files changed, 169 insertions(+), 32 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java index 8a4928f..e8b8aa8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java @@ -27,21 +27,10 @@ import static org.testng.Assert.assertNotEquals; import com.google.common.collect.Lists; import com.google.common.collect.Sets; - -import java.io.File; -import java.lang.reflect.Method; -import java.net.URL; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.TimeUnit; - import lombok.extern.slf4j.Slf4j; - +import org.apache.distributedlog.DistributedLogConfiguration; +import org.apache.distributedlog.api.namespace.Namespace; +import org.apache.distributedlog.api.namespace.NamespaceBuilder; import org.apache.pulsar.broker.NoOpShutdownService; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; @@ -77,6 +66,21 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import java.io.File; +import java.lang.reflect.Method; +import java.net.URI; +import java.net.URL; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + /** * Test Pulsar function state * @@ -364,4 +368,121 @@ public class PulsarFunctionPublishTest { Assert.assertEquals(foundFiles.length, 0, "Temporary files left over: " + Arrays.asList(foundFiles)); } + + @Test(timeOut = 20000) + public void testPulsarFunctionBKCleanup() throws Exception { + final String namespacePortion = "io"; + final String replNamespace = tenant + "/" + namespacePortion; + final String sourceTopic = "persistent://" + replNamespace + "/input"; + final String publishTopic = "persistent://" + replNamespace + "/publishtopic"; + final String propertyKey = "key"; + final String propertyValue = "value"; + final String functionName = "PulsarFunction-test"; + final String subscriptionName = "test-sub"; + admin.namespaces().createNamespace(replNamespace); + Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use")); + admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters); + + // create a producer that creates a topic at broker + Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create(); + Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(publishTopic).subscriptionName("sub").subscribe(); + + FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName, + sourceTopic, publishTopic, subscriptionName); + + String jarFilePath = getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile(); + File jarFile = new File(jarFilePath); + Assert.assertTrue(jarFile.exists() && jarFile.isFile()); + admin.functions().createFunction(functionConfig, jarFilePath); + + retryStrategically((test) -> { + try { + return admin.topics().getStats(sourceTopic).subscriptions.size() == 1; + } catch (PulsarAdminException e) { + return false; + } + }, 50, 150); + // validate pulsar sink consumer has started on the topic + assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1); + + int totalMsgs = 5; + for (int i = 0; i < totalMsgs; i++) { + String data = "foo"; + producer.newMessage().property(propertyKey, propertyValue).key(String.valueOf(i)).value(data).send(); + } + retryStrategically((test) -> { + try { + SubscriptionStats subStats = admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName); + return subStats.unackedMessages == 0; + } catch (PulsarAdminException e) { + return false; + } + }, 50, 150); + + retryStrategically((test) -> { + try { + FunctionStats functionStat = admin.functions().getFunctionStats(tenant, namespacePortion, functionName); + return functionStat.getProcessedSuccessfullyTotal() == 5; + } catch (PulsarAdminException e) { + return false; + } + }, 50, 150); + + for (int i = 0; i < 5; i++) { + Message<String> msg = consumer.receive(5, TimeUnit.SECONDS); + String receivedPropertyValue = msg.getProperty(propertyKey); + assertEquals(propertyValue, receivedPropertyValue); + assertEquals(msg.getProperty("input_topic"), sourceTopic); + assertEquals(msg.getKey(), String.valueOf(i)); + } + + // validate pulsar-sink consumer has consumed all messages and delivered to Pulsar sink but unacked messages + // due to publish failure + assertNotEquals(admin.topics().getStats(sourceTopic).subscriptions.values().iterator().next().unackedMessages, + totalMsgs); + + // delete functions + admin.functions().deleteFunction(tenant, namespacePortion, functionName); + + retryStrategically((test) -> { + try { + return admin.topics().getStats(sourceTopic).subscriptions.size() == 0; + } catch (PulsarAdminException e) { + return false; + } + }, 50, 150); + + // make sure subscriptions are cleanup + assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 0); + + // make sure all temp files are deleted + File dir = new File(System.getProperty("java.io.tmpdir")); + File[] foundFiles = dir.listFiles((dir1, name) -> name.startsWith("function")); + + Assert.assertEquals(foundFiles.length, 0, "Temporary files left over: " + Arrays.asList(foundFiles)); + + DistributedLogConfiguration dlogConf = WorkerUtils.getDlogConf(workerConfig); + + // check if all function files are deleted from BK + String url = String.format("distributedlog://%s/pulsar/functions", "127.0.0.1" + ":" + bkEnsemble.getZookeeperPort()); + log.info("dlog url: {}", url); + URI dlogUri = URI.create(url); + + Namespace dlogNamespace = NamespaceBuilder.newBuilder() + .conf(dlogConf) + .clientId("function-worker-" + workerConfig.getWorkerId()) + .uri(dlogUri) + .build(); + + List<String> files = new LinkedList<>(); + dlogNamespace.getLogs(String.format("%s/%s/%s", tenant, namespacePortion, functionName)).forEachRemaining(new java.util.function.Consumer<String>() { + @Override + public void accept(String s) { + files.add(s); + } + }); + + assertEquals(files.size(), 0, "BK files left over: " + files); + + } } \ No newline at end of file diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java index 99faadd..a4e46a8 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java @@ -18,6 +18,9 @@ */ package org.apache.pulsar.functions.worker; +import static java.nio.file.StandardCopyOption.REPLACE_EXISTING; +import static org.apache.commons.lang3.StringUtils.isNotBlank; + import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.distributedlog.AppendOnlyStreamWriter; @@ -55,9 +58,6 @@ import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; -import static java.nio.file.StandardCopyOption.REPLACE_EXISTING; -import static org.apache.commons.lang3.StringUtils.isNotBlank; - @Slf4j public final class WorkerUtils { @@ -118,6 +118,11 @@ public final class WorkerUtils { } } + public static void deleteFromBookkeeper(Namespace namespace, String packagePath) throws IOException { + log.info("Deleting {} from BK", packagePath); + namespace.deleteLog(packagePath); + } + public static DistributedLogConfiguration getDlogConf(WorkerConfig workerConfig) { int numReplicas = workerConfig.getNumFunctionPackageReplicas(); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java index b6daa5e..cad442f 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java @@ -18,6 +18,17 @@ */ package org.apache.pulsar.functions.worker.rest.api; +import static com.google.common.base.Preconditions.checkNotNull; +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.bookkeeper.common.concurrent.FutureUtils.result; +import static org.apache.commons.lang3.StringUtils.isEmpty; +import static org.apache.commons.lang3.StringUtils.isNotBlank; +import static org.apache.pulsar.functions.utils.FunctionCommon.getStateNamespace; +import static org.apache.pulsar.functions.utils.FunctionCommon.getUniquePackageName; +import static org.apache.pulsar.functions.worker.WorkerUtils.isFunctionCodeBuiltin; +import static org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException; + import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; @@ -71,11 +82,6 @@ import org.apache.pulsar.functions.worker.WorkerUtils; import org.apache.pulsar.functions.worker.request.RequestResult; import org.glassfish.jersey.media.multipart.FormDataContentDisposition; -import javax.ws.rs.WebApplicationException; -import javax.ws.rs.core.Response; -import javax.ws.rs.core.Response.Status; -import javax.ws.rs.core.StreamingOutput; -import javax.ws.rs.core.UriBuilder; import java.io.File; import java.io.FileInputStream; import java.io.IOException; @@ -93,16 +99,11 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; -import static com.google.common.base.Preconditions.checkNotNull; -import static java.nio.charset.StandardCharsets.UTF_8; -import static java.util.concurrent.TimeUnit.SECONDS; -import static org.apache.bookkeeper.common.concurrent.FutureUtils.result; -import static org.apache.commons.lang3.StringUtils.isEmpty; -import static org.apache.commons.lang3.StringUtils.isNotBlank; -import static org.apache.pulsar.functions.utils.FunctionCommon.getStateNamespace; -import static org.apache.pulsar.functions.utils.FunctionCommon.getUniquePackageName; -import static org.apache.pulsar.functions.worker.WorkerUtils.isFunctionCodeBuiltin; -import static org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; +import javax.ws.rs.core.StreamingOutput; +import javax.ws.rs.core.UriBuilder; @Slf4j public abstract class ComponentImpl { @@ -408,6 +409,16 @@ public abstract class ComponentImpl { ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e); throw new RestException(Status.REQUEST_TIMEOUT, e.getMessage()); } + + // clean up component files stored in BK + if (!functionMetaData.getPackageLocation().getPackagePath().startsWith(Utils.HTTP) && !functionMetaData.getPackageLocation().getPackagePath().startsWith(Utils.FILE)) { + try { + WorkerUtils.deleteFromBookkeeper(worker().getDlogNamespace(), functionMetaData.getPackageLocation().getPackagePath()); + } catch (IOException e) { + log.error("{}/{}/{} Failed to cleanup package in BK with path {}", tenant, namespace, componentName, + functionMetaData.getPackageLocation().getPackagePath(), e); + } + } } public FunctionConfig getFunctionInfo(final String tenant,