This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 381ccc0 Renamed PulsarFunction to plain Function (#1377) 381ccc0 is described below commit 381ccc07e03d45a0ae3f5ccecef56c08cf4de089 Author: Sanjeev Kulkarni <sanjee...@gmail.com> AuthorDate: Tue Mar 13 14:33:51 2018 -0700 Renamed PulsarFunction to plain Function (#1377) --- .../apache/pulsar/admin/cli/CmdFunctionsTest.java | 6 ++--- .../org/apache/pulsar/admin/cli/CmdFunctions.java | 21 ++++++++-------- .../api/{PulsarFunction.java => Function.java} | 2 +- .../functions/api/utils/DefaultSerDeTest.java | 6 ++--- .../pulsar/functions/instance/JavaInstance.java | 17 ++++++------- .../functions/instance/JavaInstanceRunnable.java | 16 ++++++------ .../instance/JavaInstanceRunnableProcessTest.java | 9 +++---- .../instance/JavaInstanceRunnableTest.java | 29 +++++++++++----------- .../functions/instance/JavaInstanceTest.java | 4 +-- .../functions/api/examples/CounterFunction.java | 4 +-- .../api/examples/ExclamationFunction.java | 4 +-- .../functions/api/examples/LoggingFunction.java | 4 +-- .../functions/api/examples/PublishFunction.java | 4 +-- .../functions/api/examples/UserConfigFunction.java | 4 +-- .../functions/api/examples/UserMetricFunction.java | 4 +-- .../functions/api/examples/VoidFunction.java | 4 +-- .../rest/api/v2/FunctionApiV2ResourceTest.java | 4 +-- site/docs/latest/functions/api.md | 2 +- 18 files changed, 70 insertions(+), 74 deletions(-) diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java index 2cd664a..ca20cfe 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java @@ -55,7 +55,7 @@ import org.apache.pulsar.client.admin.Functions; import org.apache.pulsar.client.admin.PulsarAdminWithFunctions; import 
org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.functions.api.Context; -import org.apache.pulsar.functions.api.PulsarFunction; +import org.apache.pulsar.functions.api.Function; import org.apache.pulsar.functions.api.utils.DefaultSerDe; import org.apache.pulsar.functions.proto.Function.FunctionConfig; import org.apache.pulsar.functions.utils.Reflections; @@ -85,7 +85,7 @@ public class CmdFunctionsTest { private Functions functions; private CmdFunctions cmd; - public class DummyFunction implements PulsarFunction<String, String> { + public class DummyFunction implements Function<String, String> { @Override public String process(String input, Context context) throws Exception { return null; @@ -111,7 +111,7 @@ public class CmdFunctionsTest { mockStatic(Reflections.class); when(Reflections.classExistsInJar(any(File.class), anyString())).thenReturn(true); when(Reflections.classExists(anyString())).thenReturn(true); - when(Reflections.classInJarImplementsIface(any(File.class), anyString(), eq(PulsarFunction.class))) + when(Reflections.classInJarImplementsIface(any(File.class), anyString(), eq(Function.class))) .thenReturn(true); when(Reflections.classImplementsIface(anyString(), any())).thenReturn(true); when(Reflections.createInstance(eq(DummyFunction.class.getName()), any(File.class))).thenReturn(new DummyFunction()); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index c8010e7..ae348ae 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -46,7 +46,7 @@ import org.apache.bookkeeper.clients.utils.NetUtils; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminWithFunctions; import org.apache.pulsar.common.naming.TopicName; -import 
org.apache.pulsar.functions.api.PulsarFunction; +import org.apache.pulsar.functions.api.Function; import org.apache.pulsar.functions.api.utils.DefaultSerDe; import org.apache.pulsar.functions.proto.Function.FunctionConfig; import org.apache.pulsar.functions.instance.InstanceConfig; @@ -62,7 +62,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.function.Function; import org.apache.pulsar.functions.utils.Utils; @Slf4j @@ -250,13 +249,13 @@ public class CmdFunctions extends CmdBase { private void doJavaSubmitChecks(FunctionConfig.Builder functionConfigBuilder) { File file = new File(jarFile); - // check if the function class exists in Jar and it implements PulsarFunction class + // check if the function class exists in Jar and it implements Function class if (!Reflections.classExistsInJar(file, functionConfigBuilder.getClassName())) { throw new IllegalArgumentException(String.format("Pulsar function class %s does not exist in jar %s", functionConfigBuilder.getClassName(), jarFile)); - } else if (!Reflections.classInJarImplementsIface(file, functionConfigBuilder.getClassName(), PulsarFunction.class) - && !Reflections.classInJarImplementsIface(file, functionConfigBuilder.getClassName(), Function.class)) { - throw new IllegalArgumentException(String.format("Pulsar function class %s in jar %s implements neither PulsarFunction nor java.util.Function", + } else if (!Reflections.classInJarImplementsIface(file, functionConfigBuilder.getClassName(), Function.class) + && !Reflections.classInJarImplementsIface(file, functionConfigBuilder.getClassName(), java.util.function.Function.class)) { + throw new IllegalArgumentException(String.format("Pulsar function class %s in jar %s implements neither Function nor java.util.function.Function", functionConfigBuilder.getClassName(), jarFile)); } @@ -269,20 +268,20 @@ public class CmdFunctions extends CmdBase { Object userClass = 
Reflections.createInstance(functionConfigBuilder.getClassName(), file); Class<?>[] typeArgs; - if (userClass instanceof PulsarFunction) { - PulsarFunction pulsarFunction = (PulsarFunction) userClass; + if (userClass instanceof Function) { + Function pulsarFunction = (Function) userClass; if (pulsarFunction == null) { throw new IllegalArgumentException(String.format("Pulsar function class %s could not be instantiated from jar %s", functionConfigBuilder.getClassName(), jarFile)); } - typeArgs = TypeResolver.resolveRawArguments(PulsarFunction.class, pulsarFunction.getClass()); + typeArgs = TypeResolver.resolveRawArguments(Function.class, pulsarFunction.getClass()); } else { - Function function = (Function) userClass; + java.util.function.Function function = (java.util.function.Function) userClass; if (function == null) { throw new IllegalArgumentException(String.format("Java Util function class %s could not be instantiated from jar %s", functionConfigBuilder.getClassName(), jarFile)); } - typeArgs = TypeResolver.resolveRawArguments(Function.class, function.getClass()); + typeArgs = TypeResolver.resolveRawArguments(java.util.function.Function.class, function.getClass()); } // Check if the Input serialization/deserialization class exists in jar or already loaded and that it diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/PulsarFunction.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Function.java similarity index 97% rename from pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/PulsarFunction.java rename to pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Function.java index 82406a8..ca292eb 100644 --- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/PulsarFunction.java +++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Function.java @@ -26,7 +26,7 @@ package org.apache.pulsar.functions.api; * meet your 
needs, you can use the byte stream handler defined in RawRequestHandler. */ @FunctionalInterface -public interface PulsarFunction<I, O> { +public interface Function<I, O> { /** * Process the input. * @return the output diff --git a/pulsar-functions/api-java/src/test/java/org/apache/pulsar/functions/api/utils/DefaultSerDeTest.java b/pulsar-functions/api-java/src/test/java/org/apache/pulsar/functions/api/utils/DefaultSerDeTest.java index d878650..3b1b118 100644 --- a/pulsar-functions/api-java/src/test/java/org/apache/pulsar/functions/api/utils/DefaultSerDeTest.java +++ b/pulsar-functions/api-java/src/test/java/org/apache/pulsar/functions/api/utils/DefaultSerDeTest.java @@ -21,7 +21,7 @@ package org.apache.pulsar.functions.api.utils; import net.jodah.typetools.TypeResolver; import org.apache.pulsar.functions.api.Context; -import org.apache.pulsar.functions.api.PulsarFunction; +import org.apache.pulsar.functions.api.Function; import org.apache.pulsar.functions.api.SerDe; import org.testng.annotations.Test; @@ -82,7 +82,7 @@ public class DefaultSerDeTest { assertEquals(result, input); } - private class SimplePulsarFunction implements PulsarFunction<String, String> { + private class SimplePulsarFunction implements Function<String, String> { @Override public String process(String input, Context context) { return null; @@ -92,7 +92,7 @@ public class DefaultSerDeTest { @Test public void testPulsarFunction() { SimplePulsarFunction pulsarFunction = new SimplePulsarFunction(); - Class<?>[] typeArgs = TypeResolver.resolveRawArguments(PulsarFunction.class, pulsarFunction.getClass()); + Class<?>[] typeArgs = TypeResolver.resolveRawArguments(Function.class, pulsarFunction.getClass()); SerDe serDe = new DefaultSerDe(String.class); Class<?>[] inputSerdeTypeArgs = TypeResolver.resolveRawArguments(SerDe.class, serDe.getClass()); assertTrue(inputSerdeTypeArgs[0].isAssignableFrom(typeArgs[0])); diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java index aa80e71..14f3171 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java @@ -24,11 +24,10 @@ import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.functions.api.PulsarFunction; +import org.apache.pulsar.functions.api.Function; import org.apache.pulsar.functions.proto.InstanceCommunication; import java.util.Map; -import java.util.function.Function; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,8 +42,8 @@ public class JavaInstance implements AutoCloseable { @Getter(AccessLevel.PACKAGE) private final ContextImpl context; - private PulsarFunction pulsarFunction; - private Function javaUtilFunction; + private Function function; + private java.util.function.Function javaUtilFunction; public JavaInstance(InstanceConfig config, Object userClassObject, ClassLoader clsLoader, @@ -56,10 +55,10 @@ public class JavaInstance implements AutoCloseable { this.context = new ContextImpl(config, instanceLog, pulsarClient, clsLoader, sourceConsumers); // create the functions - if (userClassObject instanceof PulsarFunction) { - this.pulsarFunction = (PulsarFunction) userClassObject; + if (userClassObject instanceof Function) { + this.function = (Function) userClassObject; } else { - this.javaUtilFunction = (Function) userClassObject; + this.javaUtilFunction = (java.util.function.Function) userClassObject; } } @@ -73,8 +72,8 @@ public class JavaInstance implements AutoCloseable { JavaExecutionResult executionResult = new JavaExecutionResult(); try { Object output; - if 
(pulsarFunction != null) { - output = pulsarFunction.process(input, context); + if (function != null) { + output = function.process(input, context); } else { output = javaUtilFunction.apply(input); } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index 06904cf..fffe8f5 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -32,7 +32,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; -import java.util.function.Function; + import lombok.AccessLevel; import lombok.Getter; import lombok.Setter; @@ -53,7 +53,7 @@ import org.apache.pulsar.client.api.*; import org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; -import org.apache.pulsar.functions.api.PulsarFunction; +import org.apache.pulsar.functions.api.Function; import org.apache.pulsar.functions.api.utils.DefaultSerDe; import org.apache.pulsar.functions.proto.Function.FunctionConfig; import org.apache.pulsar.functions.proto.Function.FunctionConfig.ProcessingGuarantees; @@ -171,16 +171,16 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable, ConsumerEv Object object = Reflections.createInstance( instanceConfig.getFunctionConfig().getClassName(), clsLoader); - if (!(object instanceof PulsarFunction) && !(object instanceof Function)) { - throw new RuntimeException("User class must either be PulsarFunction or java.util.Function"); + if (!(object instanceof Function) && !(object instanceof java.util.function.Function)) { + throw new 
RuntimeException("User class must either be Function or java.util.Function"); } Class<?>[] typeArgs; - if (object instanceof PulsarFunction) { - PulsarFunction pulsarFunction = (PulsarFunction) object; - typeArgs = TypeResolver.resolveRawArguments(PulsarFunction.class, pulsarFunction.getClass()); - } else { + if (object instanceof Function) { Function function = (Function) object; typeArgs = TypeResolver.resolveRawArguments(Function.class, function.getClass()); + } else { + java.util.function.Function function = (java.util.function.Function) object; + typeArgs = TypeResolver.resolveRawArguments(java.util.function.Function.class, function.getClass()); } // setup serde diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java index affe68d..e8e761f 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java @@ -74,7 +74,7 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.functions.api.Context; -import org.apache.pulsar.functions.api.PulsarFunction; +import org.apache.pulsar.functions.api.Function; import org.apache.pulsar.functions.api.utils.DefaultSerDe; import org.apache.pulsar.functions.proto.Function.FunctionConfig; import org.apache.pulsar.functions.proto.Function.FunctionConfig.ProcessingGuarantees; @@ -87,7 +87,6 @@ import org.apache.pulsar.functions.utils.Utils; import org.powermock.api.mockito.PowerMockito; import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; -import 
org.powermock.modules.testng.PowerMockObjectFactory; import org.powermock.reflect.Whitebox; import org.testng.IObjectFactory; import org.testng.annotations.BeforeMethod; @@ -107,14 +106,14 @@ public class JavaInstanceRunnableProcessTest { return new org.powermock.modules.testng.PowerMockObjectFactory(); } - private static class TestFunction implements PulsarFunction<String, String> { + private static class TestFunction implements Function<String, String> { @Override public String process(String input, Context context) throws Exception { return input + "!"; } } - private static class TestFailureFunction implements PulsarFunction<String, String> { + private static class TestFailureFunction implements Function<String, String> { private int processId2Count = 0; @@ -134,7 +133,7 @@ public class JavaInstanceRunnableProcessTest { } } - private static class TestVoidFunction implements PulsarFunction<String, Void> { + private static class TestVoidFunction implements Function<String, Void> { @Override public Void process(String input, Context context) throws Exception { diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java index b4712ee..1e9781c 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java @@ -22,11 +22,10 @@ import lombok.Getter; import lombok.Setter; import net.jodah.typetools.TypeResolver; import org.apache.pulsar.functions.api.Context; -import org.apache.pulsar.functions.api.PulsarFunction; +import org.apache.pulsar.functions.api.Function; import org.apache.pulsar.functions.api.SerDe; import org.apache.pulsar.functions.api.utils.DefaultSerDe; import org.apache.pulsar.functions.proto.Function.FunctionConfig; -import 
org.apache.pulsar.functions.instance.InstanceConfig; import org.testng.annotations.Test; import java.lang.reflect.InvocationTargetException; @@ -84,7 +83,7 @@ public class JavaInstanceRunnableTest { private Integer age; } - private class ComplexTypeHandler implements PulsarFunction<String, ComplexUserDefinedType> { + private class ComplexTypeHandler implements Function<String, ComplexUserDefinedType> { @Override public ComplexUserDefinedType process(String input, Context context) throws Exception { return new ComplexUserDefinedType(); @@ -103,14 +102,14 @@ public class JavaInstanceRunnableTest { } } - private class VoidInputHandler implements PulsarFunction<Void, String> { + private class VoidInputHandler implements Function<Void, String> { @Override public String process(Void input, Context context) throws Exception { return new String("Interesting"); } } - private class VoidOutputHandler implements PulsarFunction<String, Void> { + private class VoidOutputHandler implements Function<String, Void> { @Override public Void process(String input, Context context) throws Exception { return null; @@ -127,7 +126,7 @@ public class JavaInstanceRunnableTest { Method method = makeAccessible(runnable); VoidInputHandler pulsarFunction = new VoidInputHandler(); ClassLoader clsLoader = Thread.currentThread().getContextClassLoader(); - Class<?>[] typeArgs = TypeResolver.resolveRawArguments(PulsarFunction.class, pulsarFunction.getClass()); + Class<?>[] typeArgs = TypeResolver.resolveRawArguments(Function.class, pulsarFunction.getClass()); method.invoke(runnable, typeArgs, clsLoader); assertFalse(true); } catch (InvocationTargetException ex) { @@ -147,7 +146,7 @@ public class JavaInstanceRunnableTest { Method method = makeAccessible(runnable); ClassLoader clsLoader = Thread.currentThread().getContextClassLoader(); VoidOutputHandler pulsarFunction = new VoidOutputHandler(); - Class<?>[] typeArgs = TypeResolver.resolveRawArguments(PulsarFunction.class, pulsarFunction.getClass()); + 
Class<?>[] typeArgs = TypeResolver.resolveRawArguments(Function.class, pulsarFunction.getClass()); method.invoke(runnable, typeArgs, clsLoader); } catch (Exception ex) { assertTrue(false); @@ -163,8 +162,8 @@ public class JavaInstanceRunnableTest { JavaInstanceRunnable runnable = createRunnable(true, DefaultSerDe.class.getName()); Method method = makeAccessible(runnable); ClassLoader clsLoader = Thread.currentThread().getContextClassLoader(); - PulsarFunction pulsarFunction = (PulsarFunction<String, String>) (input, context) -> input + "-lambda"; - Class<?>[] typeArgs = TypeResolver.resolveRawArguments(PulsarFunction.class, pulsarFunction.getClass()); + Function function = (Function<String, String>) (input, context) -> input + "-lambda"; + Class<?>[] typeArgs = TypeResolver.resolveRawArguments(Function.class, function.getClass()); method.invoke(runnable, typeArgs, clsLoader); fail("Should fail constructing java instance if function type is inconsistent with serde type"); } catch (InvocationTargetException ex) { @@ -183,8 +182,8 @@ public class JavaInstanceRunnableTest { JavaInstanceRunnable runnable = createRunnable(false, null); Method method = makeAccessible(runnable); ClassLoader clsLoader = Thread.currentThread().getContextClassLoader(); - PulsarFunction pulsarFunction = (PulsarFunction<String, String>) (input, context) -> input + "-lambda"; - Class<?>[] typeArgs = TypeResolver.resolveRawArguments(PulsarFunction.class, pulsarFunction.getClass()); + Function function = (Function<String, String>) (input, context) -> input + "-lambda"; + Class<?>[] typeArgs = TypeResolver.resolveRawArguments(Function.class, function.getClass()); method.invoke(runnable, typeArgs, clsLoader); } catch (Exception ex) { ex.printStackTrace(); @@ -202,8 +201,8 @@ public class JavaInstanceRunnableTest { JavaInstanceRunnable runnable = createRunnable(false, DefaultSerDe.class.getName()); Method method = makeAccessible(runnable); ClassLoader clsLoader = 
Thread.currentThread().getContextClassLoader(); - PulsarFunction pulsarFunction = (PulsarFunction<String, String>) (input, context) -> input + "-lambda"; - Class<?>[] typeArgs = TypeResolver.resolveRawArguments(PulsarFunction.class, pulsarFunction.getClass()); + Function function = (Function<String, String>) (input, context) -> input + "-lambda"; + Class<?>[] typeArgs = TypeResolver.resolveRawArguments(Function.class, function.getClass()); method.invoke(runnable, typeArgs, clsLoader); } catch (Exception ex) { assertTrue(false); @@ -219,8 +218,8 @@ public class JavaInstanceRunnableTest { JavaInstanceRunnable runnable = createRunnable(false, IntegerSerDe.class.getName()); Method method = makeAccessible(runnable); ClassLoader clsLoader = Thread.currentThread().getContextClassLoader(); - PulsarFunction pulsarFunction = (PulsarFunction<String, String>) (input, context) -> input + "-lambda"; - Class<?>[] typeArgs = TypeResolver.resolveRawArguments(PulsarFunction.class, pulsarFunction.getClass()); + Function function = (Function<String, String>) (input, context) -> input + "-lambda"; + Class<?>[] typeArgs = TypeResolver.resolveRawArguments(Function.class, function.getClass()); method.invoke(runnable, typeArgs, clsLoader); fail("Should fail constructing java instance if function type is inconsistent with serde type"); } catch (InvocationTargetException ex) { diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java index 1583349..53c93a4 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java @@ -22,7 +22,7 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import org.apache.pulsar.client.api.MessageId; -import 
org.apache.pulsar.functions.api.PulsarFunction; +import org.apache.pulsar.functions.api.Function; import org.apache.pulsar.functions.api.utils.DefaultSerDe; import org.apache.pulsar.functions.proto.Function.FunctionConfig; import org.testng.annotations.Test; @@ -49,7 +49,7 @@ public class JavaInstanceTest { InstanceConfig config = createInstanceConfig(); JavaInstance instance = new JavaInstance( config, - (PulsarFunction<String, String>) (input, context) -> input + "-lambda", + (Function<String, String>) (input, context) -> input + "-lambda", null, null, new HashMap<>()); String testString = "ABC123"; JavaExecutionResult result = instance.handleMessage(MessageId.earliest, "random", testString); diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CounterFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CounterFunction.java index e41055f..7bf1657 100644 --- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CounterFunction.java +++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CounterFunction.java @@ -19,9 +19,9 @@ package org.apache.pulsar.functions.api.examples; import org.apache.pulsar.functions.api.Context; -import org.apache.pulsar.functions.api.PulsarFunction; +import org.apache.pulsar.functions.api.Function; -public class CounterFunction implements PulsarFunction<String, Void> { +public class CounterFunction implements Function<String, Void> { @Override public Void process(String input, Context context) throws Exception { String[] parts = input.split("\\."); diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ExclamationFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ExclamationFunction.java index 12e97db..ae8ef76 100644 --- 
a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ExclamationFunction.java +++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ExclamationFunction.java @@ -19,9 +19,9 @@ package org.apache.pulsar.functions.api.examples; import org.apache.pulsar.functions.api.Context; -import org.apache.pulsar.functions.api.PulsarFunction; +import org.apache.pulsar.functions.api.Function; -public class ExclamationFunction implements PulsarFunction<String, String> { +public class ExclamationFunction implements Function<String, String> { @Override public String process(String input, Context context) { diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/LoggingFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/LoggingFunction.java index f85dd90..5d2ff5a 100644 --- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/LoggingFunction.java +++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/LoggingFunction.java @@ -20,12 +20,12 @@ package org.apache.pulsar.functions.api.examples; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.apache.pulsar.functions.api.Context; -import org.apache.pulsar.functions.api.PulsarFunction; +import org.apache.pulsar.functions.api.Function; /** * A function with logging example. 
*/ -public class LoggingFunction implements PulsarFunction<String, String> { +public class LoggingFunction implements Function<String, String> { private static final AtomicIntegerFieldUpdater<LoggingFunction> COUNTER_UPDATER = AtomicIntegerFieldUpdater.newUpdater(LoggingFunction.class, "counter"); diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunction.java index 6ef92ae..236d651 100644 --- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunction.java +++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunction.java @@ -19,10 +19,10 @@ package org.apache.pulsar.functions.api.examples; import org.apache.pulsar.functions.api.Context; -import org.apache.pulsar.functions.api.PulsarFunction; +import org.apache.pulsar.functions.api.Function; import org.apache.pulsar.functions.api.utils.DefaultSerDe; -public class PublishFunction implements PulsarFunction<String, Void> { +public class PublishFunction implements Function<String, Void> { @Override public Void process(String input, Context context) { context.publish(context.getUserConfigValue("PublishTopic"), input + "!", DefaultSerDe.class.getName()); diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserConfigFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserConfigFunction.java index 2d2ac2a..65df29a 100644 --- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserConfigFunction.java +++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserConfigFunction.java @@ -19,9 +19,9 @@ package org.apache.pulsar.functions.api.examples; import org.apache.pulsar.functions.api.Context; -import 
org.apache.pulsar.functions.api.PulsarFunction; +import org.apache.pulsar.functions.api.Function; -public class UserConfigFunction implements PulsarFunction<String, String> { +public class UserConfigFunction implements Function<String, String> { @Override public String process(String input, Context context) { context.getLogger().info("My Config is " + context.getUserConfigValue("MyOwnConfig")); diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserMetricFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserMetricFunction.java index ab30d11..2a9b95c 100644 --- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserMetricFunction.java +++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserMetricFunction.java @@ -19,9 +19,9 @@ package org.apache.pulsar.functions.api.examples; import org.apache.pulsar.functions.api.Context; -import org.apache.pulsar.functions.api.PulsarFunction; +import org.apache.pulsar.functions.api.Function; -public class UserMetricFunction implements PulsarFunction<String, Void> { +public class UserMetricFunction implements Function<String, Void> { @Override public Void process(String input, Context context) { diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/VoidFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/VoidFunction.java index b0a05a3..abcc663 100644 --- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/VoidFunction.java +++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/VoidFunction.java @@ -19,9 +19,9 @@ package org.apache.pulsar.functions.api.examples; import org.apache.pulsar.functions.api.Context; -import org.apache.pulsar.functions.api.PulsarFunction; +import 
org.apache.pulsar.functions.api.Function; -public class VoidFunction implements PulsarFunction<String, Void> { +public class VoidFunction implements Function<String, Void> { @Override public Void process(String input, Context context) { return null; diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java index 721d4b4..7dabeed 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java @@ -44,7 +44,7 @@ import org.apache.logging.log4j.core.config.Configurator; import org.apache.pulsar.common.policies.data.ErrorData; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.functions.api.Context; -import org.apache.pulsar.functions.api.PulsarFunction; +import org.apache.pulsar.functions.api.Function; import org.apache.pulsar.functions.api.utils.DefaultSerDe; import org.apache.pulsar.functions.proto.Function.PackageLocationMetaData; import org.apache.pulsar.functions.proto.Function.FunctionConfig; @@ -76,7 +76,7 @@ public class FunctionApiV2ResourceTest { return new org.powermock.modules.testng.PowerMockObjectFactory(); } - private static final class TestFunction implements PulsarFunction<String, String> { + private static final class TestFunction implements Function<String, String> { public String process(String input, Context context) throws Exception { return input; diff --git a/site/docs/latest/functions/api.md b/site/docs/latest/functions/api.md index 895679e..6e7581d 100644 --- a/site/docs/latest/functions/api.md +++ b/site/docs/latest/functions/api.md @@ -39,7 +39,7 @@ Both the [Java](#java-functions-with-context) and [Python](#python-functions-wit Writing Pulsar 
Functions in Java involves implementing one of two interfaces: * The [`java.util.Function`](https://docs.oracle.com/javase/8/docs/api/java/util/function/Function.html) interface -* The {% javadoc PulsarFunction client org.apache.pulsar.functions.api.PulsarFunction %} interface. This interface works much like the `java.util.Function` interface, but with the important difference +* The {% javadoc PulsarFunction client org.apache.pulsar.functions.api.Function %} interface. This interface works much like the `java.util.Function` interface, but with the important difference ### Java functions without context -- To stop receiving notification emails like this one, please contact si...@apache.org.