This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new bf2a9c7 Add Windowfunction interface to functions api (#3324) bf2a9c7 is described below commit bf2a9c7b84f1c749bfabdc21a2926b017caa61ad Author: Sanjeev Kulkarni <sanjee...@gmail.com> AuthorDate: Mon Jan 14 09:54:49 2019 -0800 Add Windowfunction interface to functions api (#3324) * Added WindowFunction interface and implementation * Fixed logic * Update comments * Took feedback into account --- .../pulsar/functions/api}/WindowContext.java | 2 +- .../pulsar/functions/api}/WindowFunction.java | 23 ++++---- .../functions/windowing/WindowContextImpl.java | 1 + .../windowing/WindowFunctionExecutor.java | 64 ++++++++++++---------- .../windowing/WindowFunctionExecutorTest.java | 27 +++++---- ...{WindowFunction.java => AddWindowFunction.java} | 2 +- ...dowFunction.java => ContextWindowFunction.java} | 14 +++-- .../resources/example-window-function-config.yaml | 2 +- .../org/apache/pulsar/functions/utils/Utils.java | 34 ++++++++---- 9 files changed, 99 insertions(+), 70 deletions(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContext.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/WindowContext.java similarity index 99% rename from pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContext.java rename to pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/WindowContext.java index 2f1f2e7..0abc87a 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContext.java +++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/WindowContext.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.functions.windowing; +package org.apache.pulsar.functions.api; import org.slf4j.Logger; diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowFunction.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/WindowFunction.java similarity index 65% copy from pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowFunction.java copy to pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/WindowFunction.java index ae01cec..6f2c421 100644 --- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowFunction.java +++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/WindowFunction.java @@ -16,20 +16,19 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.functions.api.examples; - -import lombok.extern.slf4j.Slf4j; +package org.apache.pulsar.functions.api; import java.util.Collection; -import java.util.function.Function; /** - * Example Function that acts on a window of tuples at a time rather than per tuple basis. + * This is the interface of the windowed function api. The process method is called + * for every triggered window. */ -@Slf4j -public class WindowFunction implements Function <Collection<Integer>, Integer> { - @Override - public Integer apply(Collection<Integer> integers) { - return integers.stream().reduce(0, (x, y) -> x + y); - } -} +@FunctionalInterface +public interface WindowFunction<I, O> { + /** + * Process the input. + * @return the output + */ + O process(Collection<Record<I>> input, WindowContext context) throws Exception; +} \ No newline at end of file diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContextImpl.java index 41e8ebe..de00f52 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContextImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContextImpl.java @@ -19,6 +19,7 @@ package org.apache.pulsar.functions.windowing; import org.apache.pulsar.functions.api.Context; +import org.apache.pulsar.functions.api.WindowContext; import org.slf4j.Logger; import java.nio.ByteBuffer; diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java index e288261..1945949 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java @@ -30,9 +30,7 @@ import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.functions.api.Context; -import org.apache.pulsar.functions.api.Function; -import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.functions.api.*; import org.apache.pulsar.functions.utils.Reflections; import org.apache.pulsar.common.functions.WindowConfig; import org.apache.pulsar.functions.windowing.evictors.CountEvictionPolicy; @@ -51,22 +49,23 @@ public class WindowFunctionExecutor<I, O> implements Function<I, O> { private boolean initialized; protected WindowConfig windowConfig; - private WindowManager<I> windowManager; + private WindowManager<Record<I>> windowManager; private TimestampExtractor<I> timestampExtractor; - protected transient WaterMarkEventGenerator<I> waterMarkEventGenerator; + protected transient WaterMarkEventGenerator<Record<I>> waterMarkEventGenerator; - protected java.util.function.Function<Collection<I>, O> windowFunction; + protected java.util.function.Function<Collection<I>, O> bareWindowFunction; + protected WindowFunction<I, O> windowFunction; public void initialize(Context context) { this.windowConfig = this.getWindowConfigs(context); - this.windowFunction = intializeUserFunction(this.windowConfig); + initializeUserFunction(this.windowConfig); log.info("Window Config: {}", this.windowConfig); this.windowManager = this.getWindowManager(this.windowConfig, context); this.initialized = true; this.start(); } - private java.util.function.Function<Collection<I>, O> intializeUserFunction(WindowConfig windowConfig) { + private void initializeUserFunction(WindowConfig windowConfig) { String actualWindowFunctionClassName = windowConfig.getActualWindowFunctionClassName(); ClassLoader clsLoader = Thread.currentThread().getContextClassLoader(); Object userClassObject = Reflections.createInstance( @@ -76,10 +75,12 @@ public class WindowFunctionExecutor<I, O> implements Function<I, O> { Class<?>[] typeArgs = TypeResolver.resolveRawArguments( java.util.function.Function.class, userClassObject.getClass()); if (typeArgs[0].equals(Collection.class)) { - return (java.util.function.Function) userClassObject; + bareWindowFunction = (java.util.function.Function) userClassObject; } else { throw new IllegalArgumentException("Window function must take a collection as input"); } + } else if (userClassObject instanceof WindowFunction) { + windowFunction = (WindowFunction) userClassObject; } else { throw new IllegalArgumentException("Window function does not implement the correct interface"); } @@ -97,10 +98,10 @@ public class WindowFunctionExecutor<I, O> implements Function<I, O> { return windowConfig; } - private WindowManager<I> getWindowManager(WindowConfig windowConfig, Context context) { + private WindowManager<Record<I>> getWindowManager(WindowConfig windowConfig, Context context) { - WindowLifecycleListener<Event<I>> lifecycleListener = newWindowLifecycleListener(context); - WindowManager<I> manager = new WindowManager<>(lifecycleListener, new ConcurrentLinkedQueue<>()); + WindowLifecycleListener<Event<Record<I>>> lifecycleListener = newWindowLifecycleListener(context); + WindowManager<Record<I>> manager = new WindowManager<>(lifecycleListener, new ConcurrentLinkedQueue<>()); if (this.windowConfig.getTimestampExtractorClassName() != null) { this.timestampExtractor = getTimeStampExtractor(windowConfig); @@ -115,8 +116,8 @@ public class WindowFunctionExecutor<I, O> implements Function<I, O> { } } - EvictionPolicy<I, ?> evictionPolicy = getEvictionPolicy(windowConfig); - TriggerPolicy<I, ?> triggerPolicy = getTriggerPolicy(windowConfig, manager, + EvictionPolicy<Record<I>, ?> evictionPolicy = getEvictionPolicy(windowConfig); + TriggerPolicy<Record<I>, ?> triggerPolicy = getTriggerPolicy(windowConfig, manager, evictionPolicy, context); manager.setEvictionPolicy(evictionPolicy); manager.setTriggerPolicy(triggerPolicy); @@ -162,8 +163,8 @@ public class WindowFunctionExecutor<I, O> implements Function<I, O> { return (TimestampExtractor<I>) result; } - private TriggerPolicy<I, ?> getTriggerPolicy(WindowConfig windowConfig, WindowManager<I> manager, - EvictionPolicy<I, ?> evictionPolicy, Context context) { + private TriggerPolicy<Record<I>, ?> getTriggerPolicy(WindowConfig windowConfig, WindowManager<Record<I>> manager, + EvictionPolicy<Record<I>, ?> evictionPolicy, Context context) { if (windowConfig.getSlidingIntervalCount() != null) { if (this.isEventTime()) { return new WatermarkCountTriggerPolicy<>( @@ -181,7 +182,7 @@ public class WindowFunctionExecutor<I, O> implements Function<I, O> { } } - private EvictionPolicy<I, ?> getEvictionPolicy(WindowConfig windowConfig) { + private EvictionPolicy<Record<I>, ?> getEvictionPolicy(WindowConfig windowConfig) { if (windowConfig.getWindowLengthCount() != null) { if (this.isEventTime()) { return new WatermarkCountEvictionPolicy<>(windowConfig.getWindowLengthCount()); @@ -198,17 +199,17 @@ public class WindowFunctionExecutor<I, O> implements Function<I, O> { } } - protected WindowLifecycleListener<Event<I>> newWindowLifecycleListener(Context context) { - return new WindowLifecycleListener<Event<I>>() { + protected WindowLifecycleListener<Event<Record<I>>> newWindowLifecycleListener(Context context) { + return new WindowLifecycleListener<Event<Record<I>>>() { @Override - public void onExpiry(List<Event<I>> events) { - for (Event<I> event : events) { + public void onExpiry(List<Event<Record<I>>> events) { + for (Event<Record<I>> event : events) { event.getRecord().ack(); } } @Override - public void onActivation(List<Event<I>> tuples, List<Event<I>> newTuples, List<Event<I>> + public void onActivation(List<Event<Record<I>>> tuples, List<Event<Record<I>>> newTuples, List<Event<Record<I>>> expiredTuples, Long referenceTime) { processWindow( context, @@ -220,7 +221,7 @@ public class WindowFunctionExecutor<I, O> implements Function<I, O> { }; } - private void processWindow(Context context, List<I> tuples, List<I> newTuples, List<I> + private void processWindow(Context context, List<Record<I>> tuples, List<Record<I>> newTuples, List<Record<I>> expiredTuples, Long referenceTime) { O output = null; @@ -273,12 +274,12 @@ public class WindowFunctionExecutor<I, O> implements Function<I, O> { initialize(context); } - Record<?> record = context.getCurrentRecord(); + Record<I> record = (Record<I>)context.getCurrentRecord(); if (isEventTime()) { - long ts = this.timestampExtractor.extractTimestamp(input); + long ts = this.timestampExtractor.extractTimestamp(record.getValue()); if (this.waterMarkEventGenerator.track(record.getTopicName().get(), ts)) { - this.windowManager.add(input, ts, record); + this.windowManager.add(record, ts, record); } else { if (this.windowConfig.getLateDataTopic() != null) { context.publish(this.windowConfig.getLateDataTopic(), input); @@ -290,12 +291,17 @@ public class WindowFunctionExecutor<I, O> implements Function<I, O> { record.ack(); } } else { - this.windowManager.add(input, System.currentTimeMillis(), record); + this.windowManager.add(record, System.currentTimeMillis(), record); } return null; } - public O process(Window<I> inputWindow, WindowContext context) throws Exception { - return this.windowFunction.apply(inputWindow.get()); + public O process(Window<Record<I>> inputWindow, WindowContext context) throws Exception { + if (this.bareWindowFunction != null) { + Collection<I> newCollection = inputWindow.get().stream().map(Record::getValue).collect(Collectors.toList()); + return this.bareWindowFunction.apply(newCollection); + } else { + return this.windowFunction.process(inputWindow.get(), context); + } } } diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java index 21a78a2..88ecebe 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java @@ -24,9 +24,8 @@ import org.apache.pulsar.functions.api.Context; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.common.functions.WindowConfig; -import org.apache.pulsar.functions.utils.WindowConfigUtils; +import org.apache.pulsar.functions.api.WindowContext; import org.mockito.Mockito; -import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -51,10 +50,10 @@ public class WindowFunctionExecutorTest { private static class TestWindowFunctionExecutor extends WindowFunctionExecutor<Long, Long> { - List<Window<Long>> windows = new ArrayList<>(); + List<Window<Record<Long>>> windows = new ArrayList<>(); @Override - public Long process(Window<Long> inputWindow, WindowContext context) throws Exception { + public Long process(Window<Record<Long>> inputWindow, WindowContext context) throws Exception { windows.add(inputWindow); return null; } @@ -150,22 +149,26 @@ public class WindowFunctionExecutorTest { public void testExecuteWithTs() throws Exception { long[] timestamps = {603, 605, 607, 618, 626, 636}; for (long ts : timestamps) { + Record<?> record = Mockito.mock(Record.class); + Mockito.doReturn(Optional.of("test-topic")).when(record).getTopicName(); + Mockito.doReturn(record).when(context).getCurrentRecord(); + Mockito.doReturn(ts).when(record).getValue(); testWindowedPulsarFunction.process(ts, context); } testWindowedPulsarFunction.waterMarkEventGenerator.run(); assertEquals(3, testWindowedPulsarFunction.windows.size()); - Window<Long> first = testWindowedPulsarFunction.windows.get(0); + Window<Record<Long>> first = testWindowedPulsarFunction.windows.get(0); assertArrayEquals( new long[]{603, 605, 607}, - new long[]{first.get().get(0), first.get().get(1), first.get().get(2)}); + new long[]{first.get().get(0).getValue(), first.get().get(1).getValue(), first.get().get(2).getValue()}); - Window<Long> second = testWindowedPulsarFunction.windows.get(1); + Window<Record<Long>> second = testWindowedPulsarFunction.windows.get(1); assertArrayEquals( new long[]{603, 605, 607, 618}, - new long[]{second.get().get(0), second.get().get(1), second.get().get(2), second.get().get(3)}); + new long[]{second.get().get(0).getValue(), second.get().get(1).getValue(), second.get().get(2).getValue(), second.get().get(3).getValue()}); - Window<Long> third = testWindowedPulsarFunction.windows.get(2); - assertArrayEquals(new long[]{618, 626}, new long[]{third.get().get(0), third.get().get(1)}); + Window<Record<Long>> third = testWindowedPulsarFunction.windows.get(2); + assertArrayEquals(new long[]{618, 626}, new long[]{third.get().get(0).getValue(), third.get().get(1).getValue()}); } @Test @@ -207,6 +210,10 @@ public class WindowFunctionExecutorTest { for (long ts : timestamps) { events.add(ts); + Record<?> record = Mockito.mock(Record.class); + Mockito.doReturn(Optional.of("test-topic")).when(record).getTopicName(); + Mockito.doReturn(record).when(context).getCurrentRecord(); + Mockito.doReturn(ts).when(record).getValue(); testWindowedPulsarFunction.process(ts, context); //Update the watermark to this timestamp diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/AddWindowFunction.java similarity index 93% copy from pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowFunction.java copy to pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/AddWindowFunction.java index ae01cec..d8f1864 100644 --- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowFunction.java +++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/AddWindowFunction.java @@ -27,7 +27,7 @@ import java.util.function.Function; * Example Function that acts on a window of tuples at a time rather than per tuple basis. */ @Slf4j -public class WindowFunction implements Function <Collection<Integer>, Integer> { +public class AddWindowFunction implements Function <Collection<Integer>, Integer> { @Override public Integer apply(Collection<Integer> integers) { return integers.stream().reduce(0, (x, y) -> x + y); diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ContextWindowFunction.java similarity index 68% rename from pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowFunction.java rename to pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ContextWindowFunction.java index ae01cec..fe90d1e 100644 --- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowFunction.java +++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ContextWindowFunction.java @@ -19,17 +19,23 @@ package org.apache.pulsar.functions.api.examples; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.functions.api.WindowContext; +import org.apache.pulsar.functions.api.WindowFunction; import java.util.Collection; -import java.util.function.Function; /** * Example Function that acts on a window of tuples at a time rather than per tuple basis. */ @Slf4j -public class WindowFunction implements Function <Collection<Integer>, Integer> { +public class ContextWindowFunction implements WindowFunction<Integer, Integer> { @Override - public Integer apply(Collection<Integer> integers) { - return integers.stream().reduce(0, (x, y) -> x + y); + public Integer process(Collection<Record<Integer>> integers, WindowContext context) { + Integer retval = 0; + for (Record<Integer> record : integers) { + retval += record.getValue(); + } + return retval; } } diff --git a/pulsar-functions/java-examples/src/main/resources/example-window-function-config.yaml b/pulsar-functions/java-examples/src/main/resources/example-window-function-config.yaml index 3dc3279..e0faf6f 100644 --- a/pulsar-functions/java-examples/src/main/resources/example-window-function-config.yaml +++ b/pulsar-functions/java-examples/src/main/resources/example-window-function-config.yaml @@ -20,7 +20,7 @@ tenant: "test" namespace: "test-namespace" name: "example" -className: "org.apache.pulsar.functions.api.examples.WindowFunction" +className: "org.apache.pulsar.functions.api.examples.AddWindowFunction" inputs: ["test_src"] userConfig: "PublishTopic": "test_result" diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java index b50bcd5..6f2e76b 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java @@ -36,6 +36,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.common.functions.FunctionConfig; import org.apache.pulsar.common.nar.NarClassLoader; import org.apache.pulsar.functions.api.Function; +import org.apache.pulsar.functions.api.WindowFunction; import org.apache.pulsar.functions.proto.Function.FunctionDetails.Runtime; import org.apache.pulsar.io.core.Sink; import org.apache.pulsar.io.core.Source; @@ -91,19 +92,28 @@ public class Utils { Class<?>[] typeArgs; // if window function if (isWindowConfigPresent) { - java.util.function.Function function = (java.util.function.Function) userClass; - if (function == null) { - throw new IllegalArgumentException( - String.format("The Java util function class %s could not be instantiated", userClass)); - } - typeArgs = TypeResolver.resolveRawArguments(java.util.function.Function.class, function.getClass()); - if (!typeArgs[0].equals(Collection.class)) { - throw new IllegalArgumentException("Window function must take a collection as input"); + if (userClass instanceof WindowFunction) { + WindowFunction function = (WindowFunction) userClass; + if (function == null) { + throw new IllegalArgumentException( + String.format("The WindowFunction class %s could not be instantiated", userClass)); + } + typeArgs = TypeResolver.resolveRawArguments(WindowFunction.class, function.getClass()); + } else { + java.util.function.Function function = (java.util.function.Function) userClass; + if (function == null) { + throw new IllegalArgumentException( + String.format("The Java util function class %s could not be instantiated", userClass)); + } + typeArgs = TypeResolver.resolveRawArguments(java.util.function.Function.class, function.getClass()); + if (!typeArgs[0].equals(Collection.class)) { + throw new IllegalArgumentException("Window function must take a collection as input"); + } + Type type = TypeResolver.resolveGenericType(java.util.function.Function.class, function.getClass()); + Type collectionType = ((ParameterizedType) type).getActualTypeArguments()[0]; + Type actualInputType = ((ParameterizedType) collectionType).getActualTypeArguments()[0]; + typeArgs[0] = (Class<?>) actualInputType; } - Type type = TypeResolver.resolveGenericType(java.util.function.Function.class, function.getClass()); - Type collectionType = ((ParameterizedType) type).getActualTypeArguments()[0]; - Type actualInputType = ((ParameterizedType) collectionType).getActualTypeArguments()[0]; - typeArgs[0] = (Class<?>) actualInputType; } else { if (userClass instanceof Function) { Function pulsarFunction = (Function) userClass;