This is an automated email from the ASF dual-hosted git repository. jerrypeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
     new d45ee07  Improve batch source intermediate topic cleanup (#7985)
d45ee07 is described below

commit d45ee07fe2bac6fe03d62895b497bfee0807bb8d
Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com>
AuthorDate: Sat Sep 5 16:02:16 2020 -0700

    Improve batch source intermediate topic cleanup (#7985)

    Co-authored-by: Jerry Peng <jer...@splunk.com>
---
 .../java/org/apache/pulsar/functions/worker/FunctionActioner.java | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index a79138d..c75fb3a 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -422,7 +422,7 @@ public class FunctionActioner {
     private Supplier<Actions.ActionResult> getDeleteTopicSupplier(String topic) {
         return () -> {
             try {
-                pulsarAdmin.topics().delete(topic);
+                pulsarAdmin.topics().delete(topic, true);
             } catch (PulsarAdminException e) {
                 if (e instanceof PulsarAdminException.NotFoundException) {
                     return Actions.ActionResult.builder()
@@ -628,19 +628,21 @@ public class FunctionActioner {
                 try {
                     Actions.newBuilder()
                             .addAction(
+                                    // Unsubscribe and allow time for consumers to close
                                     Actions.Action.builder()
                                             .actionName(String.format("Removing intermediate topic subscription %s for Batch Source %s", intermediateTopicSubscription, fqfn))
                                             .numRetries(10)
                                             .sleepBetweenInvocationsMs(1000)
-                                            .continueOn(true)
                                             .supplier(
                                                     getDeleteSubscriptionSupplier(intermediateTopicName, false, intermediateTopicSubscription)
                                             )
                                             .build())
-                            .addAction(Actions.Action.builder()
+                            .addAction(
+                                    // Delete topic forcibly regardless whether unsubscribe succeeded or not
+                                    Actions.Action.builder()
                                             .actionName(String.format("Deleting intermediate topic %s for Batch Source %s", intermediateTopicName, fqfn))
                                             .numRetries(10)