[FLINK-3216] [FLINK-3217] [cep] Adds CEP operator for pattern recognition Implements NFA using the SharedBuffer
Implements NFACompiler to compile a Pattern into a NFA Add CEP operator Makes NFA and SharedBuffer serializable Add serializability support to SharedBuffer and NFA Add keyed cep pattern operator Adds CEP documentation Adds online documentation for the CEP library Copies sequence events before giving them to the UDF Fix correct scala type suffixes This closes #1557. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/79058edb Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/79058edb Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/79058edb Branch: refs/heads/master Commit: 79058edb67095120558add534ba37304425fa602 Parents: 682d8d5 Author: Till Rohrmann <trohrm...@apache.org> Authored: Thu Jan 14 10:04:23 2016 +0100 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Tue Feb 2 15:04:08 2016 +0100 ---------------------------------------------------------------------- docs/libs/cep/index.md | 300 +++++++ flink-libraries/flink-cep/pom.xml | 77 ++ .../src/main/java/org/apache/flink/cep/CEP.java | 100 +++ .../flink/cep/NonDuplicatingTypeSerializer.java | 195 +++++ .../flink/cep/PatternFlatSelectFunction.java | 54 ++ .../apache/flink/cep/PatternSelectFunction.java | 54 ++ .../org/apache/flink/cep/PatternStream.java | 151 ++++ .../apache/flink/cep/nfa/ComputationState.java | 84 ++ .../org/apache/flink/cep/nfa/DeweyNumber.java | 163 ++++ .../main/java/org/apache/flink/cep/nfa/NFA.java | 406 +++++++++ .../org/apache/flink/cep/nfa/SharedBuffer.java | 858 +++++++++++++++++++ .../java/org/apache/flink/cep/nfa/State.java | 109 +++ .../apache/flink/cep/nfa/StateTransition.java | 84 ++ .../flink/cep/nfa/StateTransitionAction.java | 28 + .../flink/cep/nfa/compiler/NFACompiler.java | 187 ++++ .../operator/AbstractCEPPatternOperator.java | 108 +++ .../flink/cep/operator/CEPPatternOperator.java | 137 +++ .../cep/operator/KeyedCEPPatternOperator.java | 331 +++++++ 
.../cep/operator/StreamRecordComparator.java | 44 + .../flink/cep/pattern/AndFilterFunction.java | 44 + .../flink/cep/pattern/FollowedByPattern.java | 33 + .../org/apache/flink/cep/pattern/Pattern.java | 168 ++++ .../cep/pattern/SubtypeFilterFunction.java | 43 + .../java/org/apache/flink/cep/CEPITCase.java | 406 +++++++++ .../test/java/org/apache/flink/cep/Event.java | 77 ++ .../java/org/apache/flink/cep/StreamEvent.java | 41 + .../java/org/apache/flink/cep/SubEvent.java | 49 ++ .../apache/flink/cep/nfa/DeweyNumberTest.java | 54 ++ .../org/apache/flink/cep/nfa/NFAITCase.java | 160 ++++ .../java/org/apache/flink/cep/nfa/NFATest.java | 261 ++++++ .../apache/flink/cep/nfa/SharedBufferTest.java | 136 +++ .../flink/cep/nfa/compiler/NFACompilerTest.java | 129 +++ .../apache/flink/cep/pattern/PatternTest.java | 145 ++++ flink-libraries/pom.xml | 1 + .../streaming/api/operators/StreamOperator.java | 2 +- 35 files changed, 5218 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/docs/libs/cep/index.md ---------------------------------------------------------------------- diff --git a/docs/libs/cep/index.md b/docs/libs/cep/index.md new file mode 100644 index 0000000..04e2b73 --- /dev/null +++ b/docs/libs/cep/index.md @@ -0,0 +1,300 @@ +--- +title: "FlinkCEP - Complex event processing for Flink" +# Top navigation +top-nav-group: libs +top-nav-pos: 2 +top-nav-title: CEP +# Sub navigation +sub-nav-group: batch +sub-nav-id: flinkcep +sub-nav-pos: 2 +sub-nav-parent: libs +sub-nav-title: CEP +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +FlinkCEP is the complex event processing library for Flink. +It allows you to easily detect complex event patterns in a stream of endless data. +Complex events can then be constructed from matching sequences. +This gives you the opportunity to quickly get hold of what's really important in your data. + +## Getting Started + +If you want to jump right in, you have to [set up a Flink program]({{ site.baseurl }}/apis/batch/index.html#linking-with-flink). +Next, you have to add the FlinkCEP dependency to the `pom.xml` of your project. + +{% highlight xml %} +<dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-cep{{ site.scala_version_suffix }}</artifactId> + <version>{{site.version }}</version> +</dependency> +{% endhighlight %} + +Note that FlinkCEP is currently not part of the binary distribution. +See linking with it for cluster execution [here]({{site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution). + +Now you can start writing your first CEP program using the pattern API. + +{% highlight java %} +DataStream<Event> input = ... 
+
+Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(evt -> evt.getId() == 42)
+    .next("middle").subtype(SubEvent.class).where(subEvt -> subEvt.getVolume() >= 10.0)
+    .followedBy("end").where(evt -> evt.getName().equals("end"));
+
+PatternStream<Event> patternStream = CEP.pattern(input, pattern);
+
+DataStream<Alert> result = patternStream.select(matchedEvents -> {
+    return createAlertFrom(matchedEvents);
+});
+{% endhighlight %}
+
+Note that we have used Java 8 lambdas here to make the example more succinct.
+
+## The Pattern API
+
+The pattern API allows you to quickly define complex event patterns.
+
+Each pattern consists of multiple stages or what we call states.
+In order to go from one state to the next, the user can specify conditions.
+These conditions can be the contiguity of events or a filter condition on an event.
+
+Each pattern has to start with an initial state:
+
+{% highlight java %}
+Pattern<Event, ?> start = Pattern.<Event>begin("start");
+{% endhighlight %}
+
+Each state must have a unique name to identify the matched events later on.
+Additionally, we can specify a filter condition for the event to be accepted as the start event via the `where` method.
+
+{% highlight java %}
+start.where(new FilterFunction<Event>() {
+    @Override
+    public boolean filter(Event value) {
+        return ... // some condition
+    }
+});
+{% endhighlight %}
+
+We can also restrict the type of the accepted event to some subtype of the initial event type (here `Event`) via the `subtype` method.
+
+{% highlight java %}
+start.subtype(SubEvent.class).where(new FilterFunction<SubEvent>() {
+    @Override
+    public boolean filter(SubEvent value) {
+        return ... // some condition
+    }
+});
+{% endhighlight %}
+
+As can be seen here, the subtype condition can also be combined with an additional filter condition on the subtype.
+In fact you can always provide multiple conditions by calling `where` and `subtype` multiple times.
+These conditions will then be combined using the logical AND operator. + +Next, we can append further states to detect complex patterns. +We can control the contiguity of two succeeding events to be accepted by the pattern. + +Strict contiguity means that two matching events have to succeed directly. +This means that no other events can occur in between. +A strict contiguity pattern state can be created via the `next` method. + +{% highlight java %} +Pattern<Event, ?> strictNext = start.next("middle"); +{% endhighlight %} + +Non-strict contiguity means that other events are allowed to occur in-between two matching events. +A non-strict contiguity pattern state can be created via the `followedBy` method. + +It is also possible to define a temporal constraint for the pattern to be valid. +For example, one can define that a pattern should occur within 10 seconds via the `within` method. + +{% highlight java %} +next.within(Time.seconds(10)); +{% endhighlight %} + +{% highlight java %} +Pattern<Event, ?> nonStrictNext = start.followedBy("middle"); +{% endhighlight %} + +<br /> + +<table class="table table-bordered"> + <thead> + <tr> + <th class="text-left" style="width: 25%">Pattern Operation</th> + <th class="text-center">Description</th> + </tr> + </thead> + <tbody> + <tr> + <td><strong>Begin</strong></td> + <td> + <p>Defines a starting pattern state:</p> + {% highlight java %} + Pattern<Event, ?> start = Pattern.<Event>begin("start"); + {% endhighlight %} + </td> + </tr> + <tr> + <td><strong>Next</strong></td> + <td> + <p>Appends a new pattern state. A matching event has to directly succeed the previous matching event:</p> +{% highlight java %} +Pattern<Event, ?> next = start.next("next"); +{% endhighlight %} + </td> + </tr> + <tr> + <td><strong>FollowedBy</strong></td> + <td> + <p>Appends a new pattern state. 
Other events can occur between a matching event and the previous matching event:</p>
+{% highlight java %}
+Pattern<Event, ?> next = start.followedBy("next");
+{% endhighlight %}
+    </td>
+  </tr>
+  <tr>
+    <td><strong>Where</strong></td>
+    <td>
+      <p>Defines a filter condition for the current pattern state. An event can only match the state if it passes the filter:</p>
+{% highlight java %}
+patternState.where(new FilterFunction<Event>() {
+    @Override
+    public boolean filter(Event value) throws Exception {
+        return ... // some condition
+    }
+});
+{% endhighlight %}
+    </td>
+  </tr>
+  <tr>
+    <td><strong>Subtype</strong></td>
+    <td>
+      <p>Defines a subtype condition for the current pattern state. An event can only match the state if it is of this subtype:</p>
+{% highlight java %}
+patternState.subtype(SubEvent.class);
+{% endhighlight %}
+    </td>
+  </tr>
+  <tr>
+    <td><strong>Within</strong></td>
+    <td>
+      <p>Defines the maximum time interval for an event sequence to match the pattern. If a non-completed event sequence exceeds this time, it is discarded:</p>
+{% highlight java %}
+patternState.within(Time.seconds(10));
+{% endhighlight %}
+    </td>
+  </tr>
+  </tbody>
+</table>
+
+### Detecting Patterns
+
+In order to run a stream of events against your pattern, you have to create a `PatternStream`.
+Given an input stream `input` and a pattern `pattern`, you create the `PatternStream` by calling `CEP.pattern`:
+
+{% highlight java %}
+DataStream<Event> input = ...
+Pattern<Event, ?> pattern = ...
+
+PatternStream<Event> patternStream = CEP.pattern(input, pattern);
+{% endhighlight %}
+
+### Selecting from Patterns
+
+Once you have obtained a `PatternStream` you can select from detected event sequences via the `select` or `flatSelect` methods.
+The `select` method requires a `PatternSelectFunction` implementation.
+A `PatternSelectFunction` has a `select` method which is called for each matching event sequence.
+It receives a map of string/event pairs of the matched events.
+The string is defined by the name of the state to which the event has been matched.
+The `select` method returns exactly one result.
+
+{% highlight java %}
+class MyPatternSelectFunction<IN, OUT> implements PatternSelectFunction<IN, OUT> {
+    @Override
+    public OUT select(Map<String, IN> pattern) {
+        IN startEvent = pattern.get("start");
+        IN endEvent = pattern.get("end");
+
+        return new OUT(startEvent, endEvent);
+    }
+}
+{% endhighlight %}
+
+A `PatternFlatSelectFunction` is similar to the `PatternSelectFunction`, with the only distinction that it can return an arbitrary number of results.
+To do this, its `flatSelect` method has an additional `Collector` parameter which is used to emit the resulting elements.
+
+{% highlight java %}
+class MyPatternFlatSelectFunction<IN, OUT> implements PatternFlatSelectFunction<IN, OUT> {
+    @Override
+    public void flatSelect(Map<String, IN> pattern, Collector<OUT> collector) {
+        IN startEvent = pattern.get("start");
+        IN endEvent = pattern.get("end");
+
+        for (int i = 0; i < startEvent.getValue(); i++ ) {
+            collector.collect(new OUT(startEvent, endEvent));
+        }
+    }
+}
+{% endhighlight %}
+
+## Examples
+
+The following example detects the pattern `start, middle(name = "error") -> end(name = "critical")` on a keyed data stream of `Events`.
+The events are keyed by their ids and a valid pattern has to occur within 10 seconds.
+The whole processing is done with event time.
+
+{% highlight java %}
+StreamExecutionEnvironment env = ...
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+DataStream<Event> input = ...
+
+DataStream<Event> partitionedInput = input.keyBy(new KeySelector<Event, Integer>() {
+    @Override
+    public Integer getKey(Event value) throws Exception {
+        return value.getId();
+    }
+});
+
+Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
+    .next("middle").where(new FilterFunction<Event>() {
+        @Override
+        public boolean filter(Event value) throws Exception {
+            return value.getName().equals("error");
+        }
+    }).followedBy("end").where(new FilterFunction<Event>() {
+        @Override
+        public boolean filter(Event value) throws Exception {
+            return value.getName().equals("critical");
+        }
+    }).within(Time.seconds(10));
+
+PatternStream<Event> patternStream = CEP.pattern(partitionedInput, pattern);
+
+DataStream<Alert> alerts = patternStream.select(new PatternSelectFunction<Event, Alert>() {
+    @Override
+    public Alert select(Map<String, Event> pattern) throws Exception {
+        return new Alert(pattern.get("start"), pattern.get("end"));
+    }
+});
+{% endhighlight %}
http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/pom.xml b/flink-libraries/flink-cep/pom.xml
new file mode 100644
index 0000000..c246077
--- /dev/null
+++ b/flink-libraries/flink-cep/pom.xml
@@ -0,0 +1,77 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-libraries</artifactId> + <version>1.0-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-cep</artifactId> + <name>flink-cep</name> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_2.10</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>${guava.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_2.10</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils_2.10</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + </dependencies> + + <packaging>jar</packaging> 
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CEP.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CEP.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CEP.java
new file mode 100644
index 0000000..60e0bf8
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CEP.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.cep.nfa.State;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.operator.CEPPatternOperator;
+import org.apache.flink.cep.operator.KeyedCEPPatternOperator;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+
+import java.util.Map;
+
+/**
+ * Utility class for complex event processing.
+ *
+ * Methods which transform a {@link DataStream} into a {@link PatternStream} to do CEP.
+ */
+public class CEP {
+	private static final String PATTERN_OPERATOR_NAME = "AbstractCEPPatternOperator";
+
+	/**
+	 * Transforms a {@link DataStream} of {@code T} into a {@link PatternStream} of {@code T}. The
+	 * PatternStream detects the provided event pattern and emits the patterns as a
+	 * {@code Map<String, T>} where each event is identified by a String. The String is the name of
+	 * the {@link State} to which the event has been associated.
+	 *
+	 * <p>Depending on the input {@link DataStream} type, keyed vs. non-keyed, a different
+	 * {@link org.apache.flink.cep.operator.AbstractCEPPatternOperator} is instantiated: a
+	 * {@link KeyedCEPPatternOperator} for a {@link KeyedStream}, otherwise a
+	 * {@link CEPPatternOperator} which is run with a parallelism of 1.
+	 *
+	 * @param input DataStream containing the input events
+	 * @param pattern Pattern specification which shall be detected
+	 * @param <T> Type of the input events
+	 * @param <K> Type of the key in case of a KeyedStream (necessary to bind keySelector and
+	 *            keySerializer to the same type)
+	 * @return Resulting pattern stream
+	 */
+	public static <T, K> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> pattern) {
+		final TypeSerializer<T> inputSerializer = input.getType().createSerializer(input.getExecutionConfig());
+
+		// check whether we use processing time
+		final boolean isProcessingTime = input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
+
+		// compile our pattern into a NFAFactory to instantiate NFAs later on
+		final NFACompiler.NFAFactory<T> nfaFactory = NFACompiler.compileFactory(pattern, inputSerializer);
+
+		// output type shared by both operator variants: getForClass(Map.class) describes the raw
+		// Map type, so the double cast narrows it to Map<String, T>
+		@SuppressWarnings("unchecked")
+		final TypeInformation<Map<String, T>> patternTypeInfo =
+			(TypeInformation<Map<String, T>>) (TypeInformation<?>) TypeExtractor.getForClass(Map.class);
+
+		final DataStream<Map<String, T>> patternStream;
+
+		if (input instanceof KeyedStream) {
+			// We have to use the KeyedCEPPatternOperator which can deal with keyed input streams
+			@SuppressWarnings("unchecked")
+			KeyedStream<T, K> keyedStream = (KeyedStream<T, K>) input;
+
+			KeySelector<T, K> keySelector = keyedStream.getKeySelector();
+			TypeSerializer<K> keySerializer = keyedStream.getKeyType().createSerializer(keyedStream.getExecutionConfig());
+
+			patternStream = keyedStream.transform(
+				PATTERN_OPERATOR_NAME,
+				patternTypeInfo,
+				new KeyedCEPPatternOperator<>(
+					inputSerializer,
+					isProcessingTime,
+					keySelector,
+					keySerializer,
+					nfaFactory));
+		} else {
+			// a non-keyed stream is handled by a single CEPPatternOperator instance
+			patternStream = input.transform(
+				PATTERN_OPERATOR_NAME,
+				patternTypeInfo,
+				new CEPPatternOperator<T>(
+					inputSerializer,
+					isProcessingTime,
+					nfaFactory
+				)).setParallelism(1);
+		}
+
+		return new PatternStream<>(patternStream, input.getType());
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/NonDuplicatingTypeSerializer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/NonDuplicatingTypeSerializer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/NonDuplicatingTypeSerializer.java
new file mode 100644
index 0000000..846b6c3
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/NonDuplicatingTypeSerializer.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.ArrayList;
+import java.util.IdentityHashMap;
+
+/**
+ * Type serializer which keeps track of the serialized objects so that each object is only
+ * serialized once. If the same object shall be serialized again, then a reference handle is
+ * written instead.
+ *
+ * Avoiding duplication is achieved by keeping an internal identity hash map. This map contains
+ * all serialized objects. To make the serializer work it is important that the same serializer
+ * is used for a coherent serialization run. After the serialization has stopped, the identity
+ * hash map should be cleared.
+ *
+ * <p>Every record is prefixed with a boolean flag: {@code false} means that the full record
+ * follows, {@code true} means that an int reference handle follows. A handle is the position of
+ * the record among the distinct records written (or read) since construction or the last
+ * {@link #clearReferences()} call.
+ *
+ * @param <T> Type of the element to be serialized
+ */
+public class NonDuplicatingTypeSerializer<T> extends TypeSerializer<T> {
+	private static final long serialVersionUID = -7633631762221447524L;
+
+	// underlying type serializer
+	private final TypeSerializer<T> typeSerializer;
+
+	// here we store the already serialized objects together with their reference handle
+	private transient IdentityHashMap<T, Integer> identityMap;
+
+	// here we store the already deserialized objects; the list index is the reference handle
+	private transient ArrayList<T> elementList;
+
+	public NonDuplicatingTypeSerializer(final TypeSerializer<T> typeSerializer) {
+		this.typeSerializer = typeSerializer;
+
+		this.identityMap = new IdentityHashMap<>();
+		this.elementList = new ArrayList<>();
+	}
+
+	public TypeSerializer<T> getTypeSerializer() {
+		return typeSerializer;
+	}
+
+	/**
+	 * Clears the data structures containing the already serialized/deserialized objects. This
+	 * effectively resets the type serializer.
+	 */
+	public void clearReferences() {
+		identityMap.clear();
+		elementList.clear();
+	}
+
+	@Override
+	public boolean isImmutableType() {
+		return typeSerializer.isImmutableType();
+	}
+
+	@Override
+	public TypeSerializer<T> duplicate() {
+		// the reference tables are per-instance state: a duplicate starts with empty tables and
+		// gets its own copy of the wrapped serializer, in case that one is stateful as well
+		return new NonDuplicatingTypeSerializer<>(typeSerializer.duplicate());
+	}
+
+	@Override
+	public T createInstance() {
+		return typeSerializer.createInstance();
+	}
+
+	@Override
+	public T copy(T from) {
+		return typeSerializer.copy(from);
+	}
+
+	@Override
+	public T copy(T from, T reuse) {
+		return typeSerializer.copy(from, reuse);
+	}
+
+	@Override
+	public int getLength() {
+		// a record is written either as flag + handle or as flag + full record, hence the
+		// serialized length varies even if the wrapped serializer has a fixed length
+		return -1;
+	}
+
+	/**
+	 * Serializes the given record.
+	 * <p>
+	 * First a boolean indicating whether a reference handle (true) or the object (false) is
+	 * written. Then, either the reference handle or the object is written. The first time an
+	 * object is serialized it is registered, so that later occurrences of the same instance are
+	 * written as a reference handle.
+	 *
+	 * @param record The record to serialize.
+	 * @param target The output view to write the serialized data to.
+	 *
+	 * @throws IOException Thrown if the output view or the wrapped serializer fails.
+	 */
+	@Override
+	public void serialize(T record, DataOutputView target) throws IOException {
+		Integer handle = identityMap.get(record);
+
+		if (handle != null) {
+			target.writeBoolean(true);
+			target.writeInt(handle);
+		} else {
+			// the handle is the insertion position, matching the list index used by deserialize
+			identityMap.put(record, identityMap.size());
+
+			target.writeBoolean(false);
+			typeSerializer.serialize(record, target);
+		}
+	}
+
+	/**
+	 * Deserializes an object from the input view.
+	 * <p>
+	 * First it reads a boolean indicating whether a reference handle or a serialized object
+	 * follows. Newly read objects are registered so that later reference handles resolve to them.
+	 *
+	 * @param source The input view from which to read the data.
+	 * @return The deserialized object
+	 * @throws IOException Thrown if the input view or the wrapped serializer fails.
+	 */
+	@Override
+	public T deserialize(DataInputView source) throws IOException {
+		boolean alreadyRead = source.readBoolean();
+
+		if (alreadyRead) {
+			int index = source.readInt();
+			return elementList.get(index);
+		} else {
+			T element = typeSerializer.deserialize(source);
+			elementList.add(element);
+
+			return element;
+		}
+	}
+
+	@Override
+	public T deserialize(T reuse, DataInputView source) throws IOException {
+		return deserialize(source);
+	}
+
+	/**
+	 * Copies one record from the source to the target view in this serializer's format (flag
+	 * followed by either a reference handle or the full record). Going through deserialize and
+	 * serialize keeps both reference tables up to date, so later handles stay resolvable.
+	 */
+	@Override
+	public void copy(DataInputView source, DataOutputView target) throws IOException {
+		serialize(deserialize(source), target);
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof NonDuplicatingTypeSerializer) {
+			@SuppressWarnings("unchecked")
+			NonDuplicatingTypeSerializer<T> other = (NonDuplicatingTypeSerializer<T>)obj;
+
+			return (other.canEqual(this) && typeSerializer.equals(other.typeSerializer));
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof NonDuplicatingTypeSerializer;
+	}
+
+	@Override
+	public int hashCode() {
+		return typeSerializer.hashCode();
+	}
+
+	private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
+		ois.defaultReadObject();
+
+		// the reference tables are transient and therefore re-created after Java deserialization
+		this.identityMap = new IdentityHashMap<>();
+		this.elementList = new ArrayList<>();
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternFlatSelectFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternFlatSelectFunction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternFlatSelectFunction.java
new file mode 100644
index 0000000..bfbbc23
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternFlatSelectFunction.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Base interface for a pattern select function which can produce multiple resulting elements. A
+ * pattern flat select function is called with a map of detected events which are identified by
+ * their names. The names are defined by the {@link org.apache.flink.cep.pattern.Pattern} specifying
+ * the sought-after pattern. Additionally, a collector is provided as a parameter. The collector
+ * is used to emit an arbitrary number of resulting elements.
+ *
+ * <pre>{@code
+ * PatternStream<IN> pattern = ...;
+ *
+ * DataStream<OUT> result = pattern.flatSelect(new MyPatternFlatSelectFunction());
+ * }</pre>
+ * @param <IN> Type of the input elements
+ * @param <OUT> Type of the output elements
+ */
+public interface PatternFlatSelectFunction<IN, OUT> extends Function, Serializable {
+
+	/**
+	 * Generates zero or more resulting elements given a map of detected pattern events. The events
+	 * are identified by their specified names.
+	 *
+	 * @param pattern Map containing the found pattern. Events are identified by their names.
+	 * @param out Collector used to output the generated elements
+	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the
+	 *                   operation to fail and may trigger recovery.
+	 */
+	void flatSelect(Map<String, IN> pattern, Collector<OUT> out) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternSelectFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternSelectFunction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternSelectFunction.java
new file mode 100644
index 0000000..c403529
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternSelectFunction.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+import org.apache.flink.api.common.functions.Function;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Base interface for a pattern select function. A pattern select function is called with a
+ * map containing the detected events which can be accessed by their names. The names depend on
+ * the definition of the {@link org.apache.flink.cep.pattern.Pattern}. The select method returns
+ * exactly one result. If you want to return more than one result, then you have to implement
+ * a {@link PatternFlatSelectFunction}.
+ *
+ * <pre>{@code
+ * PatternStream<IN> pattern = ...;
+ *
+ * DataStream<OUT> result = pattern.select(new MyPatternSelectFunction());
+ * }</pre>
+ *
+ * @param <IN> Type of the input elements
+ * @param <OUT> Type of the output element
+ */
+public interface PatternSelectFunction<IN, OUT> extends Function, Serializable {
+
+	/**
+	 * Generates a result from the given map of events. The events are identified by their names.
+	 * Only one resulting element can be generated.
+	 *
+	 * @param pattern Map containing the found pattern. Events are identified by their names
+	 * @return Resulting element
+	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the
+	 *                   operation to fail and may trigger recovery.
+	 */
+	OUT select(Map<String, IN> pattern) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
new file mode 100644
index 0000000..63ed3b4
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.util.Collector; + +import java.util.Map; + +/** + * Stream abstraction for CEP pattern detection. A pattern stream is a stream which emits detected + * pattern sequences as a map of events associated with their names. The pattern is detected using a + * {@link org.apache.flink.cep.nfa.NFA}. In order to process the detected sequences, the user + * has to specify a {@link PatternSelectFunction} or a {@link PatternFlatSelectFunction}. + * + * @param <T> Type of the events + */ +public class PatternStream<T> { + + // underlying data stream + private final DataStream<Map<String, T>> patternStream; + // type information of input type T + private final TypeInformation<T> inputType; + + /** + * Creates a pattern stream from a stream of detected pattern sequences. + * + * @param patternStream Stream of detected pattern sequences, each given as a map from names to events + * @param inputType Type information of the input events + */ + PatternStream(final DataStream<Map<String, T>> patternStream, final TypeInformation<T> inputType) { + this.patternStream = patternStream; + this.inputType = inputType; + } + + /** + * Applies a select function to the detected pattern sequence. For each pattern sequence the + * provided {@link PatternSelectFunction} is called. The pattern select function can produce + * exactly one resulting element.
+ * + * @param patternSelectFunction The pattern select function which is called for each detected + * pattern sequence. + * @param <R> Type of the resulting elements + * @return {@link DataStream} which contains the resulting elements from the pattern select + * function. + */ + public <R> DataStream<R> select(final PatternSelectFunction<T, R> patternSelectFunction) { + // we have to extract the output type from the provided pattern selection function manually + // because the TypeExtractor cannot do that if the method is wrapped in a MapFunction + TypeInformation<R> outTypeInfo = TypeExtractor.getUnaryOperatorReturnType( + patternSelectFunction, + PatternSelectFunction.class, + false, + false, + inputType, + null, + false); + + return patternStream.map( + new PatternSelectMapper<T, R>( + patternStream.getExecutionEnvironment().clean(patternSelectFunction))) + .returns(outTypeInfo); + } + + /** + * Applies a flat select function to the detected pattern sequence. For each pattern sequence + * the provided {@link PatternFlatSelectFunction} is called. The pattern flat select function + * can produce an arbitrary number of resulting elements. + * + * @param patternFlatSelectFunction The pattern flat select function which is called for each + * detected pattern sequence. + * @param <R> Type of the resulting elements + * @return {@link DataStream} which contains the resulting elements from the pattern flat select + * function.
+ */ + public <R> DataStream<R> flatSelect(final PatternFlatSelectFunction<T, R> patternFlatSelectFunction) { + // we have to extract the output type from the provided pattern flat select function manually + // because the TypeExtractor cannot do that if the method is wrapped in a FlatMapFunction + TypeInformation<R> outTypeInfo = TypeExtractor.getUnaryOperatorReturnType( + patternFlatSelectFunction, + PatternFlatSelectFunction.class, + false, + false, + inputType, + null, + false); + + return patternStream.flatMap( + new PatternFlatSelectMapper<T, R>( + patternStream.getExecutionEnvironment().clean(patternFlatSelectFunction) + )).returns(outTypeInfo); + } + + /** + * Wrapper for a {@link PatternSelectFunction}. + * + * @param <T> Type of the input elements + * @param <R> Type of the resulting elements + */ + private static class PatternSelectMapper<T, R> implements MapFunction<Map<String, T>, R> { + + private static final long serialVersionUID = 2273300432692943064L; + + private final PatternSelectFunction<T, R> patternSelectFunction; + + public PatternSelectMapper(PatternSelectFunction<T, R> patternSelectFunction) { + this.patternSelectFunction = patternSelectFunction; + } + + @Override + public R map(Map<String, T> value) throws Exception { + return patternSelectFunction.select(value); + } + } + + /** + * Wrapper for a {@link PatternFlatSelectFunction}.
+ * + * @param <T> Type of the input elements + * @param <R> Type of the resulting elements + */ + private static class PatternFlatSelectMapper<T, R> implements FlatMapFunction<Map<String, T>, R> { + + private static final long serialVersionUID = -8610796233077989108L; + + private final PatternFlatSelectFunction<T, R> patternFlatSelectFunction; + + public PatternFlatSelectMapper(PatternFlatSelectFunction<T, R> patternFlatSelectFunction) { + this.patternFlatSelectFunction = patternFlatSelectFunction; + } + + @Override + public void flatMap(Map<String, T> value, Collector<R> out) throws Exception { + patternFlatSelectFunction.flatSelect(value, out); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java new file mode 100644 index 0000000..3f44fba --- /dev/null +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.nfa; + +/** + * Holds the state of one NFA computation: the NFA state it currently points to, the event it + * took last together with that event's timestamp, the version which separates the valid pattern + * paths in the shared buffer, and the timestamp of the first event of the pattern. All fields are + * fixed at construction time. + * + * @param <T> Type of the input events + */ +public class ComputationState<T> { + + /** NFA state this computation currently points to. */ + private final State<T> state; + + /** Event which was taken last by this computation. */ + private final T event; + + /** Timestamp of the last taken event. */ + private final long timestamp; + + /** Version used to discriminate the valid pattern paths in the SharedBuffer. */ + private final DeweyNumber version; + + /** Timestamp of the first event of the pattern. */ + private final long startTimestamp; + + public ComputationState( + final State<T> state, + final T event, + final long timestamp, + final DeweyNumber version, + final long startTimestamp) { + this.state = state; + this.event = event; + this.timestamp = timestamp; + this.version = version; + this.startTimestamp = startTimestamp; + } + + public State<T> getState() { + return state; + } + + public T getEvent() { + return event; + } + + public long getTimestamp() { + return timestamp; + } + + public long getStartTimestamp() { + return startTimestamp; + } + + public DeweyNumber getVersion() { + return version; + } + + /** + * @return Whether the NFA state of this computation is a start state + */ + public boolean isStartState() { + return getState().isStart(); + } + + /** + * @return Whether the NFA state of this computation is a final state + */ + public boolean isFinalState() { + return getState().isFinal(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java new file mode 100644 index 0000000..bb9039d --- /dev/null +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.nfa; + +import java.io.Serializable; +import java.util.Arrays; + +/** + * Versioning scheme which allows to retrieve dependencies between different versions. + * <p> + * A dewey number consists of a sequence of digits d1.d2.d3. ... .dn. A dewey number v is compatible + * with v' iff v contains v' as a prefix or if both dewey numbers differ only in the last digit and + * the last digit of v is greater than or equal to the last digit of v'. + */ +public class DeweyNumber implements Serializable { + + private static final long serialVersionUID = 6170434818252267825L; + + // sequence of digits + private final int[] deweyNumber; + + /** + * Creates a dewey number which consists of the single digit {@code start}. + * + * @param start The only digit of the new dewey number + */ + public DeweyNumber(int start) { + deweyNumber = new int[]{start}; + } + + // the given array is used as is (it is not copied), so it must not be modified afterwards + protected DeweyNumber(int[] deweyNumber) { + this.deweyNumber = deweyNumber; + } + + /** + * Checks whether this dewey number is compatible with the other dewey number.
+ * + * True iff this contains other as a prefix or iff they differ only in the last digit whereas + * the last digit of this is greater than or equal to the last digit of other. + * + * @param other The other dewey number to check compatibility against + * @return Whether this dewey number is compatible with the other dewey number + */ + public boolean isCompatibleWith(DeweyNumber other) { + if (length() > other.length()) { + // prefix case + for (int i = 0; i < other.length(); i++) { + if (other.deweyNumber[i] != deweyNumber[i]) { + return false; + } + } + + return true; + } else if (length() == other.length()) { + // check init digits for equality + int lastIndex = length() - 1; + for (int i = 0; i < lastIndex; i++) { + if (other.deweyNumber[i] != deweyNumber[i]) { + return false; + } + } + + // check that the last digit is greater or equal + return deweyNumber[lastIndex] >= other.deweyNumber[lastIndex]; + } else { + return false; + } + } + + /** + * @return Number of digits of this dewey number + */ + public int length() { + return deweyNumber.length; + } + + /** + * Creates a new dewey number from this such that its last digit is increased by + * one. + * + * @return A new dewey number derived from this whose last digit is increased by one + */ + public DeweyNumber increase() { + int[] newDeweyNumber = Arrays.copyOf(deweyNumber, deweyNumber.length); + newDeweyNumber[deweyNumber.length - 1]++; + + return new DeweyNumber(newDeweyNumber); + } + + /** + * Creates a new dewey number from this such that a 0 is appended as new last digit.
+ * + * @return A new dewey number which contains this as a prefix and has 0 as last digit + */ + public DeweyNumber addStage() { + int[] newDeweyNumber = Arrays.copyOf(deweyNumber, deweyNumber.length + 1); + + return new DeweyNumber(newDeweyNumber); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof DeweyNumber) { + DeweyNumber other = (DeweyNumber) obj; + + return Arrays.equals(deweyNumber, other.deweyNumber); + } else { + return false; + } + } + + @Override + public int hashCode() { + return Arrays.hashCode(deweyNumber); + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + + for (int i = 0; i < length() - 1; i++) { + builder.append(deweyNumber[i]).append("."); + } + + if (length() > 0) { + builder.append(deweyNumber[length() - 1]); + } + + return builder.toString(); + } + + /** + * Creates a dewey number from a string representation. The input string must be a dot separated + * string of integers. + * + * @param deweyNumberString Dot separated string of integers + * @return Dewey number generated from the given input string + * @throws NumberFormatException If a dot separated part of the input is not a valid integer + */ + public static DeweyNumber fromString(final String deweyNumberString) { + String[] splits = deweyNumberString.split("\\."); + + if (splits.length == 0) { + // split only yields no parts for inputs consisting solely of dots, which parseInt rejects + return new DeweyNumber(Integer.parseInt(deweyNumberString)); + } else { + int[] deweyNumber = new int[splits.length]; + + for (int i = 0; i < splits.length; i++) { + deweyNumber[i] = Integer.parseInt(splits[i]); + } + + return new DeweyNumber(deweyNumber); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java new file mode 100644 index 0000000..a68d6eb --- /dev/null +++
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java @@ -0,0 +1,406 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.nfa; + +import com.google.common.collect.LinkedHashMultimap; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.cep.NonDuplicatingTypeSerializer; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Queue; +import java.util.Set; +import java.util.Stack; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Non-deterministic finite automaton implementation. + * <p> + * The NFA processes input events which will change the internal state machine.
Whenever a final + * state is reached, the matching sequence of events is emitted. + * <p> + * The implementation is strongly based on the paper "Efficient Pattern Matching over Event Streams". + * + * @see <a href="https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf">https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf</a> + * + * @param <T> Type of the processed events + */ +public class NFA<T> implements Serializable { + + private static final Pattern namePattern = Pattern.compile("^(.*\\[)(\\])$"); + private static final long serialVersionUID = 2957674889294717265L; + + private final NonDuplicatingTypeSerializer<T> nonDuplicatingTypeSerializer; + + // Buffer used to store the matched events + private final SharedBuffer<State<T>, T> sharedBuffer; + + // Set of all NFA states + private final Set<State<T>> states; + + // Length of the window + private final long windowTime; + + // Current starting index for the next dewey version number + private int startEventCounter; + + // Current set of computation states within the state machine + private transient Queue<ComputationState<T>> computationStates; + + /** + * Creates an NFA without any states. + * + * @param eventSerializer Serializer for the processed events + * @param windowTime Length of the window within which a pattern has to be completed; values + * less than or equal to 0 disable the window + */ + public NFA(final TypeSerializer<T> eventSerializer, final long windowTime) { + this.nonDuplicatingTypeSerializer = new NonDuplicatingTypeSerializer<>(eventSerializer); + this.windowTime = windowTime; + sharedBuffer = new SharedBuffer<>(nonDuplicatingTypeSerializer); + computationStates = new LinkedList<>(); + + states = new HashSet<>(); + startEventCounter = 1; + } + + public Set<State<T>> getStates() { + return states; + } + + public void addStates(final Collection<State<T>> newStates) { + for (State<T> state: newStates) { + addState(state); + } + } + + /** + * Adds a state to the NFA. For a start state, an initial computation state without event, + * version and timestamps is registered as well. + * + * @param state The state to add + */ + public void addState(final State<T> state) { + states.add(state); + + if (state.isStart()) { + computationStates.add(new ComputationState<>(state, null, -1L, null, -1L)); + } + } + + /** + * Processes the next input event.
If some of the computations reach a final state then the + * resulting event sequences are returned. + * + * @param event The current event to be processed + * @param timestamp The timestamp of the current event + * @return The collection of matched patterns (i.e. the results of computations which have + * reached a final state) + */ + public Collection<Map<String, T>> process(final T event, final long timestamp) { + final int numberComputationStates = computationStates.size(); + final List<Map<String, T>> result = new ArrayList<>(); + + // iterate over all current computations + for (int i = 0; i < numberComputationStates; i++) { + ComputationState<T> computationState = computationStates.poll(); + + final Collection<ComputationState<T>> newComputationStates; + + if (!computationState.isStartState() && + windowTime > 0 && + timestamp - computationState.getStartTimestamp() > windowTime) { + // remove computation state which has exceeded the window length + sharedBuffer.release(computationState.getState(), computationState.getEvent(), computationState.getTimestamp()); + sharedBuffer.remove(computationState.getState(), computationState.getEvent(), computationState.getTimestamp()); + + newComputationStates = Collections.emptyList(); + } else { + newComputationStates = computeNextStates(computationState, event, timestamp); + } + + for (ComputationState<T> newComputationState: newComputationStates) { + if (newComputationState.isFinalState()) { + // we've reached a final state and can thus retrieve the matching event sequence + Collection<Map<String, T>> matches = extractPatternMatches(newComputationState); + result.addAll(matches); + + // remove found patterns because they are no longer needed + sharedBuffer.release(newComputationState.getState(), newComputationState.getEvent(), newComputationState.getTimestamp()); + sharedBuffer.remove(newComputationState.getState(), newComputationState.getEvent(), newComputationState.getTimestamp()); + } else { + // add new computation
state; it will be processed once the next event arrives + computationStates.add(newComputationState); + } + } + + // prune shared buffer based on window length + if (windowTime > 0) { + long pruningTimestamp = timestamp - windowTime; + + // remove all elements which are expired with respect to the window length + sharedBuffer.prune(pruningTimestamp); + } + } + + return result; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof NFA) { + @SuppressWarnings("unchecked") + NFA<T> other = (NFA<T>) obj; + + return nonDuplicatingTypeSerializer.equals(other.nonDuplicatingTypeSerializer) && + sharedBuffer.equals(other.sharedBuffer) && + states.equals(other.states) && + windowTime == other.windowTime && + startEventCounter == other.startEventCounter; + } else { + return false; + } + } + + @Override + public int hashCode() { + return Objects.hash(nonDuplicatingTypeSerializer, sharedBuffer, states, windowTime, startEventCounter); + } + + /** + * Computes the next computation states based on the given computation state, the current event, + * its timestamp and the internal state machine.
+ * + * @param computationState Current computation state + * @param event Current event which is processed + * @param timestamp Timestamp of the current event + * @return Collection of computation states which result from the current one + */ + private Collection<ComputationState<T>> computeNextStates( + final ComputationState<T> computationState, + final T event, + final long timestamp) { + // states still to be visited; named differently from the field "states" to avoid shadowing + Stack<State<T>> statesToProcess = new Stack<>(); + ArrayList<ComputationState<T>> resultingComputationStates = new ArrayList<>(); + State<T> state = computationState.getState(); + + statesToProcess.push(state); + + while (!statesToProcess.isEmpty()) { + State<T> currentState = statesToProcess.pop(); + Collection<StateTransition<T>> stateTransitions = currentState.getStateTransitions(); + + // check all state transitions for each state + for (StateTransition<T> stateTransition: stateTransitions) { + try { + if (stateTransition.getCondition() == null || stateTransition.getCondition().filter(event)) { + // filter condition is true + switch (stateTransition.getAction()) { + case PROCEED: + // simply advance the computation state, but apply the current event to it + // PROCEED is equivalent to an epsilon transition + statesToProcess.push(stateTransition.getTargetState()); + break; + case IGNORE: + resultingComputationStates.add(computationState); + + // we have a new computation state referring to the same shared entry + // the lock of the current computation is released later on + sharedBuffer.lock(computationState.getState(), computationState.getEvent(), computationState.getTimestamp()); + break; + case TAKE: + final State<T> newState = stateTransition.getTargetState(); + final DeweyNumber newVersion; + final State<T> previousState = computationState.getState(); + final T previousEvent = computationState.getEvent(); + final long previousTimestamp; + final long startTimestamp; + + if (computationState.isStartState()) { + newVersion = new DeweyNumber(startEventCounter++); + startTimestamp = timestamp; + previousTimestamp
= -1L; + + } else { + startTimestamp = computationState.getStartTimestamp(); + previousTimestamp = computationState.getTimestamp(); + + if (newState.equals(computationState.getState())) { + newVersion = computationState.getVersion().increase(); + } else { + newVersion = computationState.getVersion().addStage(); + } + } + + sharedBuffer.put( + newState, + event, + timestamp, + previousState, + previousEvent, + previousTimestamp, + newVersion); + + // a new computation state is referring to the shared entry + sharedBuffer.lock(newState, event, timestamp); + + resultingComputationStates.add(new ComputationState<T>( + newState, + event, + timestamp, + newVersion, + startTimestamp)); + break; + } + } + } catch (Exception e) { + throw new RuntimeException("Failure happened in filter function.", e); + } + } + } + + if (computationState.isStartState()) { + // a computation state is always kept if it refers to a starting state because every + // new element can start a new pattern + resultingComputationStates.add(computationState); + } else { + // release the shared entry referenced by the current computation state. + sharedBuffer.release(computationState.getState(), computationState.getEvent(), computationState.getTimestamp()); + // try to remove unnecessary shared buffer entries + sharedBuffer.remove(computationState.getState(), computationState.getEvent(), computationState.getTimestamp()); + } + + return resultingComputationStates; + } + + /** + * Extracts all the sequences of events from the start to the given computation state. An event + * sequence is returned as a map which contains the events and the names of the states to which + * the events were mapped.
+ * <p> + * If several events of one sequence were mapped to the same state, each of them is stored under + * a distinct name generated by {@link #generateStateName(String, int)}, numbered from 0 in + * insertion order. + * + * @param computationState The end computation state of the extracted event sequences + * @return Collection of event sequences which end in the given computation state + */ + private Collection<Map<String, T>> extractPatternMatches(final ComputationState<T> computationState) { + Collection<LinkedHashMultimap<State<T>, T>> paths = sharedBuffer.extractPatterns( + computationState.getState(), + computationState.getEvent(), + computationState.getTimestamp(), + computationState.getVersion()); + + ArrayList<Map<String, T>> result = new ArrayList<>(); + + TypeSerializer<T> serializer = nonDuplicatingTypeSerializer.getTypeSerializer(); + + // generate the correct names from the collection of LinkedHashMultimaps + for (LinkedHashMultimap<State<T>, T> path: paths) { + Map<String, T> resultPath = new HashMap<>(); + for (State<T> key: path.keySet()) { + int counter = 0; + Set<T> events = path.get(key); + + // we iterate over the elements in insertion order; the counter has to advance per event, + // otherwise all events of this state would get the same name and overwrite each other + for (T event: events) { + resultPath.put( + events.size() > 1 ? generateStateName(key.getName(), counter++): key.getName(), + // copy the element so that the user can change it + serializer.isImmutableType() ?
event : serializer.copy(event) + ); + } + } + + result.add(resultPath); + } + + return result; + } + + // computationStates is transient, so it is written explicitly after the default fields + private void writeObject(ObjectOutputStream oos) throws IOException { + oos.defaultWriteObject(); + + oos.writeInt(computationStates.size()); + + for (ComputationState<T> computationState: computationStates) { + writeComputationState(computationState, oos); + } + + nonDuplicatingTypeSerializer.clearReferences(); + } + + // counterpart of writeObject: restores the transient computationStates queue + private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException { + ois.defaultReadObject(); + + int numberComputationStates = ois.readInt(); + + computationStates = new LinkedList<>(); + + for (int i = 0; i < numberComputationStates; i++) { + ComputationState<T> computationState = readComputationState(ois); + + computationStates.offer(computationState); + } + + nonDuplicatingTypeSerializer.clearReferences(); + } + + private void writeComputationState(final ComputationState<T> computationState, final ObjectOutputStream oos) throws IOException { + oos.writeObject(computationState.getState()); + oos.writeLong(computationState.getTimestamp()); + oos.writeObject(computationState.getVersion()); + oos.writeLong(computationState.getStartTimestamp()); + + // computation states created for start states carry no event (see addState), so the presence + // of an event is written as a flag instead of handing null to the event serializer + if (computationState.getEvent() == null) { + oos.writeBoolean(false); + } else { + oos.writeBoolean(true); + + DataOutputViewStreamWrapper output = new DataOutputViewStreamWrapper(oos); + + nonDuplicatingTypeSerializer.serialize(computationState.getEvent(), output); + } + } + + @SuppressWarnings("unchecked") + private ComputationState<T> readComputationState(ObjectInputStream ois) throws IOException, ClassNotFoundException { + final State<T> state = (State<T>)ois.readObject(); + final long timestamp = ois.readLong(); + final DeweyNumber version = (DeweyNumber)ois.readObject(); + final long startTimestamp = ois.readLong(); + + // mirrors writeComputationState: a leading flag tells whether an event was written + final T event; + + if (ois.readBoolean()) { + DataInputViewStreamWrapper input = new DataInputViewStreamWrapper(ois); + event = nonDuplicatingTypeSerializer.deserialize(input); + } else { + event = null; + } + + return new ComputationState<>(state, event, timestamp, version, startTimestamp); + } + + /** + * Generates a state name from a
given name template and an index. + * <p> + * If the template ends with "[]" the index is inserted in between the square brackets. + * Otherwise, an underscore and the index are appended to the name. + * + * @param name Name template + * @param index Index of the state + * @return Generated state name from the given state name template + */ + static String generateStateName(final String name, final int index) { + Matcher matcher = namePattern.matcher(name); + + if (matcher.matches()) { + return matcher.group(1) + index + matcher.group(2); + } else { + return name + "_" + index; + } + } +}