srkukarni closed pull request #2760: Fix Topic Pattern functionality for Python functions URL: https://github.com/apache/pulsar/pull/2760
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py index 54a7329265..5a1bff541b 100644 --- a/pulsar-functions/instance/src/main/python/python_instance.py +++ b/pulsar-functions/instance/src/main/python/python_instance.py @@ -36,6 +36,7 @@ from threading import Timer import traceback import sys +import re import pulsar import contextimpl @@ -202,8 +203,8 @@ def run(self): self.input_serdes[topic] = serde_kclass() Log.info("Setting up consumer for topic %s with subname %s" % (topic, subscription_name)) if consumer_conf.isRegexPattern: - self.consumers[topic] = self.pulsar_client.subscribe_pattern( - str(topic), subscription_name, + self.consumers[topic] = self.pulsar_client.subscribe( + re.compile(str(topic)), subscription_name, consumer_type=mode, message_listener=partial(self.message_listener, self.input_serdes[topic]), unacked_messages_timeout_ms=int(self.timeout_ms) if self.timeout_ms else None diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java index b8a1daefd2..0ea5404913 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java @@ -24,8 +24,11 @@ import com.google.common.base.Stopwatch; import com.google.gson.Gson; + +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; @@ -581,22 +584,48 @@ protected void getSourceInfoNotFound(String tenant, String namespace, String sou @Test(enabled = false) public void testPythonExclamationFunction() throws Exception { - testExclamationFunction(Runtime.PYTHON); + testExclamationFunction(Runtime.PYTHON, false); + } + + @Test(enabled = false) + public void testPythonExclamationTopicPatternFunction() throws Exception { + testExclamationFunction(Runtime.PYTHON, true); } @Test public void testJavaExclamationFunction() throws Exception { - testExclamationFunction(Runtime.JAVA); + testExclamationFunction(Runtime.JAVA, false); + } + + @Test + public void testJavaExclamationTopicPatternFunction() throws Exception { + testExclamationFunction(Runtime.JAVA, true); } - private void testExclamationFunction(Runtime runtime) throws Exception { + private void testExclamationFunction(Runtime runtime, boolean isTopicPattern) throws Exception { if (functionRuntimeType == FunctionRuntimeType.THREAD && runtime == Runtime.PYTHON) { // python can only run on process mode return; } - String inputTopicName = "test-exclamation-" + runtime + "-input-" + randomName(8); + String inputTopicName = "persistent://public/default/test-exclamation-" + runtime + "-input-" + randomName(8); String outputTopicName = "test-exclamation-" + runtime + "-output-" + randomName(8); + if (isTopicPattern) { + @Cleanup PulsarClient client = PulsarClient.builder() + .serviceUrl(pulsarCluster.getPlainTextServiceUrl()) + .build(); + @Cleanup Consumer<String> consumer1 = client.newConsumer(Schema.STRING) + .topic(inputTopicName + "1") + .subscriptionType(SubscriptionType.Exclusive) + .subscriptionName("test-sub") + .subscribe(); + @Cleanup Consumer<String> consumer2 = client.newConsumer(Schema.STRING) + .topic(inputTopicName + "2") + .subscriptionType(SubscriptionType.Exclusive) + .subscriptionName("test-sub") + .subscribe(); + inputTopicName = inputTopicName + ".*"; + } String functionName = "test-exclamation-fn-" + randomName(8); final int numMessages = 10; @@ -640,7 +669,11 @@ private static void submitExclamationFunction(Runtime runtime, String functionClass, Schema<T> inputTopicSchema) throws Exception { CommandGenerator generator; - generator = CommandGenerator.createDefaultGenerator(inputTopicName, functionClass); + if (inputTopicName.endsWith(".*")) { + generator = CommandGenerator.createTopicPatternGenerator(inputTopicName, functionClass); + } else { + generator = CommandGenerator.createDefaultGenerator(inputTopicName, functionClass); + } generator.setSinkTopic(outputTopicName); generator.setFunctionName(functionName); String command; @@ -731,17 +764,40 @@ private static void publishAndConsumeMessages(String inputTopic, .subscriptionType(SubscriptionType.Exclusive) .subscriptionName("test-sub") .subscribe(); - @Cleanup Producer<String> producer = client.newProducer(Schema.STRING) - .topic(inputTopic) - .create(); + if (inputTopic.endsWith(".*")) { + @Cleanup Producer<String> producer1 = client.newProducer(Schema.STRING) + .topic(inputTopic.substring(0, inputTopic.length() - 2) + "1") + .create(); + @Cleanup Producer<String> producer2 = client.newProducer(Schema.STRING) + .topic(inputTopic.substring(0, inputTopic.length() - 2) + "2") + .create(); + + for (int i = 0; i < numMessages / 2; i++) { + producer1.send("message-" + i); + } + + for (int i = numMessages / 2; i < numMessages; i++) { + producer2.send("message-" + i); + } + } else { + @Cleanup Producer<String> producer = client.newProducer(Schema.STRING) + .topic(inputTopic) + .create(); + for (int i = 0; i < numMessages; i++) { + producer.send("message-" + i); + } + } + + Set<String> expectedMessages = new HashSet<>(); for (int i = 0; i < numMessages; i++) { - producer.send("message-" + i); + expectedMessages.add("message-" + i + "!"); } for (int i = 0; i < numMessages; i++) { - Message<String> msg = consumer.receive(); - assertEquals("message-" + i + "!", msg.getValue()); + Message<String> msg = consumer.receive(10, TimeUnit.SECONDS); + assertTrue(expectedMessages.contains(msg.getValue())); + expectedMessages.remove(msg.getValue()); } } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java index 6f4d012e24..64b70a63f9 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java @@ -39,6 +39,7 @@ private String namespace = "default"; private String functionClassName; private String sourceTopic; + private String sourceTopicPattern; private Map<String, String> customSereSourceTopics; private String sinkTopic; private String logTopic; @@ -64,28 +65,14 @@ public static CommandGenerator createDefaultGenerator(String sourceTopic, String return generator; } - public static CommandGenerator createDefaultGenerator(Map<String, String> customSereSourceTopics, - String functionClassName) { + public static CommandGenerator createTopicPatternGenerator(String sourceTopicPattern, String functionClassName) { CommandGenerator generator = new CommandGenerator(); - generator.setCustomSereSourceTopics(customSereSourceTopics); + generator.setSourceTopicPattern(sourceTopicPattern); generator.setFunctionClassName(functionClassName); generator.setRuntime(Runtime.JAVA); return generator; } - public static CommandGenerator createDefaultGenerator(String tenant, String namespace, String functionName) { - CommandGenerator generator = new CommandGenerator(); - generator.setTenant(tenant); - generator.setNamespace(namespace); - generator.setFunctionName(functionName); - generator.setRuntime(Runtime.JAVA); - return generator; - } - - public void createAdminUrl(String workerHost, int port) { - adminUrl = "http://" + workerHost + ":" + port; - } - public String generateCreateFunctionCommand() { return generateCreateFunctionCommand(null); } @@ -110,6 +97,9 @@ public String generateCreateFunctionCommand(String codeFile) { if (sourceTopic != null) { commandBuilder.append(" --inputs " + sourceTopic); } + if (sourceTopicPattern != null) { + commandBuilder.append(" --topics-pattern " + sourceTopicPattern); + } if (logTopic != null) { commandBuilder.append(" --logTopic " + logTopic); } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services