This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 4ed2e33 Fix Topic Pattern functionality for Python functions (#2760) 4ed2e33 is described below commit 4ed2e336a27a5e987878955d033f1f396fa3b184 Author: Sanjeev Kulkarni <sanjee...@gmail.com> AuthorDate: Wed Oct 10 16:45:09 2018 -0700 Fix Topic Pattern functionality for Python functions (#2760) * pulsar client no longer has subscribe_pattern interface * Added integration tests for topic pattern * Fixed integration test --- .../instance/src/main/python/python_instance.py | 5 +- .../integration/functions/PulsarFunctionsTest.java | 78 +++++++++++++++++++--- .../functions/utils/CommandGenerator.java | 22 ++---- 3 files changed, 76 insertions(+), 29 deletions(-) diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py index 54a7329..5a1bff5 100644 --- a/pulsar-functions/instance/src/main/python/python_instance.py +++ b/pulsar-functions/instance/src/main/python/python_instance.py @@ -36,6 +36,7 @@ from collections import namedtuple from threading import Timer import traceback import sys +import re import pulsar import contextimpl @@ -202,8 +203,8 @@ class PythonInstance(object): self.input_serdes[topic] = serde_kclass() Log.info("Setting up consumer for topic %s with subname %s" % (topic, subscription_name)) if consumer_conf.isRegexPattern: - self.consumers[topic] = self.pulsar_client.subscribe_pattern( - str(topic), subscription_name, + self.consumers[topic] = self.pulsar_client.subscribe( + re.compile(str(topic)), subscription_name, consumer_type=mode, message_listener=partial(self.message_listener, self.input_serdes[topic]), unacked_messages_timeout_ms=int(self.timeout_ms) if self.timeout_ms else None diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java index b8a1dae..0ea5404 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java @@ -24,8 +24,11 @@ import static org.testng.Assert.fail; import com.google.common.base.Stopwatch; import com.google.gson.Gson; + +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; @@ -581,22 +584,48 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { @Test(enabled = false) public void testPythonExclamationFunction() throws Exception { - testExclamationFunction(Runtime.PYTHON); + testExclamationFunction(Runtime.PYTHON, false); + } + + @Test(enabled = false) + public void testPythonExclamationTopicPatternFunction() throws Exception { + testExclamationFunction(Runtime.PYTHON, true); } @Test public void testJavaExclamationFunction() throws Exception { - testExclamationFunction(Runtime.JAVA); + testExclamationFunction(Runtime.JAVA, false); + } + + @Test + public void testJavaExclamationTopicPatternFunction() throws Exception { + testExclamationFunction(Runtime.JAVA, true); } - private void testExclamationFunction(Runtime runtime) throws Exception { + private void testExclamationFunction(Runtime runtime, boolean isTopicPattern) throws Exception { if (functionRuntimeType == FunctionRuntimeType.THREAD && runtime == Runtime.PYTHON) { // python can only run on process mode return; } - String inputTopicName = "test-exclamation-" + runtime + "-input-" + randomName(8); + String inputTopicName = "persistent://public/default/test-exclamation-" + runtime + "-input-" + randomName(8); String outputTopicName = "test-exclamation-" + runtime + "-output-" + randomName(8); + if (isTopicPattern) { + @Cleanup PulsarClient client = PulsarClient.builder() + .serviceUrl(pulsarCluster.getPlainTextServiceUrl()) + .build(); + @Cleanup Consumer<String> consumer1 = client.newConsumer(Schema.STRING) + .topic(inputTopicName + "1") + .subscriptionType(SubscriptionType.Exclusive) + .subscriptionName("test-sub") + .subscribe(); + @Cleanup Consumer<String> consumer2 = client.newConsumer(Schema.STRING) + .topic(inputTopicName + "2") + .subscriptionType(SubscriptionType.Exclusive) + .subscriptionName("test-sub") + .subscribe(); + inputTopicName = inputTopicName + ".*"; + } String functionName = "test-exclamation-fn-" + randomName(8); final int numMessages = 10; @@ -640,7 +669,11 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { String functionClass, Schema<T> inputTopicSchema) throws Exception { CommandGenerator generator; - generator = CommandGenerator.createDefaultGenerator(inputTopicName, functionClass); + if (inputTopicName.endsWith(".*")) { + generator = CommandGenerator.createTopicPatternGenerator(inputTopicName, functionClass); + } else { + generator = CommandGenerator.createDefaultGenerator(inputTopicName, functionClass); + } generator.setSinkTopic(outputTopicName); generator.setFunctionName(functionName); String command; @@ -731,17 +764,40 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { .subscriptionType(SubscriptionType.Exclusive) .subscriptionName("test-sub") .subscribe(); - @Cleanup Producer<String> producer = client.newProducer(Schema.STRING) - .topic(inputTopic) - .create(); + if (inputTopic.endsWith(".*")) { + @Cleanup Producer<String> producer1 = client.newProducer(Schema.STRING) + .topic(inputTopic.substring(0, inputTopic.length() - 2) + "1") + .create(); + @Cleanup Producer<String> producer2 = client.newProducer(Schema.STRING) + .topic(inputTopic.substring(0, inputTopic.length() - 2) + "2") + .create(); + + for (int i = 0; i < numMessages / 2; i++) { + producer1.send("message-" + i); + } + + for (int i = numMessages / 2; i < numMessages; i++) { + producer2.send("message-" + i); + } + } else { + @Cleanup Producer<String> producer = client.newProducer(Schema.STRING) + .topic(inputTopic) + .create(); + for (int i = 0; i < numMessages; i++) { + producer.send("message-" + i); + } + } + + Set<String> expectedMessages = new HashSet<>(); for (int i = 0; i < numMessages; i++) { - producer.send("message-" + i); + expectedMessages.add("message-" + i + "!"); } for (int i = 0; i < numMessages; i++) { - Message<String> msg = consumer.receive(); - assertEquals("message-" + i + "!", msg.getValue()); + Message<String> msg = consumer.receive(10, TimeUnit.SECONDS); + assertTrue(expectedMessages.contains(msg.getValue())); + expectedMessages.remove(msg.getValue()); } } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java index 6f4d012..64b70a6 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java @@ -39,6 +39,7 @@ public class CommandGenerator { private String namespace = "default"; private String functionClassName; private String sourceTopic; + private String sourceTopicPattern; private Map<String, String> customSereSourceTopics; private String sinkTopic; private String logTopic; @@ -64,28 +65,14 @@ public class CommandGenerator { return generator; } - public static CommandGenerator createDefaultGenerator(Map<String, String> customSereSourceTopics, - String functionClassName) { + public static CommandGenerator createTopicPatternGenerator(String sourceTopicPattern, String functionClassName) { CommandGenerator generator = new CommandGenerator(); - generator.setCustomSereSourceTopics(customSereSourceTopics); + generator.setSourceTopicPattern(sourceTopicPattern); generator.setFunctionClassName(functionClassName); generator.setRuntime(Runtime.JAVA); return generator; } - public static CommandGenerator createDefaultGenerator(String tenant, String namespace, String functionName) { - CommandGenerator generator = new CommandGenerator(); - generator.setTenant(tenant); - generator.setNamespace(namespace); - generator.setFunctionName(functionName); - generator.setRuntime(Runtime.JAVA); - return generator; - } - - public void createAdminUrl(String workerHost, int port) { - adminUrl = "http://" + workerHost + ":" + port; - } - public String generateCreateFunctionCommand() { return generateCreateFunctionCommand(null); } @@ -110,6 +97,9 @@ public class CommandGenerator { if (sourceTopic != null) { commandBuilder.append(" --inputs " + sourceTopic); } + if (sourceTopicPattern != null) { + commandBuilder.append(" --topics-pattern " + sourceTopicPattern); + } if (logTopic != null) { commandBuilder.append(" --logTopic " + logTopic); }