SAMZA-1073: moving all operator classes into samza-core
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/8515448a Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/8515448a Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/8515448a Branch: refs/heads/master Commit: 8515448a2023ae6c78b9b0bb8e297cf346775e13 Parents: daaad7b Author: Yi Pan (Data Infrastructure) <nickpa...@gmail.com> Authored: Thu Feb 16 15:04:01 2017 -0800 Committer: Yi Pan (Data Infrastructure) <nickpa...@gmail.com> Committed: Thu Feb 16 15:04:01 2017 -0800 ---------------------------------------------------------------------- build.gradle | 24 +- gradle/dependency-versions.gradle | 1 - .../samza/operators/MessageStreamImpl.java | 182 +++++++++++++ .../apache/samza/operators/StreamGraphImpl.java | 260 +++++++++++++++++++ .../functions/PartialJoinFunction.java | 56 ++++ .../samza/operators/impl/OperatorGraph.java | 164 ++++++++++++ .../samza/operators/impl/OperatorImpl.java | 67 +++++ .../operators/impl/PartialJoinOperatorImpl.java | 47 ++++ .../samza/operators/impl/RootOperatorImpl.java | 35 +++ .../impl/SessionWindowOperatorImpl.java | 52 ++++ .../samza/operators/impl/SinkOperatorImpl.java | 44 ++++ .../operators/impl/StreamOperatorImpl.java | 49 ++++ .../operators/impl/WindowOperatorImpl.java | 43 +++ .../samza/operators/spec/OperatorSpec.java | 62 +++++ .../samza/operators/spec/OperatorSpecs.java | 210 +++++++++++++++ .../operators/spec/PartialJoinOperatorSpec.java | 86 ++++++ .../samza/operators/spec/SinkOperatorSpec.java | 116 +++++++++ .../operators/spec/StreamOperatorSpec.java | 91 +++++++ .../operators/spec/WindowOperatorSpec.java | 72 +++++ .../samza/operators/spec/WindowState.java | 81 ++++++ .../system/RemoteExecutionEnvironment.java | 37 +++ .../system/StandaloneExecutionEnvironment.java | 50 ++++ .../apache/samza/task/StreamOperatorTask.java | 111 ++++++++ .../samza/example/KeyValueStoreExample.java | 180 +++++++++++++ 
.../samza/example/NoContextStreamExample.java | 151 +++++++++++ .../samza/example/OrderShipmentJoinExample.java | 188 ++++++++++++++ .../samza/example/PageViewCounterExample.java | 129 +++++++++ .../samza/example/RepartitionExample.java | 140 ++++++++++ .../samza/example/TestBasicStreamGraphs.java | 99 +++++++ .../samza/example/TestBroadcastExample.java | 113 ++++++++ .../apache/samza/example/TestExampleBase.java | 46 ++++ .../apache/samza/example/TestJoinExample.java | 129 +++++++++ .../apache/samza/example/TestWindowExample.java | 81 ++++++ .../samza/operators/TestMessageStreamImpl.java | 204 +++++++++++++++ .../operators/TestMessageStreamImplUtil.java | 26 ++ .../data/JsonIncomingSystemMessageEnvelope.java | 60 +++++ .../samza/operators/impl/TestOperatorImpl.java | 73 ++++++ .../samza/operators/impl/TestOperatorImpls.java | 235 +++++++++++++++++ .../operators/impl/TestSinkOperatorImpl.java | 50 ++++ .../operators/impl/TestStreamOperatorImpl.java | 60 +++++ .../samza/operators/spec/TestOperatorSpecs.java | 127 +++++++++ samza-operator/README.md | 17 -- .../samza/operators/MessageStreamImpl.java | 182 ------------- .../apache/samza/operators/StreamGraphImpl.java | 260 ------------------- .../functions/PartialJoinFunction.java | 56 ---- .../samza/operators/impl/OperatorGraph.java | 164 ------------ .../samza/operators/impl/OperatorImpl.java | 67 ----- .../operators/impl/PartialJoinOperatorImpl.java | 47 ---- .../samza/operators/impl/RootOperatorImpl.java | 35 --- .../impl/SessionWindowOperatorImpl.java | 52 ---- .../samza/operators/impl/SinkOperatorImpl.java | 44 ---- .../operators/impl/StreamOperatorImpl.java | 49 ---- .../operators/impl/WindowOperatorImpl.java | 43 --- .../samza/operators/spec/OperatorSpec.java | 62 ----- .../samza/operators/spec/OperatorSpecs.java | 210 --------------- .../operators/spec/PartialJoinOperatorSpec.java | 86 ------ .../samza/operators/spec/SinkOperatorSpec.java | 116 --------- .../operators/spec/StreamOperatorSpec.java | 91 ------- 
.../operators/spec/WindowOperatorSpec.java | 72 ----- .../samza/operators/spec/WindowState.java | 81 ------ .../system/RemoteExecutionEnvironment.java | 37 --- .../system/StandaloneExecutionEnvironment.java | 50 ---- .../apache/samza/task/StreamOperatorTask.java | 111 -------- .../samza/example/KeyValueStoreExample.java | 180 ------------- .../samza/example/NoContextStreamExample.java | 151 ----------- .../samza/example/OrderShipmentJoinExample.java | 188 -------------- .../samza/example/PageViewCounterExample.java | 129 --------- .../samza/example/RepartitionExample.java | 140 ---------- .../samza/example/TestBasicStreamGraphs.java | 99 ------- .../samza/example/TestBroadcastExample.java | 113 -------- .../apache/samza/example/TestExampleBase.java | 46 ---- .../apache/samza/example/TestJoinExample.java | 129 --------- .../apache/samza/example/TestWindowExample.java | 81 ------ .../samza/operators/TestMessageStreamImpl.java | 204 --------------- .../operators/TestMessageStreamImplUtil.java | 26 -- .../data/JsonIncomingSystemMessageEnvelope.java | 60 ----- .../samza/operators/impl/TestOperatorImpl.java | 73 ------ .../samza/operators/impl/TestOperatorImpls.java | 235 ----------------- .../operators/impl/TestSinkOperatorImpl.java | 50 ---- .../operators/impl/TestStreamOperatorImpl.java | 60 ----- .../samza/operators/spec/TestOperatorSpecs.java | 127 --------- settings.gradle | 1 - 82 files changed, 4007 insertions(+), 4048 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index 0d60970..400a913 100644 --- a/build.gradle +++ b/build.gradle @@ -160,6 +160,7 @@ project(":samza-core_$scalaVersion") { compile "org.codehaus.jackson:jackson-mapper-asl:$jacksonVersion" compile "org.eclipse.jetty:jetty-webapp:$jettyVersion" compile 
"com.101tec:zkclient:$zkClientVersion" + testCompile project(":samza-api").sourceSets.test.output testCompile "junit:junit:$junitVersion" testCompile "org.mockito:mockito-all:$mockitoVersion" testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion" @@ -372,29 +373,6 @@ project(":samza-yarn_$scalaVersion") { jar.dependsOn("lesscss") } -project(":samza-operator") { - apply plugin: 'java' - apply plugin: 'checkstyle' - - sourceCompatibility = 1.8 - - dependencies { - compile project(':samza-api') - compile project(":samza-core_$scalaVersion") - // TODO: remove this dependency after refactoring operator implementation classes - compile "org.reactivestreams:reactive-streams:$reactiveStreamVersion" - - testCompile project(":samza-api").sourceSets.test.output - testCompile "junit:junit:$junitVersion" - testCompile "org.mockito:mockito-all:$mockitoVersion" - } - - checkstyle { - configFile = new File(rootDir, "checkstyle/checkstyle.xml") - toolVersion = "$checkstyleVersion" - } -} - project(":samza-shell") { apply plugin: 'java' http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/gradle/dependency-versions.gradle ---------------------------------------------------------------------- diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle index 0193b64..0a8542b 100644 --- a/gradle/dependency-versions.gradle +++ b/gradle/dependency-versions.gradle @@ -38,6 +38,5 @@ commonsCodecVersion = "1.9" commonsCollectionVersion = "3.2.1" httpClientVersion="4.4.1" - reactiveStreamVersion="1.0.0" commonsLang3Version="3.4" } http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java new file mode 100644 index 0000000..830e4a5 --- 
/dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.function.Function;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.functions.FilterFunction;
+import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.operators.functions.PartialJoinFunction;
+import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.OperatorSpecs;
+import org.apache.samza.operators.windows.Window;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.internal.WindowInternal;
+import org.apache.samza.task.TaskContext;
+
+
+/**
+ * The implementation for input/output {@link MessageStream}s to/from the operators.
+ * Users use the {@link MessageStream} API methods to describe and chain the operator specs.
+ *
+ * @param <M>  type of messages in this {@link MessageStream}
+ */
+public class MessageStreamImpl<M> implements MessageStream<M> {
+  /**
+   * The {@link StreamGraphImpl} object that contains this {@link MessageStreamImpl}
+   */
+  private final StreamGraphImpl graph;
+
+  /**
+   * The set of operators that consume the messages in this {@link MessageStream}
+   */
+  private final Set<OperatorSpec> registeredOperatorSpecs = new HashSet<>();
+
+  /**
+   * Default constructor
+   *
+   * @param graph the {@link StreamGraphImpl} object that this stream belongs to
+   */
+  MessageStreamImpl(StreamGraphImpl graph) {
+    this.graph = graph;
+  }
+
+  @Override public <TM> MessageStream<TM> map(MapFunction<M, TM> mapFn) {
+    OperatorSpec<TM> op = OperatorSpecs.<M, TM>createMapOperatorSpec(mapFn, this.graph, new MessageStreamImpl<>(this.graph));
+    this.registeredOperatorSpecs.add(op);
+    return op.getNextStream();
+  }
+
+  @Override public MessageStream<M> filter(FilterFunction<M> filterFn) {
+    OperatorSpec<M> op = OperatorSpecs.<M>createFilterOperatorSpec(filterFn, this.graph, new MessageStreamImpl<>(this.graph));
+    this.registeredOperatorSpecs.add(op);
+    return op.getNextStream();
+  }
+
+  @Override
+  public <TM> MessageStream<TM> flatMap(FlatMapFunction<M, TM> flatMapFn) {
+    OperatorSpec<TM> op = OperatorSpecs.createStreamOperatorSpec(flatMapFn, this.graph, new MessageStreamImpl<>(this.graph));
+    this.registeredOperatorSpecs.add(op);
+    return op.getNextStream();
+  }
+
+  @Override
+  public void sink(SinkFunction<M> sinkFn) {
+    this.registeredOperatorSpecs.add(OperatorSpecs.createSinkOperatorSpec(sinkFn, this.graph));
+  }
+
+  @Override public void sendTo(OutputStream<M> stream) {
+    this.registeredOperatorSpecs.add(OperatorSpecs.createSendToOperatorSpec(stream.getSinkFunction(), this.graph, stream));
+  }
+
+  @Override
+  public <K, WV> MessageStream<WindowPane<K, WV>> window(Window<M, K, WV> window) {
+    OperatorSpec<WindowPane<K, WV>> wndOp = OperatorSpecs.createWindowOperatorSpec((WindowInternal<M, K, WV>) window,
+        this.graph, new MessageStreamImpl<>(this.graph));
+    this.registeredOperatorSpecs.add(wndOp);
+    return wndOp.getNextStream();
+  }
+
+  @Override public <K, OM, RM> MessageStream<RM> join(MessageStream<OM> otherStream, JoinFunction<K, M, OM, RM> joinFn) {
+    MessageStreamImpl<RM> outputStream = new MessageStreamImpl<>(this.graph);
+
+    PartialJoinFunction<K, M, OM, RM> parJoin1 = new PartialJoinFunction<K, M, OM, RM>() {
+      @Override
+      public RM apply(M m1, OM om) {
+        return joinFn.apply(m1, om);
+      }
+
+      @Override
+      public K getKey(M message) {
+        return joinFn.getFirstKey(message);
+      }
+
+      @Override
+      public K getOtherKey(OM message) {
+        return joinFn.getSecondKey(message);
+      }
+
+      @Override
+      public void init(Config config, TaskContext context) {
+        joinFn.init(config, context);
+      }
+    };
+
+    PartialJoinFunction<K, OM, M, RM> parJoin2 = new PartialJoinFunction<K, OM, M, RM>() {
+      @Override
+      public RM apply(OM m1, M m) {
+        return joinFn.apply(m, m1);
+      }
+
+      @Override
+      public K getKey(OM message) {
+        return joinFn.getSecondKey(message);
+      }
+
+      @Override
+      public K getOtherKey(M message) {
+        return joinFn.getFirstKey(message);
+      }
+    };
+    // NOTE(review): parJoin2 does not override init(), so joinFn.init() is reached only through parJoin1 - confirm intended
+    // TODO: need to add default store functions for the two partial join functions
+
+    ((MessageStreamImpl<OM>) otherStream).registeredOperatorSpecs.add(
+        OperatorSpecs.<OM, K, M, RM>createPartialJoinOperatorSpec(parJoin2, this.graph, outputStream));
+    this.registeredOperatorSpecs.add(OperatorSpecs.<M, K, OM, RM>createPartialJoinOperatorSpec(parJoin1, this.graph, outputStream));
+    return outputStream;
+  }
+
+  @Override
+  public MessageStream<M> merge(Collection<MessageStream<M>> otherStreams) {
+    MessageStreamImpl<M> outputStream = new MessageStreamImpl<>(this.graph);
+    // register on this stream directly instead of add(this) to the caller's collection, which may be immutable
+    this.registeredOperatorSpecs.add(OperatorSpecs.createMergeOperatorSpec(this.graph, outputStream));
+    otherStreams.forEach(other -> ((MessageStreamImpl<M>) other).registeredOperatorSpecs.
+        add(OperatorSpecs.createMergeOperatorSpec(this.graph, outputStream)));
+    return outputStream;
+  }
+
+  @Override
+  public <K> MessageStream<M> partitionBy(Function<M, K> parKeyExtractor) {
+    MessageStreamImpl<M> intStream = this.graph.createIntStream(parKeyExtractor);
+    OutputStream<M> outputStream = this.graph.getOutputStream(intStream);
+    this.registeredOperatorSpecs.add(OperatorSpecs.createPartitionOperatorSpec(outputStream.getSinkFunction(),
+        this.graph, outputStream));
+    return intStream;
+  }
+  /**
+   * Gets the operator specs registered to consume the output of this {@link MessageStream}. This is an internal API and
+   * should not be exposed to users.
+   *
+   * @return  a collection containing all {@link OperatorSpec}s that are registered with this {@link MessageStream}.
+   */
+  public Collection<OperatorSpec> getRegisteredOperatorSpecs() {
+    return Collections.unmodifiableSet(this.registeredOperatorSpecs);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
new file mode 100644
index 0000000..dca3469
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import java.util.Properties;
+import java.util.function.Function;
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The implementation of {@link StreamGraph} interface. This class provides implementation of methods to allow users to
+ * create system input/output/intermediate streams.
+ */
+public class StreamGraphImpl implements StreamGraph {
+
+  /**
+   * Unique identifier for each {@link org.apache.samza.operators.spec.OperatorSpec} added to transform the {@link MessageEnvelope}
+   * in the input {@link MessageStream}s.
+   */
+  private int opId = 0;
+
+  private class InputStreamImpl<K, V, M extends MessageEnvelope<K, V>> extends MessageStreamImpl<M> {
+    final StreamSpec spec;
+    final Serde<K> keySerde;
+    final Serde<V> msgSerde;
+
+    InputStreamImpl(StreamGraphImpl graph, StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) {
+      super(graph);
+      this.spec = streamSpec;
+      this.keySerde = keySerde;
+      this.msgSerde = msgSerde;
+    }
+
+    StreamSpec getSpec() {
+      return this.spec;
+    }
+
+  }
+
+  private class OutputStreamImpl<K, V, M extends MessageEnvelope<K, V>> implements OutputStream<M> {
+    final StreamSpec spec;
+    final Serde<K> keySerde;
+    final Serde<V> msgSerde;
+
+    OutputStreamImpl(StreamGraphImpl graph, StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) {
+      this.spec = streamSpec;
+      this.keySerde = keySerde;
+      this.msgSerde = msgSerde;
+    }
+
+    StreamSpec getSpec() {
+      return this.spec;
+    }
+
+    @Override
+    public SinkFunction<M> getSinkFunction() {
+      return (M message, MessageCollector mc, TaskCoordinator tc) -> {
+        // TODO: need to find a way to directly pass in the serde class names
+        // mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), this.keySerde.getClass().getName(), this.msgSerde.getClass().getName(),
+        //     message.getKey(), message.getKey(), message.getMessage()));
+        mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), message.getKey(), message.getMessage()));
+      };
+    }
+  }
+
+  private class IntermediateStreamImpl<PK, K, V, M extends MessageEnvelope<K, V>> extends InputStreamImpl<K, V, M> implements OutputStream<M> {
+    final Function<M, PK> parKeyFn;
+
+    /**
+     * Default constructor
+     *
+     * @param graph the {@link StreamGraphImpl} object that this stream belongs to
+     */
+    IntermediateStreamImpl(StreamGraphImpl graph, StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) {
+      this(graph, streamSpec, keySerde, msgSerde, null);
+    }
+
+    IntermediateStreamImpl(StreamGraphImpl graph, StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde, Function<M, PK> parKeyFn) {
+      super(graph, streamSpec, keySerde, msgSerde);
+      this.parKeyFn = parKeyFn;
+    }
+
+    @Override
+    public SinkFunction<M> getSinkFunction() {
+      return (M message, MessageCollector mc, TaskCoordinator tc) -> {
+        // TODO: need to find a way to directly pass in the serde class names
+        // mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), this.keySerde.getClass().getName(), this.msgSerde.getClass().getName(),
+        //     message.getKey(), message.getKey(), message.getMessage()));
+        if (this.parKeyFn == null) {
+          mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), message.getKey(), message.getMessage()));
+        } else {
+          // apply partition key function
+          mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), this.parKeyFn.apply(message), message.getKey(), message.getMessage()));
+        }
+      };
+    }
+  }
+
+  /**
+   * Maps keeping all {@link SystemStream}s that are input and output of operators in {@link StreamGraphImpl}
+   */
+  private final Map<SystemStream, MessageStream> inStreams = new HashMap<>();
+  private final Map<SystemStream, OutputStream> outStreams = new HashMap<>();
+
+  private ContextManager contextManager = new ContextManager() { };
+
+  @Override
+  public <K, V, M extends MessageEnvelope<K, V>> MessageStream<M> createInStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) {
+    return this.inStreams.computeIfAbsent(streamSpec.getSystemStream(),
+        ss -> new InputStreamImpl<K, V, M>(this, streamSpec, keySerde, msgSerde));
+  }
+
+  /**
+   * Creates the output stream for {@code streamSpec}, or returns the one already registered for its {@link SystemStream}
+   *
+   * @param streamSpec  the {@link StreamSpec} object defining the {@link SystemStream} as the output
+   * @param keySerde  the {@link Serde} for the message key type {@code K}
+   * @param msgSerde  the {@link Serde} for the message type {@code V}
+   * @param <M>  the type of {@link MessageEnvelope}s in the output {@link SystemStream}
+   * @return  the {@link OutputStream} registered for the {@link SystemStream}
+   */
+  @Override
+  public <K, V, M extends MessageEnvelope<K, V>> OutputStream<M> createOutStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) {
+    return this.outStreams.computeIfAbsent(streamSpec.getSystemStream(),
+        ss -> new OutputStreamImpl<K, V, M>(this, streamSpec, keySerde, msgSerde));
+  }
+
+  /**
+   * Creates the intermediate stream for {@code streamSpec} (registered as both an input and an output stream)
+   *
+   * @param streamSpec  the {@link StreamSpec} object defining the {@link SystemStream} as an intermediate {@link SystemStream}
+   * @param <M>  the type of {@link MessageEnvelope}s in the output {@link SystemStream}
+   * @return  the intermediate stream, as an {@link OutputStream}
+   */
+  @Override
+  public <K, V, M extends MessageEnvelope<K, V>> OutputStream<M> createIntStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) {
+    if (!this.inStreams.containsKey(streamSpec.getSystemStream())) {
+      this.inStreams.putIfAbsent(streamSpec.getSystemStream(), new IntermediateStreamImpl<K, K, V, M>(this, streamSpec, keySerde, msgSerde));
+    }
+    IntermediateStreamImpl<K, K, V, M> intStream = (IntermediateStreamImpl<K, K, V, M>) this.inStreams.get(streamSpec.getSystemStream());
+    if (!this.outStreams.containsKey(streamSpec.getSystemStream())) {
+      this.outStreams.putIfAbsent(streamSpec.getSystemStream(), intStream);
+    }
+    return intStream;
+  }
+
+  @Override public Map<StreamSpec, MessageStream> getInStreams() {
+    Map<StreamSpec, MessageStream> inStreamMap = new HashMap<>();
+    this.inStreams.forEach((ss, entry) -> inStreamMap.put(((InputStreamImpl) entry).getSpec(), entry));
+    return Collections.unmodifiableMap(inStreamMap);
+  }
+
+  @Override public Map<StreamSpec, OutputStream> getOutStreams() {
+    Map<StreamSpec, OutputStream> outStreamMap = new HashMap<>();
+    // intermediate streams are registered here too; they extend InputStreamImpl, not OutputStreamImpl
+    this.outStreams.forEach((ss, entry) -> outStreamMap.put(entry instanceof OutputStreamImpl
+        ? ((OutputStreamImpl) entry).getSpec() : ((InputStreamImpl) entry).getSpec(), entry));
+    return Collections.unmodifiableMap(outStreamMap);
+  }
+
+  @Override
+  public StreamGraph withContextManager(ContextManager manager) {
+    this.contextManager = manager;
+    return this;
+  }
+
+  public int getNextOpId() {
+    return this.opId++;
+  }
+
+  public ContextManager getContextManager() {
+    return this.contextManager;
+  }
+
+  /**
+   * Helper method to get the input stream for a {@link SystemStream}
+   *
+   * @param systemStream  the {@link SystemStream}
+   * @return  the {@link MessageStreamImpl} registered for {@code systemStream}, or {@code null} if there is none
+   */
+  public MessageStreamImpl getInputStream(SystemStream systemStream) {
+    if (this.inStreams.containsKey(systemStream)) {
+      return (MessageStreamImpl) this.inStreams.get(systemStream);
+    }
+    return null;
+  }
+
+  <M> OutputStream<M> getOutputStream(MessageStreamImpl<M> intStream) {
+    if (this.outStreams.containsValue(intStream)) {
+      return (OutputStream<M>) intStream;
+    }
+    return null;
+  }
+
+  <M> MessageStream<M> getIntStream(OutputStream<M> outStream) {
+    if (this.inStreams.containsValue(outStream)) {
+      return (MessageStream<M>) outStream;
+    }
+    return null;
+  }
+
+  /**
+   * Method to create intermediate topics for {@link MessageStreamImpl#partitionBy(Function)} method.
+   *
+   * @param parKeyFn  the function to extract the partition key from the input message
+   * @param <PK>  the type of partition key
+   * @param <M>  the type of input message
+   * @return  the {@link MessageStreamImpl} for the re-partitioned intermediate stream
+   */
+  <PK, M> MessageStreamImpl<M> createIntStream(Function<M, PK> parKeyFn) {
+    // TODO: placeholder to auto-generate intermediate streams via {@link StreamSpec}
+    final SystemStream parStream = new SystemStream("intermediate", String.format("par-%d", this.opId));
+    StreamSpec streamSpec = new StreamSpec() {
+      // TODO: auto-generate the name; it is fixed at creation so later getNextOpId() calls cannot change it
+      @Override public SystemStream getSystemStream() {
+        return parStream;
+      }
+
+      @Override
+      public Properties getProperties() {
+        return null;
+      }
+    };
+
+    if (!this.inStreams.containsKey(streamSpec.getSystemStream())) {
+      this.inStreams.putIfAbsent(streamSpec.getSystemStream(), new IntermediateStreamImpl(this, streamSpec, null, null, parKeyFn));
+    }
+    IntermediateStreamImpl intStream = (IntermediateStreamImpl) this.inStreams.get(streamSpec.getSystemStream());
+    if (!this.outStreams.containsKey(streamSpec.getSystemStream())) {
+      this.outStreams.putIfAbsent(streamSpec.getSystemStream(), intStream);
+    }
+    return intStream;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java b/samza-core/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java
new file mode 100644
index 0000000..809a70a
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.functions;
+
+import org.apache.samza.annotation.InterfaceStability;
+
+
+/**
+ * Defines one side of a two-way join: {@link #apply} joins a message from this side's input {@link org.apache.samza.operators.MessageStream}
+ * with a message from the other input stream into a single joined output message.
+ */
+@InterfaceStability.Unstable
+public interface PartialJoinFunction<K, M, OM, RM> extends InitableFunction {
+
+  /**
+   * Method to perform join method on the two input messages
+   *
+   * @param m1  message from the first input stream
+   * @param om  message from the second input stream
+   * @return  the joined message in the output stream
+   */
+  RM apply(M m1, OM om);
+
+  /**
+   * Method to get the key from the input message
+   *
+   * @param message  the input message from the first stream
+   * @return  the join key in the {@code message}
+   */
+  K getKey(M message);
+
+  /**
+   * Method to get the key from the input message in the other stream
+   *
+   * @param message  the input message from the other stream
+   * @return  the join key in the {@code message}
+   */
+  K getOtherKey(OM message);
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorGraph.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorGraph.java
new file mode 100644
index 0000000..66336f8
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorGraph.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
+import org.apache.samza.operators.spec.SinkOperatorSpec;
+import org.apache.samza.operators.spec.StreamOperatorSpec;
+import org.apache.samza.operators.spec.WindowOperatorSpec;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.TaskContext;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Instantiates the DAG of {@link OperatorImpl}s corresponding to the {@link OperatorSpec}s for a
+ * {@link MessageStreamImpl}
+ */
+public class OperatorGraph {
+
+  /**
+   * A {@link Map} from {@link OperatorSpec} to {@link OperatorImpl}. This map registers all {@link OperatorImpl} in the DAG
+   * of {@link OperatorImpl} in a {@link org.apache.samza.container.TaskInstance}. Each {@link OperatorImpl} is created
+   * according to a single instance of {@link OperatorSpec}.
+   */
+  private final Map<OperatorSpec, OperatorImpl> operators = new HashMap<>();
+
+  /**
+   * This {@link Map} describes the DAG of {@link OperatorImpl} that are chained together to process the input messages.
+   */
+  private final Map<SystemStream, RootOperatorImpl> operatorGraph = new HashMap<>();
+
+  /**
+   * Initializes the whole DAG of {@link OperatorImpl}s, based on the input {@link MessageStreamImpl} from the {@link org.apache.samza.operators.StreamGraph}.
+   * This method traverses each input {@link org.apache.samza.operators.MessageStream} in the {@code inputStreams} and
+   * instantiates the corresponding {@link OperatorImpl} chains that take the {@link org.apache.samza.operators.MessageStream} as input.
+   *
+   * @param inputStreams  the map of input {@link org.apache.samza.operators.MessageStream}s
+   * @param config  the {@link Config} required to instantiate operators
+   * @param context  the {@link TaskContext} required to instantiate operators
+   */
+  public void init(Map<SystemStream, MessageStreamImpl> inputStreams, Config config, TaskContext context) {
+    inputStreams.forEach((ss, mstream) -> this.operatorGraph.put(ss, this.createOperatorImpls(mstream, config, context)));
+  }
+
+  /**
+   * Gets the {@link RootOperatorImpl} registered for an input {@link SystemStream}
+   *
+   * @param ss  input {@link SystemStream}
+   * @param <M>  the type of input message
+   * @return  the {@link OperatorImpl} that starts processing the input message, or {@code null} if none is registered for {@code ss}
+   */
+  public <M> OperatorImpl<M, M> get(SystemStream ss) {
+    return this.operatorGraph.get(ss);
+  }
+
+  /**
+   * Traverses the DAG of {@link OperatorSpec}s starting from the provided {@link MessageStreamImpl},
+   * creates the corresponding DAG of {@link OperatorImpl}s, and returns its root {@link RootOperatorImpl} node.
+   *
+   * @param source  the input {@link MessageStreamImpl} to instantiate {@link OperatorImpl}s for
+   * @param <M>  the type of messages in the {@code source} {@link MessageStreamImpl}
+   * @param config  the {@link Config} required to instantiate operators
+   * @param context  the {@link TaskContext} required to instantiate operators
+   * @return  root node for the {@link OperatorImpl} DAG
+   */
+  private <M> RootOperatorImpl<M> createOperatorImpls(MessageStreamImpl<M> source, Config config,
+      TaskContext context) {
+    // since the source message stream might have multiple operator specs registered on it,
+    // create a new root node as a single point of entry for the DAG.
+    RootOperatorImpl<M> rootOperator = new RootOperatorImpl<>();
+    // create the pipeline/topology starting from the source
+    source.getRegisteredOperatorSpecs().forEach(registeredOperator -> {
+        // pass in the source and context s.t. stateful stream operators can initialize their stores
+        OperatorImpl<M, ?> operatorImpl =
+            this.createAndRegisterOperatorImpl(registeredOperator, source, config, context);
+        rootOperator.registerNextOperator(operatorImpl);
+      });
+    return rootOperator;
+  }
+
+  /**
+   * Helper method to recursively traverse the {@link OperatorSpec} DAG and instantiate and link the corresponding
+   * {@link OperatorImpl}s.
+   *
+   * @param operatorSpec  the operatorSpec registered with the {@code source}
+   * @param source  the source {@link MessageStreamImpl}
+   * @param <M>  type of input message
+   * @param config  the {@link Config} required to instantiate operators
+   * @param context  the {@link TaskContext} required to instantiate operators
+   * @return  the operator implementation for the operatorSpec
+   */
+  private <M> OperatorImpl<M, ?> createAndRegisterOperatorImpl(OperatorSpec operatorSpec,
+      MessageStreamImpl<M> source, Config config, TaskContext context) {
+    if (!operators.containsKey(operatorSpec)) {
+      OperatorImpl<M, ?> operatorImpl = createOperatorImpl(source, operatorSpec, config, context);
+      if (operators.putIfAbsent(operatorSpec, operatorImpl) == null) {
+        // this is the first time we've added the operatorImpl corresponding to the operatorSpec,
+        // so traverse and initialize and register the rest of the DAG.
+        // initialize the corresponding operator function
+        operatorSpec.init(config, context);
+        MessageStreamImpl nextStream = operatorSpec.getNextStream();
+        if (nextStream != null) {
+          Collection<OperatorSpec> registeredSpecs = nextStream.getRegisteredOperatorSpecs();
+          registeredSpecs.forEach(registeredSpec -> {
+              OperatorImpl subImpl = this.createAndRegisterOperatorImpl(registeredSpec, nextStream, config, context);
+              operatorImpl.registerNextOperator(subImpl);
+            });
+        }
+        return operatorImpl;
+      }
+    }
+
+    // the implementation corresponding to operatorSpec has already been instantiated
+    // and registered, so we do not need to traverse the DAG further.
+    return operators.get(operatorSpec);
+  }
+
+  /**
+   * Creates a new {@link OperatorImpl} instance for the provided {@link OperatorSpec}.
+   *
+   * @param source  the source {@link MessageStreamImpl}
+   * @param <M>  type of input message
+   * @param operatorSpec  the immutable {@link OperatorSpec} definition.
+   * @param config  the {@link Config} required to instantiate operators
+   * @param context  the {@link TaskContext} required to instantiate operators
+   * @return  the {@link OperatorImpl} implementation instance
+   */
+  private static <M> OperatorImpl<M, ?> createOperatorImpl(MessageStreamImpl<M> source, OperatorSpec operatorSpec, Config config, TaskContext context) {
+    if (operatorSpec instanceof StreamOperatorSpec) {
+      StreamOperatorSpec<M, ?> streamOpSpec = (StreamOperatorSpec<M, ?>) operatorSpec;
+      return new StreamOperatorImpl<>(streamOpSpec, source, config, context);
+    } else if (operatorSpec instanceof SinkOperatorSpec) {
+      return new SinkOperatorImpl<>((SinkOperatorSpec<M>) operatorSpec, config, context);
+    } else if (operatorSpec instanceof WindowOperatorSpec) {
+      return new WindowOperatorImpl<>((WindowOperatorSpec<M, ?, ?>) operatorSpec, source, config, context);
+    } else if (operatorSpec instanceof PartialJoinOperatorSpec) {
+      return new PartialJoinOperatorImpl<>((PartialJoinOperatorSpec) operatorSpec, source, config, context);
+    }
+    throw new IllegalArgumentException(
+        String.format("Unsupported OperatorSpec: %s", operatorSpec.getClass().getName()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
new file mode 100644
index 0000000..abb1fa9
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+
+/**
+ * Abstract base class for all stream operator implementations.
+ *
+ * @param <M> type of the messages this operator receives
+ * @param <RM> type of the messages this operator propagates downstream
+ */
+public abstract class OperatorImpl<M, RM> {
+
+  // Insertion-ordered so downstream operators are invoked in registration order on every run.
+  // (Operator impls use identity hashing, so a HashSet would iterate in a run-dependent order.)
+  private final Set<OperatorImpl<RM, ?>> nextOperators = new LinkedHashSet<>();
+
+  /**
+   * Register the next operator in the chain that this operator should propagate its output to.
+   * Registering the same operator instance more than once has no additional effect.
+   *
+   * @param nextOperator the next operator in the chain.
+   */
+  void registerNextOperator(OperatorImpl<RM, ?> nextOperator) {
+    nextOperators.add(nextOperator);
+  }
+
+  /**
+   * Perform the transformation required for this operator and call the downstream operators.
+   *
+   * Must call {@link #propagateResult} to propagate the output to registered downstream operators correctly.
+   *
+   * @param message the input message
+   * @param collector the {@link MessageCollector} in the context
+   * @param coordinator the {@link TaskCoordinator} in the context
+   */
+  public abstract void onNext(M message, MessageCollector collector, TaskCoordinator coordinator);
+
+  /**
+   * Helper method to propagate the output of this operator to all registered downstream operators,
+   * in the order in which they were registered.
+   *
+   * This method <b>must</b> be called from {@link #onNext} to propagate the operator output correctly.
+   *
+   * @param outputMessage output message
+   * @param collector the {@link MessageCollector} in the context
+   * @param coordinator the {@link TaskCoordinator} in the context
+   */
+  void propagateResult(RM outputMessage, MessageCollector collector, TaskCoordinator coordinator) {
+    nextOperators.forEach(sub -> sub.onNext(outputMessage, collector, coordinator));
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
new file mode 100644
index 0000000..c8515e1
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+
+
+/**
+ * Implementation of a {@link PartialJoinOperatorSpec}. This class is meant to take messages from only one of the
+ * input streams of a join and generate the join output for them.
+ *
+ * <p>NOTE: this is currently a placeholder. The constructor ignores its arguments and {@link #onNext} neither
+ * produces output nor calls {@code propagateResult}, so every incoming message is dropped.
+ *
+ * @param <M> type of messages in the input stream
+ * @param <K> type of the join key
+ * @param <JM> type of messages in the stream to join with
+ * @param <RM> type of messages in the joined stream
+ */
+class PartialJoinOperatorImpl<M, K, JM, RM> extends OperatorImpl<M, RM> {
+
+  PartialJoinOperatorImpl(PartialJoinOperatorSpec<M, K, JM, RM> joinOp, MessageStreamImpl<M> source, Config config, TaskContext context) {
+    // TODO: implement PartialJoinOperatorImpl constructor
+  }
+
+  @Override
+  public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) {
+    // TODO: implement PartialJoinOperatorImpl processing logic
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java
new file mode 100644
index 0000000..4b30a5d
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+
+
+/**
+ * Pass-through operator: performs no transformation and hands every incoming message, unchanged,
+ * to each operator registered downstream of it.
+ *
+ * @param <M> type of incoming (and outgoing) messages
+ */
+final class RootOperatorImpl<M> extends OperatorImpl<M, M> {
+
+  @Override
+  public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) {
+    // the input itself is the result; fan it out to all registered downstream operators
+    propagateResult(message, collector, coordinator);
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java
new file mode 100644
index 0000000..2bb362c
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.spec.WindowOperatorSpec;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+
+
+/**
+ * Default implementation class of a {@link WindowOperatorSpec} for a session window.
+ *
+ * <p>Currently a skeleton: {@link #onNext} and {@link #onTimer} have empty bodies and emit nothing.
+ *
+ * @param <M> the type of input message
+ * @param <RK> the type of window key
+ * @param <WV> the type of window state
+ */
+class SessionWindowOperatorImpl<M, RK, WV> extends OperatorImpl<M, WindowPane<RK, WV>> {
+
+  // the window definition this operator was built from (stored, not yet read)
+  private final WindowOperatorSpec<M, RK, WV> spec;
+
+  SessionWindowOperatorImpl(WindowOperatorSpec<M, RK, WV> windowSpec, MessageStreamImpl<M> source, Config config, TaskContext context) {
+    spec = windowSpec;
+  }
+
+  @Override
+  public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) {
+    // not implemented yet: incoming messages are ignored
+  }
+
+  /**
+   * Timer callback, intended to periodically check the timeout triggers to get the list of window states
+   * to be updated. Empty for now.
+   *
+   * @param collector the {@link MessageCollector} in the context
+   * @param coordinator the {@link TaskCoordinator} in the context
+   */
+  public void onTimer(MessageCollector collector, TaskCoordinator coordinator) {
+    // This is to periodically check the timeout triggers to get the list of window states to be updated
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
new file mode 100644
index 0000000..41d1778
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.spec.SinkOperatorSpec;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+
+
+/**
+ * Implementation for {@link SinkOperatorSpec}: passes every incoming message, together with the collector and
+ * coordinator, to the spec's {@link SinkFunction}. Nothing is propagated to downstream operators.
+ *
+ * @param <M> type of incoming messages
+ */
+class SinkOperatorImpl<M> extends OperatorImpl<M, M> {
+
+  // fetched once from the spec at construction time
+  private final SinkFunction<M> sink;
+
+  SinkOperatorImpl(SinkOperatorSpec<M> sinkOp, Config config, TaskContext context) {
+    sink = sinkOp.getSinkFn();
+  }
+
+  @Override
+  public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) {
+    sink.apply(message, collector, coordinator);
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java
new file mode 100644
index 0000000..644de20
--- /dev/null
+++
b/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.spec.StreamOperatorSpec;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+
+
+/**
+ * Operator that runs a 1:n {@link FlatMapFunction} on every incoming message and forwards each
+ * element of the returned collection to the downstream operators.
+ *
+ * @param <M> type of message in the input stream
+ * @param <RM> type of message in the output stream
+ */
+class StreamOperatorImpl<M, RM> extends OperatorImpl<M, RM> {
+
+  private final FlatMapFunction<M, RM> transformFn;
+
+  StreamOperatorImpl(StreamOperatorSpec<M, RM> streamOperatorSpec, MessageStreamImpl<M> source, Config config, TaskContext context) {
+    transformFn = streamOperatorSpec.getTransformFn();
+  }
+
+  @Override
+  public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) {
+    // zero, one or many results per input; each one is propagated in iteration order
+    for (RM result : transformFn.apply(message)) {
+      propagateResult(result, collector, coordinator);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
new file mode 100644
index 0000000..af00553
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.
See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.spec.WindowOperatorSpec;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.internal.WindowInternal;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+
+/**
+ * Implementation of a {@link WindowOperatorSpec}.
+ *
+ * <p>Currently a skeleton: the window definition is captured, but {@link #onNext} has an empty body and emits nothing.
+ *
+ * @param <M> the type of input message
+ * @param <WK> the type of key in the emitted {@link WindowPane}
+ * @param <WV> the type of value in the emitted {@link WindowPane}
+ */
+public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK, WV>> {
+
+  private final WindowInternal<M, WK, WV> window;
+
+  /**
+   * Creates the operator for the given window spec.
+   *
+   * <p>The spec is now typed instead of raw, so {@code spec.getWindow()} is type-checked rather than being an
+   * unchecked assignment. Callers that pass a raw spec still compile.
+   *
+   * @param spec the window operator spec to implement
+   * @param source the input stream of this operator
+   * @param config the {@link Config} for this task
+   * @param context the {@link TaskContext} for this task
+   */
+  public WindowOperatorImpl(WindowOperatorSpec<M, WK, WV> spec, MessageStreamImpl<M> source, Config config, TaskContext context) {
+    // source, config, and context are used to initialize the window kv-store
+    window = spec.getWindow();
+  }
+
+  @Override
+  public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) {
+    // TODO: not implemented yet; incoming messages are currently ignored
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
new file mode 100644
index 0000000..1444662
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.spec;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.task.TaskContext;
+
+
+/**
+ * Specification of a single stream operator: everything needed to turn the input {@link MessageStreamImpl}
+ * into the output {@link MessageStreamImpl}. A spec is a stateless description; runtime state lives in the
+ * operator implementation created from it.
+ *
+ * @param <OM> the type of output message from the operator
+ */
+@InterfaceStability.Unstable
+public interface OperatorSpec<OM> {
+
+  /**
+   * The kinds of operation an {@link OperatorSpec} can describe.
+   */
+  enum OpCode {
+    MAP, FLAT_MAP, FILTER, SINK, SEND_TO, JOIN, WINDOW, MERGE, PARTITION_BY
+  }
+
+  /**
+   * Returns the stream into which this operator emits its transformed messages.
+   *
+   * @return the output stream containing transformed messages produced by this operator.
+   */
+  MessageStreamImpl<OM> getNextStream();
+
+  /**
+   * Sets up this {@link OperatorSpec} with the task's configuration and context.
+   * Does nothing unless an implementation overrides it.
+   *
+   * @param config the {@link Config} object for this task
+   * @param context the {@link TaskContext} object for this task
+   */
+  default void init(Config config, TaskContext context) {
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
new file mode 100644
index 0000000..d626852
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.spec;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.functions.FilterFunction;
+import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.operators.functions.PartialJoinFunction;
+import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.internal.WindowInternal;
+import org.apache.samza.task.TaskContext;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+
+/**
+ * Factory methods for creating {@link OperatorSpec} instances.
+ *
+ * <p>The map, filter and merge specs wrap their functions as {@link FlatMapFunction}s that return a plain,
+ * mutable {@link ArrayList}. Earlier versions used double-brace initialization, which created an anonymous
+ * {@code ArrayList} subclass and captured the surrounding variables on every call.
+ */
+public class OperatorSpecs {
+
+  private OperatorSpecs() {}
+
+  /**
+   * Creates a {@link StreamOperatorSpec} for {@link MapFunction}
+   *
+   * @param mapFn the map function; a {@code null} result produces no output message
+   * @param graph the {@link StreamGraphImpl} object
+   * @param output the output {@link MessageStreamImpl} object
+   * @param <M> type of input message
+   * @param <OM> type of output message
+   * @return the {@link StreamOperatorSpec}
+   */
+  public static <M, OM> StreamOperatorSpec<M, OM> createMapOperatorSpec(MapFunction<M, OM> mapFn, StreamGraphImpl graph, MessageStreamImpl<OM> output) {
+    return new StreamOperatorSpec<>(new FlatMapFunction<M, OM>() {
+      @Override
+      public Collection<OM> apply(M message) {
+        // a map is a flat-map yielding at most one element; null results are dropped
+        Collection<OM> result = new ArrayList<>(1);
+        OM mapped = mapFn.apply(message);
+        if (mapped != null) {
+          result.add(mapped);
+        }
+        return result;
+      }
+
+      @Override
+      public void init(Config config, TaskContext context) {
+        mapFn.init(config, context);
+      }
+    }, output, OperatorSpec.OpCode.MAP, graph.getNextOpId());
+  }
+
+  /**
+   * Creates a {@link StreamOperatorSpec} for {@link FilterFunction}
+   *
+   * @param filterFn the filter function; messages for which it returns {@code false} are dropped
+   * @param graph the {@link StreamGraphImpl} object
+   * @param output the output {@link MessageStreamImpl} object
+   * @param <M> type of input message
+   * @return the {@link StreamOperatorSpec}
+   */
+  public static <M> StreamOperatorSpec<M, M> createFilterOperatorSpec(FilterFunction<M> filterFn, StreamGraphImpl graph, MessageStreamImpl<M> output) {
+    return new StreamOperatorSpec<>(new FlatMapFunction<M, M>() {
+      @Override
+      public Collection<M> apply(M message) {
+        Collection<M> result = new ArrayList<>(1);
+        if (filterFn.apply(message)) {
+          result.add(message);
+        }
+        return result;
+      }
+
+      @Override
+      public void init(Config config, TaskContext context) {
+        filterFn.init(config, context);
+      }
+    }, output, OperatorSpec.OpCode.FILTER, graph.getNextOpId());
+  }
+
+  /**
+   * Creates a {@link StreamOperatorSpec}.
+   *
+   * @param transformFn the transformation function
+   * @param graph the {@link StreamGraphImpl} object
+   * @param output the output {@link MessageStreamImpl} object
+   * @param <M> type of input message
+   * @param <OM> type of output message
+   * @return the {@link StreamOperatorSpec}
+   */
+  public static <M, OM> StreamOperatorSpec<M, OM> createStreamOperatorSpec(
+      FlatMapFunction<M, OM> transformFn, StreamGraphImpl graph, MessageStreamImpl<OM> output) {
+    return new StreamOperatorSpec<>(transformFn, output, OperatorSpec.OpCode.FLAT_MAP, graph.getNextOpId());
+  }
+
+  /**
+   * Creates a {@link SinkOperatorSpec} with op code {@code SINK}.
+   *
+   * @param sinkFn the sink function
+   * @param <M> type of input message
+   * @param graph the {@link StreamGraphImpl} object
+   * @return the {@link SinkOperatorSpec}
+   */
+  public static <M> SinkOperatorSpec<M> createSinkOperatorSpec(SinkFunction<M> sinkFn, StreamGraphImpl graph) {
+    return new SinkOperatorSpec<>(sinkFn, OperatorSpec.OpCode.SINK, graph.getNextOpId());
+  }
+
+  /**
+   * Creates a {@link SinkOperatorSpec} with op code {@code SEND_TO} that targets an {@link OutputStream}.
+   *
+   * @param sinkFn the sink function
+   * @param graph the {@link StreamGraphImpl} object
+   * @param stream the {@link OutputStream} where the message is sent to
+   * @param <M> type of input message
+   * @return the {@link SinkOperatorSpec}
+   */
+  public static <M> SinkOperatorSpec<M> createSendToOperatorSpec(SinkFunction<M> sinkFn, StreamGraphImpl graph, OutputStream<M> stream) {
+    return new SinkOperatorSpec<>(sinkFn, OperatorSpec.OpCode.SEND_TO, graph.getNextOpId(), stream);
+  }
+
+  /**
+   * Creates a {@link SinkOperatorSpec} with op code {@code PARTITION_BY} that targets an {@link OutputStream}.
+   *
+   * @param sinkFn the sink function
+   * @param graph the {@link StreamGraphImpl} object
+   * @param stream the {@link OutputStream} where the message is sent to
+   * @param <M> type of input message
+   * @return the {@link SinkOperatorSpec}
+   */
+  public static <M> SinkOperatorSpec<M> createPartitionOperatorSpec(SinkFunction<M> sinkFn, StreamGraphImpl graph, OutputStream<M> stream) {
+    return new SinkOperatorSpec<>(sinkFn, OperatorSpec.OpCode.PARTITION_BY, graph.getNextOpId(), stream);
+  }
+
+  /**
+   * Creates a {@link WindowOperatorSpec}.
+   *
+   * @param window the description of the window.
+   * @param graph the {@link StreamGraphImpl} object
+   * @param wndOutput the window output {@link MessageStreamImpl} object
+   * @param <M> the type of input message
+   * @param <WK> the type of key in the {@link WindowPane}
+   * @param <WV> the type of value in the window
+   * @return the {@link WindowOperatorSpec}
+   */
+  public static <M, WK, WV> WindowOperatorSpec<M, WK, WV> createWindowOperatorSpec(
+      WindowInternal<M, WK, WV> window, StreamGraphImpl graph, MessageStreamImpl<WindowPane<WK, WV>> wndOutput) {
+    return new WindowOperatorSpec<>(window, wndOutput, graph.getNextOpId());
+  }
+
+  /**
+   * Creates a {@link PartialJoinOperatorSpec}.
+   *
+   * @param partialJoinFn the join function
+   * @param graph the {@link StreamGraphImpl} object
+   * @param joinOutput the output {@link MessageStreamImpl}
+   * @param <M> type of input message
+   * @param <K> type of join key
+   * @param <JM> the type of message in the other join stream
+   * @param <OM> the type of message in the join output
+   * @return the {@link PartialJoinOperatorSpec}
+   */
+  public static <M, K, JM, OM> PartialJoinOperatorSpec<M, K, JM, OM> createPartialJoinOperatorSpec(
+      PartialJoinFunction<K, M, JM, OM> partialJoinFn, StreamGraphImpl graph, MessageStreamImpl<OM> joinOutput) {
+    return new PartialJoinOperatorSpec<>(partialJoinFn, joinOutput, graph.getNextOpId());
+  }
+
+  /**
+   * Creates a {@link StreamOperatorSpec} with a merger function.
+   *
+   * @param graph the {@link StreamGraphImpl} object
+   * @param mergeOutput the output {@link MessageStreamImpl} from the merger
+   * @param <M> the type of input message
+   * @return the {@link StreamOperatorSpec} for the merge
+   */
+  public static <M> StreamOperatorSpec<M, M> createMergeOperatorSpec(StreamGraphImpl graph, MessageStreamImpl<M> mergeOutput) {
+    return new StreamOperatorSpec<M, M>(message -> {
+        // merge forwards every message unchanged (a null message is forwarded as well)
+        Collection<M> result = new ArrayList<>(1);
+        result.add(message);
+        return result;
+      },
+      mergeOutput, OperatorSpec.OpCode.MERGE, graph.getNextOpId());
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
new file mode 100644
index 0000000..e057c2b
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.spec; + +import org.apache.samza.config.Config; +import org.apache.samza.operators.MessageStreamImpl; +import org.apache.samza.operators.functions.PartialJoinFunction; +import org.apache.samza.task.TaskContext; + + +/** + * Spec for the partial join operator that takes messages from one input stream, joins with buffered + * messages from another stream, and produces join results to an output {@link MessageStreamImpl}. + * + * @param <M> the type of input message + * @param <K> the type of join key + * @param <JM> the type of message in the other join stream + * @param <RM> the type of message in the join output stream + */ +public class PartialJoinOperatorSpec<M, K, JM, RM> implements OperatorSpec<RM> { + + private final MessageStreamImpl<RM> joinOutput; + + /** + * The transformation function of {@link PartialJoinOperatorSpec} that takes an input message of + * type {@code M}, joins with a stream of buffered messages of type {@code JM} from another stream, + * and generates a joined result message of type {@code RM}. + */ + private final PartialJoinFunction<K, M, JM, RM> transformFn; + + + /** + * The unique ID for this operator. + */ + private final int opId; + + /** + * Default constructor for a {@link PartialJoinOperatorSpec}.
+ * + * @param partialJoinFn the partial join function that joins an input {@code M} with buffered {@code JM} messages + * @param joinOutput the output {@link MessageStreamImpl} of the join results + * @param opId the unique ID of this operator + */ + PartialJoinOperatorSpec(PartialJoinFunction<K, M, JM, RM> partialJoinFn, MessageStreamImpl<RM> joinOutput, int opId) { + this.joinOutput = joinOutput; + this.transformFn = partialJoinFn; + this.opId = opId; + } + + @Override + public MessageStreamImpl<RM> getNextStream() { + return this.joinOutput; + } + + public PartialJoinFunction<K, M, JM, RM> getTransformFn() { + return this.transformFn; + } + + public OperatorSpec.OpCode getOpCode() { + return OpCode.JOIN; + } + + public int getOpId() { + return this.opId; + } + + @Override public void init(Config config, TaskContext context) { + this.transformFn.init(config, context); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java new file mode 100644 index 0000000..ba30d67 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.spec; + +import org.apache.samza.config.Config; +import org.apache.samza.operators.MessageStreamImpl; +import org.apache.samza.operators.OutputStream; +import org.apache.samza.operators.functions.SinkFunction; +import org.apache.samza.task.TaskContext; + + +/** + * The spec for a sink operator that accepts user-defined logic to output a {@link MessageStreamImpl} to an external + * system. This is a terminal operator and does not allow further operator chaining. + * + * @param <M> the type of input message + */ +public class SinkOperatorSpec<M> implements OperatorSpec { + + /** + * {@link OpCode} for this {@link SinkOperatorSpec} + */ + private final OperatorSpec.OpCode opCode; + + /** + * The unique ID for this operator. + */ + private final int opId; + + /** + * The user-defined sink function + */ + private final SinkFunction<M> sinkFn; + + /** + * The {@link OutputStream} messages are sent to, or {@code null} if this spec was created without one + */ + private final OutputStream<M> outStream; + + /** + * Constructor for a {@link SinkOperatorSpec} without an output stream (e.g. output is sent to a remote database). + * + * @param sinkFn a user defined {@link SinkFunction} that will be called with the output message, + * the output {@link org.apache.samza.task.MessageCollector} and the + * {@link org.apache.samza.task.TaskCoordinator}. + * @param opCode the specific {@link OpCode} for this {@link SinkOperatorSpec}.
It could be {@link OpCode#SINK}, {@link OpCode#SEND_TO}, + * or {@link OpCode#PARTITION_BY} + * @param opId the unique id of this {@link SinkOperatorSpec} in the {@link org.apache.samza.operators.StreamGraph} + */ + SinkOperatorSpec(SinkFunction<M> sinkFn, OperatorSpec.OpCode opCode, int opId) { + this(sinkFn, opCode, opId, null); + } + + /** + * Constructor for a {@link SinkOperatorSpec} that sends the output to an {@link OutputStream} + * + * @param sinkFn a user defined {@link SinkFunction} that will be called with the output message, + * the output {@link org.apache.samza.task.MessageCollector} and the + * {@link org.apache.samza.task.TaskCoordinator}. + * @param opCode the specific {@link OpCode} for this {@link SinkOperatorSpec}. It could be {@link OpCode#SINK}, {@link OpCode#SEND_TO}, + * or {@link OpCode#PARTITION_BY} + * @param opId the unique id of this {@link SinkOperatorSpec} in the {@link org.apache.samza.operators.StreamGraph} + * @param outStream the {@link OutputStream} for this {@link SinkOperatorSpec}; may be {@code null} + */ + SinkOperatorSpec(SinkFunction<M> sinkFn, OperatorSpec.OpCode opCode, int opId, OutputStream<M> outStream) { + this.sinkFn = sinkFn; + this.opCode = opCode; + this.opId = opId; + this.outStream = outStream; + } + + /** + * This is a terminal operator and doesn't allow further operator chaining.
+ * @return {@code null}, as a terminal operator has no next stream + */ + @Override + public MessageStreamImpl<M> getNextStream() { + return null; + } + + public SinkFunction<M> getSinkFn() { + return this.sinkFn; + } + + public OperatorSpec.OpCode getOpCode() { + return this.opCode; + } + + public int getOpId() { + return this.opId; + } + + public OutputStream<M> getOutStream() { + return this.outStream; + } + + @Override public void init(Config config, TaskContext context) { + this.sinkFn.init(config, context); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java new file mode 100644 index 0000000..d7813f7 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.samza.operators.spec; + +import org.apache.samza.config.Config; +import org.apache.samza.operators.MessageStreamImpl; +import org.apache.samza.operators.functions.FlatMapFunction; +import org.apache.samza.task.TaskContext; + + +/** + * The spec for a linear stream operator that outputs 0 or more messages for each input message. + * + * @param <M> the type of input message + * @param <OM> the type of output message + */ +public class StreamOperatorSpec<M, OM> implements OperatorSpec<OM> { + + /** + * {@link OpCode} for this {@link StreamOperatorSpec} + */ + private final OperatorSpec.OpCode opCode; + + /** + * The unique ID for this operator. + */ + private final int opId; + + /** + * The output {@link MessageStreamImpl} from this {@link StreamOperatorSpec} + */ + private final MessageStreamImpl<OM> outputStream; + + /** + * Transformation function applied in this {@link StreamOperatorSpec} + */ + private final FlatMapFunction<M, OM> transformFn; + + /** + * Constructor for a {@link StreamOperatorSpec} that accepts an output {@link MessageStreamImpl}.
+ * + * @param transformFn the transformation function + * @param outputStream the output {@link MessageStreamImpl} of {@code OM} messages from this operator + * @param opCode the {@link OpCode} for this {@link StreamOperatorSpec} + * @param opId the unique id for this {@link StreamOperatorSpec} in a {@link org.apache.samza.operators.StreamGraph} + */ + StreamOperatorSpec(FlatMapFunction<M, OM> transformFn, MessageStreamImpl<OM> outputStream, OperatorSpec.OpCode opCode, int opId) { + this.outputStream = outputStream; + this.transformFn = transformFn; + this.opCode = opCode; + this.opId = opId; + } + + @Override + public MessageStreamImpl<OM> getNextStream() { + return this.outputStream; + } + + public FlatMapFunction<M, OM> getTransformFn() { + return this.transformFn; + } + + public OperatorSpec.OpCode getOpCode() { + return this.opCode; + } + + public int getOpId() { + return this.opId; + } + + @Override + public void init(Config config, TaskContext context) { + this.transformFn.init(config, context); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java new file mode 100644 index 0000000..46417ed --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.operators.spec; + +import org.apache.samza.operators.MessageStreamImpl; +import org.apache.samza.operators.windows.WindowPane; +import org.apache.samza.operators.windows.internal.WindowInternal; + + +/** + * The spec for a window operator whose output is a stream of {@link WindowPane}s. + * + * @param <M> the type of input message to the window + * @param <WK> the type of key of the window + * @param <WV> the type of aggregated value in the window output {@link WindowPane} + */ +public class WindowOperatorSpec<M, WK, WV> implements OperatorSpec<WindowPane<WK, WV>> { + + private final WindowInternal<M, WK, WV> window; + + private final MessageStreamImpl<WindowPane<WK, WV>> outputStream; + + private final int opId; + + /** + * Constructor for {@link WindowOperatorSpec}.
+ * + * @param window the window function + * @param outputStream the output {@link MessageStreamImpl} from this {@link WindowOperatorSpec} + * @param opId auto-generated unique ID of this operator + */ + WindowOperatorSpec(WindowInternal<M, WK, WV> window, MessageStreamImpl<WindowPane<WK, WV>> outputStream, int opId) { + this.outputStream = outputStream; + this.window = window; + this.opId = opId; + } + + @Override + public MessageStreamImpl<WindowPane<WK, WV>> getNextStream() { + return this.outputStream; + } + + /** @return the {@link WindowInternal} this operator was created with, keeping its generic types */ + public WindowInternal<M, WK, WV> getWindow() { + return this.window; + } + + public OpCode getOpCode() { + return OpCode.WINDOW; + } + + public int getOpId() { + return this.opId; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowState.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowState.java b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowState.java new file mode 100644 index 0000000..53bca2e --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowState.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.spec; + +import org.apache.samza.annotation.InterfaceStability; + + +/** + * This interface defines the methods a window state class has to implement. Programmers may implement a customized + * window state, to be stored in window state stores, by implementing this interface. + * + * @param <WV> the type of the window output value + */ +@InterfaceStability.Unstable +public interface WindowState<WV> { + /** + * Returns the system time at which the first message in the window was received. + * + * @return the system time, in nanoseconds, at which the first message in the window was received + */ + long getFirstMessageTimeNs(); + + /** + * Returns the system time at which the last message in the window was received. + * + * @return the system time, in nanoseconds, at which the last message in the window was received + */ + long getLastMessageTimeNs(); + + /** + * Returns the earliest event time in the window. + * + * @return the earliest event time in the window, in nanoseconds + */ + long getEarliestEventTimeNs(); + + /** + * Returns the latest event time in the window. + * + * @return the latest event time in the window, in nanoseconds + */ + long getLatestEventTimeNs(); + + /** + * Returns the total number of messages received in the window. + * + * @return the number of messages in the window + */ + long getNumberMessages(); + + /** + * Returns the output value of the corresponding window. + * + * @return the corresponding window's output value + */ + WV getOutputValue(); + + /** + * Sets the output value of the corresponding window. + * + * @param value the corresponding window's output value + */ + void setOutputValue(WV value); + +} http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java
---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java b/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java new file mode 100644 index 0000000..fafa2cb --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.system; + +import org.apache.samza.operators.StreamGraphBuilder; +import org.apache.samza.config.Config; + +/** + * The {@link ExecutionEnvironment} for running applications in a YARN environment; {@link #run} is currently a no-op stub. + */ +public class RemoteExecutionEnvironment implements ExecutionEnvironment { + + @Override public void run(StreamGraphBuilder app, Config config) { + // TODO: add description of ProcessContext that is going to create a sub-DAG of the {@code graph} + // TODO: actually instantiate the tasks and run the job, i.e. + // 1. create all input/output/intermediate topics + // 2. create the single job configuration + // 3. execute JobRunner to submit the single job for the whole graph + } + +}