http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java new file mode 100644 index 0000000..e1c0099 --- /dev/null +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java @@ -0,0 +1,858 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cep.nfa; + +import com.google.common.collect.LinkedHashMultimap; +import org.apache.commons.lang3.StringUtils; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.Stack; + +/** + * A shared buffer implementation which stores values under a key. Additionally, the values can be + * versioned such that it is possible to retrieve their predecessor element in the buffer. + * <p> + * The idea of the implementation is to have for each key a dedicated {@link SharedBufferPage}. Each + * buffer page maintains a collection of the inserted values. + * + * The values are wrapped in a {@link SharedBufferEntry}. The shared buffer entry allows to store + * relations between different entries. A dewey versioning scheme allows to discriminate between + * different relations (e.g. preceding element). + * + * The implementation is strongly based on the paper "Efficient Pattern Matching over Event Streams". 
+ * + * @see <a href="https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf">https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf</a> + * + * @param <K> Type of the keys + * @param <V> Type of the values + */ +public class SharedBuffer<K extends Serializable, V> implements Serializable { + private static final long serialVersionUID = 9213251042562206495L; + + private final TypeSerializer<V> valueSerializer; + + private transient Map<K, SharedBufferPage<K, V>> pages; + + public SharedBuffer(final TypeSerializer<V> valueSerializer) { + this.valueSerializer = valueSerializer; + pages = new HashMap<>(); + } + + /** + * Stores given value (value + timestamp) under the given key. It assigns a preceding element + * relation to the entry which is defined by the previous key, value (value + timestamp). + * + * @param key Key of the current value + * @param value Current value + * @param timestamp Timestamp of the current value (a value requires always a timestamp to make it uniquely referable)) + * @param previousKey Key of the value for the previous relation + * @param previousValue Value for the previous relation + * @param previousTimestamp Timestamp of the value for the previous relation + * @param version Version of the previous relation + */ + public void put( + final K key, + final V value, + final long timestamp, + final K previousKey, + final V previousValue, + final long previousTimestamp, + final DeweyNumber version) { + SharedBufferPage<K, V> page; + + if (!pages.containsKey(key)) { + page = new SharedBufferPage<K, V>(key); + pages.put(key, page); + } else { + page = pages.get(key); + } + + final SharedBufferEntry<K, V> previousSharedBufferEntry = get(previousKey, previousValue, previousTimestamp); + + page.add( + new ValueTimeWrapper<>(value, timestamp), + previousSharedBufferEntry, + version); + } + + /** + * Checks whether the given key, value, timestamp triple is contained in the shared buffer + * + * @param key Key of the value + 
* @param value Value + * @param timestamp Timestamp of the value + * @return Whether a value with the given timestamp is registered under the given key + */ + public boolean contains( + final K key, + final V value, + final long timestamp) { + + return pages.containsKey(key) && pages.get(key).contains(new ValueTimeWrapper<>(value, timestamp)); + } + + public boolean isEmpty() { + for (SharedBufferPage<K, V> page: pages.values()) { + if (!page.isEmpty()) { + return false; + } + } + return true; + } + + /** + * Deletes all entries in each page which have expired with respect to given pruning timestamp. + * + * @param pruningTimestamp The time which is used for pruning. All elements whose timestamp is + * lower than the pruning timestamp will be removed. + */ + public void prune(long pruningTimestamp) { + Iterator<Map.Entry<K, SharedBufferPage<K, V>>> iter = pages.entrySet().iterator(); + + while (iter.hasNext()) { + SharedBufferPage<K, V> page = iter.next().getValue(); + + page.prune(pruningTimestamp); + + if (page.isEmpty()) { + // delete page if it is empty + iter.remove(); + } + } + } + + /** + * Returns all elements from the previous relation starting at the given value with the + * given key and timestamp. 
+ * + * @param key Key of the starting value + * @param value Value of the starting element + * @param timestamp Timestamp of the starting value + * @param version Version of the previous relation which shall be extracted + * @return Collection of previous relations starting with the given value + */ + public Collection<LinkedHashMultimap<K, V>> extractPatterns( + final K key, + final V value, + final long timestamp, + final DeweyNumber version) { + Collection<LinkedHashMultimap<K, V>> result = new ArrayList<>(); + + // stack to remember the current extraction states + Stack<ExtractionState<K, V>> extractionStates = new Stack<>(); + + // get the starting shared buffer entry for the previous relation + SharedBufferEntry<K, V> entry = get(key, value, timestamp); + + if (entry != null) { + extractionStates.add(new ExtractionState<K, V>(entry, version, new Stack<SharedBufferEntry<K, V>>())); + + // use a depth first search to reconstruct the previous relations + while (!extractionStates.isEmpty()) { + ExtractionState<K, V> extractionState = extractionStates.pop(); + DeweyNumber currentVersion = extractionState.getVersion(); + // current path of the depth first search + Stack<SharedBufferEntry<K, V>> currentPath = extractionState.getPath(); + + // termination criterion + if (currentVersion.length() == 1) { + LinkedHashMultimap<K, V> completePath = LinkedHashMultimap.create(); + + while(!currentPath.isEmpty()) { + SharedBufferEntry<K, V> currentEntry = currentPath.pop(); + + completePath.put(currentEntry.getKey(), currentEntry.getValueTime().getValue()); + } + + result.add(completePath); + } else { + SharedBufferEntry<K, V> currentEntry = extractionState.getEntry(); + + // append state to the path + currentPath.push(currentEntry); + + boolean firstMatch = true; + for (SharedBufferEdge<K, V> edge : currentEntry.getEdges()) { + // we can only proceed if the current version is compatible to the version + // of this previous relation + if 
(currentVersion.isCompatibleWith(edge.getVersion())) { + if (firstMatch) { + // for the first match we don't have to copy the current path + extractionStates.push(new ExtractionState<K, V>(edge.getTarget(), edge.getVersion(), currentPath)); + firstMatch = false; + } else { + Stack<SharedBufferEntry<K, V>> copy = new Stack<>(); + copy.addAll(currentPath); + + extractionStates.push( + new ExtractionState<K, V>( + edge.getTarget(), + edge.getVersion(), + copy)); + } + } + } + } + } + } + + return result; + } + + /** + * Increases the reference counter for the given value, key, timestamp entry so that it is not + * accidentally removed. + * + * @param key Key of the value to lock + * @param value Value to lock + * @param timestamp Timestamp of the value to lock + */ + public void lock(final K key, final V value, final long timestamp) { + SharedBufferEntry<K, V> entry = get(key, value, timestamp); + + if (entry != null) { + entry.increaseReferenceCounter(); + } + } + + /** + * Decreases the reference counter for the given value, key, timstamp entry so that it can be + * removed once the reference counter reaches 0. + * + * @param key Key of the value to release + * @param value Value to release + * @param timestamp Timestamp of the value to release + */ + public void release(final K key, final V value, final long timestamp) { + SharedBufferEntry<K, V> entry = get(key, value, timestamp); + + if (entry != null ) { + entry.decreaseReferenceCounter(); + } + } + + /** + * Removes the given value, key, timestamp entry if its reference counter is 0. It will also + * release the next element in its previous relation and apply remove to this element + * recursively. 
+ * + * @param key Key of the value to remove + * @param value Value to remove + * @param timestamp Timestamp of the value to remvoe + */ + public void remove(final K key, final V value, final long timestamp) { + SharedBufferEntry<K, V> entry = get(key, value, timestamp); + + if (entry != null) { + internalRemove(entry); + } + } + + private void writeObject(ObjectOutputStream oos) throws IOException { + DataOutputViewStreamWrapper target = new DataOutputViewStreamWrapper(oos); + Map<SharedBufferEntry<K, V>, Integer> entryIDs = new HashMap<>(); + int totalEdges = 0; + int entryCounter = 0; + + oos.defaultWriteObject(); + + // number of pages + oos.writeInt(pages.size()); + + for (Map.Entry<K, SharedBufferPage<K, V>> pageEntry: pages.entrySet()) { + SharedBufferPage<K, V> page = pageEntry.getValue(); + + // key for the current page + oos.writeObject(page.getKey()); + // number of page entries + oos.writeInt(page.entries.size()); + + for (Map.Entry<ValueTimeWrapper<V>, SharedBufferEntry<K, V>> sharedBufferEntry: page.entries.entrySet()) { + // serialize the sharedBufferEntry + SharedBufferEntry<K, V> sharedBuffer = sharedBufferEntry.getValue(); + + // assign id to the sharedBufferEntry for the future serialization of the previous + // relation + entryIDs.put(sharedBuffer, entryCounter++); + + ValueTimeWrapper<V> valueTimeWrapper = sharedBuffer.getValueTime(); + + valueSerializer.serialize(valueTimeWrapper.value, target); + oos.writeLong(valueTimeWrapper.getTimestamp()); + + int edges = sharedBuffer.edges.size(); + totalEdges += edges; + + oos.writeInt(sharedBuffer.referenceCounter); + } + } + + // write the edges between the shared buffer entries + oos.writeInt(totalEdges); + + for (Map.Entry<K, SharedBufferPage<K, V>> pageEntry: pages.entrySet()) { + SharedBufferPage<K, V> page = pageEntry.getValue(); + + for (Map.Entry<ValueTimeWrapper<V>, SharedBufferEntry<K, V>> sharedBufferEntry: page.entries.entrySet()) { + SharedBufferEntry<K, V> sharedBuffer = 
sharedBufferEntry.getValue(); + + if (!entryIDs.containsKey(sharedBuffer)) { + throw new RuntimeException("Could not find id for entry: " + sharedBuffer); + } else { + int id = entryIDs.get(sharedBuffer); + + for (SharedBufferEdge<K, V> edge: sharedBuffer.edges) { + // in order to serialize the previous relation we simply serialize the ids + // of the source and target SharedBufferEntry + if (edge.target != null) { + if (!entryIDs.containsKey(edge.getTarget())) { + throw new RuntimeException("Could not find id for entry: " + edge.getTarget()); + } else { + int targetId = entryIDs.get(edge.getTarget()); + + oos.writeInt(id); + oos.writeInt(targetId); + oos.writeObject(edge.version); + } + } else { + oos.writeInt(id); + oos.writeInt(-1); + oos.writeObject(edge.version); + } + } + } + } + } + } + + private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException { + DataInputViewStreamWrapper source = new DataInputViewStreamWrapper(ois); + ArrayList<SharedBufferEntry<K, V>> entryList = new ArrayList<>(); + ois.defaultReadObject(); + + this.pages = new HashMap<>(); + + int numberPages = ois.readInt(); + + for (int i = 0; i < numberPages; i++) { + // key of the page + @SuppressWarnings("unchecked") + K key = (K)ois.readObject(); + + SharedBufferPage<K, V> page = new SharedBufferPage<>(key); + + pages.put(key, page); + + int numberEntries = ois.readInt(); + + for (int j = 0; j < numberEntries; j++) { + // restore the SharedBufferEntries for the given page + V value = valueSerializer.deserialize(source); + long timestamp = ois.readLong(); + + ValueTimeWrapper<V> valueTimeWrapper = new ValueTimeWrapper<>(value, timestamp); + SharedBufferEntry<K, V> sharedBufferEntry = new SharedBufferEntry<K, V>(valueTimeWrapper, page); + + sharedBufferEntry.referenceCounter = ois.readInt(); + + page.entries.put(valueTimeWrapper, sharedBufferEntry); + + entryList.add(sharedBufferEntry); + } + } + + // read the edges of the shared buffer entries + int numberEdges = 
ois.readInt(); + + for (int j = 0; j < numberEdges; j++) { + int sourceIndex = ois.readInt(); + int targetIndex = ois.readInt(); + + if (sourceIndex >= entryList.size() || sourceIndex < 0) { + throw new RuntimeException("Could not find source entry with index " + sourceIndex + + ". This indicates a corrupted state."); + } else { + // We've already deserialized the shared buffer entry. Simply read its ID and + // retrieve the buffer entry from the list of entries + SharedBufferEntry<K, V> sourceEntry = entryList.get(sourceIndex); + + final DeweyNumber version = (DeweyNumber) ois.readObject(); + final SharedBufferEntry<K, V> target; + + if (targetIndex >= 0) { + if (targetIndex >= entryList.size()) { + throw new RuntimeException("Could not find target entry with index " + targetIndex + + ". This indicates a corrupted state."); + } else { + target = entryList.get(targetIndex); + } + } else { + target = null; + } + + sourceEntry.edges.add(new SharedBufferEdge<K, V>(target, version)); + } + } + } + + private SharedBufferEntry<K, V> get( + final K key, + final V value, + final long timestamp) { + if (pages.containsKey(key)) { + return pages + .get(key) + .get(new ValueTimeWrapper<V>(value, timestamp)); + } else { + return null; + } + } + + private void internalRemove(final SharedBufferEntry<K, V> entry) { + Stack<SharedBufferEntry<K, V>> entriesToRemove = new Stack<>(); + entriesToRemove.add(entry); + + while (!entriesToRemove.isEmpty()) { + SharedBufferEntry<K, V> currentEntry = entriesToRemove.pop(); + + if (currentEntry.getReferenceCounter() == 0) { + currentEntry.remove(); + + for (SharedBufferEdge<K, V> edge: currentEntry.getEdges()) { + if (edge.getTarget() != null) { + edge.getTarget().decreaseReferenceCounter(); + entriesToRemove.push(edge.getTarget()); + } + } + } + } + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + + for(Map.Entry<K, SharedBufferPage<K, V>> entry :pages.entrySet()){ + builder.append("Key: 
").append(entry.getKey()).append("\n"); + builder.append("Value: ").append(entry.getValue()).append("\n"); + } + + return builder.toString(); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof SharedBuffer) { + @SuppressWarnings("unchecked") + SharedBuffer<K, V> other = (SharedBuffer<K, V>) obj; + + return pages.equals(other.pages) && valueSerializer.equals(other.valueSerializer); + } else { + return false; + } + } + + @Override + public int hashCode() { + return Objects.hash(pages, valueSerializer); + } + + /** + * The SharedBufferPage represents a set of elements which have been stored under the same key. + * + * @param <K> Type of the key + * @param <V> Type of the value + */ + private static class SharedBufferPage<K, V> { + + // key of the page + private final K key; + + // Map of entries which are stored in this page + private final HashMap<ValueTimeWrapper<V>, SharedBufferEntry<K, V>> entries; + + public SharedBufferPage(final K key) { + this.key = key; + entries = new HashMap<>(); + } + + public K getKey() { + return key; + } + + /** + * Adds a new value time pair to the page. The new entry is linked to the previous entry + * with the given version. 
+ * + * @param valueTime Value time pair to be stored + * @param previous Previous shared buffer entry to which the new entry shall be linked + * @param version Version of the relation between the new and the previous entry + */ + public void add(final ValueTimeWrapper<V> valueTime, final SharedBufferEntry<K, V> previous, final DeweyNumber version) { + SharedBufferEntry<K, V> sharedBufferEntry = entries.get(valueTime); + + if (sharedBufferEntry == null) { + sharedBufferEntry = new SharedBufferEntry<K, V>(valueTime, this); + + entries.put(valueTime, sharedBufferEntry); + } + + SharedBufferEdge<K, V> newEdge; + + if (previous != null) { + newEdge = new SharedBufferEdge<>(previous, version); + previous.increaseReferenceCounter(); + } else { + newEdge = new SharedBufferEdge<>(null, version); + } + + sharedBufferEntry.addEdge(newEdge); + } + + public boolean contains(final ValueTimeWrapper<V> valueTime) { + return entries.containsKey(valueTime); + } + + public SharedBufferEntry<K, V> get(final ValueTimeWrapper<V> valueTime) { + return entries.get(valueTime); + } + + /** + * Removes all entries from the map whose timestamp is smaller than the pruning timestamp. 
+ * + * @param pruningTimestamp Timestamp for the pruning + */ + public void prune(long pruningTimestamp) { + Iterator<Map.Entry<ValueTimeWrapper<V>, SharedBufferEntry<K, V>>> iterator = entries.entrySet().iterator(); + boolean continuePruning = true; + + while (iterator.hasNext() && continuePruning) { + SharedBufferEntry<K, V> entry = iterator.next().getValue(); + + if (entry.getValueTime().getTimestamp() <= pruningTimestamp) { + iterator.remove(); + } else { + continuePruning = false; + } + } + } + + public boolean isEmpty() { + return entries.isEmpty(); + } + + public SharedBufferEntry<K, V> remove(final ValueTimeWrapper<V> valueTime) { + return entries.remove(valueTime); + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + + builder.append("SharedBufferPage(\n"); + + for (SharedBufferEntry<K, V> entry: entries.values()) { + builder.append(entry.toString()).append("\n"); + } + + builder.append(")"); + + return builder.toString(); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof SharedBufferPage) { + @SuppressWarnings("unchecked") + SharedBufferPage<K, V> other = (SharedBufferPage<K, V>) obj; + + return key.equals(other.key) && entries.equals(other.entries); + } else { + return false; + } + } + + @Override + public int hashCode() { + return Objects.hash(key, entries); + } + } + + /** + * Entry of a {@link SharedBufferPage}. The entry contains the value timestamp pair, a set of + * edges to other shared buffer entries denoting a relation, a reference to the owning page and + * a reference counter. The reference counter counts how many references are kept to this entry. 
+ * + * @param <K> Type of the key + * @param <V> Type of the value + */ + private static class SharedBufferEntry<K, V> { + private final ValueTimeWrapper<V> valueTime; + private final Set<SharedBufferEdge<K, V>> edges; + private final SharedBufferPage<K, V> page; + private int referenceCounter; + + public SharedBufferEntry( + final ValueTimeWrapper<V> valueTime, + final SharedBufferPage<K, V> page) { + this(valueTime, null, page); + } + + public SharedBufferEntry( + final ValueTimeWrapper<V> valueTime, + final SharedBufferEdge<K, V> edge, + final SharedBufferPage<K, V> page) { + this.valueTime = valueTime; + edges = new HashSet<>(); + + if (edge != null) { + edges.add(edge); + } + + referenceCounter = 0; + + this.page = page; + } + + public ValueTimeWrapper<V> getValueTime() { + return valueTime; + } + + public Collection<SharedBufferEdge<K, V>> getEdges() { + return edges; + } + + public K getKey() { + return page.getKey(); + } + + public void addEdge(SharedBufferEdge<K, V> edge) { + edges.add(edge); + } + + public boolean remove() { + if (page != null) { + page.remove(valueTime); + + return true; + } else { + return false; + } + } + + public void increaseReferenceCounter() { + referenceCounter++; + } + + public void decreaseReferenceCounter() { + if (referenceCounter > 0) { + referenceCounter--; + } + } + + public int getReferenceCounter() { + return referenceCounter; + } + + @Override + public String toString() { + return "SharedBufferEntry(" + valueTime + ", [" + StringUtils.join(edges, ", ") + "], " + referenceCounter + ")"; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof SharedBufferEntry) { + @SuppressWarnings("unchecked") + SharedBufferEntry<K, V> other = (SharedBufferEntry<K, V>) obj; + + return valueTime.equals(other.valueTime) && + getKey().equals(other.getKey()) && + referenceCounter == other.referenceCounter && + edges.equals(other.edges); + } else { + return false; + } + } + + @Override + public int hashCode() { + return 
Objects.hash(valueTime, getKey(), referenceCounter, edges); + } + } + + /** + * Versioned edge between two shared buffer entries + * + * @param <K> Type of the key + * @param <V> Type of the value + */ + public static class SharedBufferEdge<K, V> { + private final SharedBufferEntry<K, V> target; + private final DeweyNumber version; + + public SharedBufferEdge(final SharedBufferEntry<K, V> target, final DeweyNumber version) { + this.target = target; + this.version = version; + } + + public SharedBufferEntry<K, V> getTarget() { + return target; + } + + public DeweyNumber getVersion() { + return version; + } + + @Override + public String toString() { + return "SharedBufferEdge(" + target + ", " + version + ")"; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof SharedBufferEdge) { + @SuppressWarnings("unchecked") + SharedBufferEdge<K, V> other = (SharedBufferEdge<K, V>) obj; + + if (version.equals(other.version)) { + if (target == null && other.target == null) { + return true; + } else if (target != null && other.target != null) { + return target.getKey().equals(other.target.getKey()) && + target.getValueTime().equals(other.target.getValueTime()); + } else { + return false; + } + } else { + return false; + } + } else { + return false; + } + } + + @Override + public int hashCode() { + if (target != null) { + return Objects.hash(target.getKey(), target.getValueTime(), version); + } else { + return version.hashCode(); + } + } + } + + /** + * Wrapper for a value timestamp pair. 
+ * + * @param <V> Type of the value + */ + static class ValueTimeWrapper<V> { + private final V value; + private final long timestamp; + + public ValueTimeWrapper(final V value, final long timestamp) { + this.value = value; + this.timestamp = timestamp; + } + + public V getValue() { + return value; + } + + public long getTimestamp() { + return timestamp; + } + + @Override + public String toString() { + return "ValueTimeWrapper(" + value + ", " + timestamp + ")"; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof ValueTimeWrapper) { + @SuppressWarnings("unchecked") + ValueTimeWrapper<V> other = (ValueTimeWrapper<V>)obj; + + return timestamp == other.getTimestamp() && value.equals(other.getValue()); + } else { + return false; + } + } + + @Override + public int hashCode() { + return (int) (this.timestamp ^ this.timestamp >>> 32) + 31 * value.hashCode(); + } + } + + /** + * Helper class to store the extraction state while extracting a sequence of values following + * the versioned entry edges. + * + * @param <K> Type of the key + * @param <V> Type of the value + */ + private static class ExtractionState<K, V> { + private final SharedBufferEntry<K, V> entry; + private final DeweyNumber version; + private final Stack<SharedBufferEntry<K, V>> path; + + public ExtractionState( + final SharedBufferEntry<K, V> entry, + final DeweyNumber version, + final Stack<SharedBufferEntry<K, V>> path) { + + this.entry = entry; + this.version = version; + this.path = path; + } + + public SharedBufferEntry<K, V> getEntry() { + return entry; + } + + public DeweyNumber getVersion() { + return version; + } + + public Stack<SharedBufferEntry<K, V>> getPath() { + return path; + } + + @Override + public String toString() { + return "ExtractionState(" + entry + ", " + version + ", [" + StringUtils.join(path, ", ") + "])"; + } + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java new file mode 100644 index 0000000..50b2cf3 --- /dev/null +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.nfa; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Objects; + +/** + * Represents a state of the {@link NFA}. + * <p> + * Each state is identified by a name and a state type. Furthermore, it contains a collection of + * state transitions. The state transitions describe under which conditions it is possible to enter + * a new state. 
+ * + * @param <T> Type of the input events + */ +public class State<T> implements Serializable { + private static final long serialVersionUID = 6658700025989097781L; + + private final String name; + private final StateType stateType; + private final Collection<StateTransition<T>> stateTransitions; + + public State(final String name, final StateType stateType) { + this.name = name; + this.stateType = stateType; + + stateTransitions = new ArrayList<StateTransition<T>>(); + } + + public boolean isFinal() { + return stateType == StateType.Final; + } + + public boolean isStart() { return stateType == StateType.Start; } + + public String getName() { + return name; + } + + public Collection<StateTransition<T>> getStateTransitions() { + return stateTransitions; + } + + public void addStateTransition(final StateTransition<T> stateTransition) { + stateTransitions.add(stateTransition); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof State) { + @SuppressWarnings("unchecked") + State<T> other = (State<T>)obj; + + return name.equals(other.name) && + stateType == other.stateType && + stateTransitions.equals(other.stateTransitions); + } else { + return false; + } + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + + builder.append("State(").append(name).append(", ").append(stateType).append(", [\n"); + + for (StateTransition<T> stateTransition: stateTransitions) { + builder.append(stateTransition).append(",\n"); + } + + builder.append("])"); + + return builder.toString(); + } + + @Override + public int hashCode() { + return Objects.hash(name, stateType, stateTransitions); + } + + /** + * Set of valid state types. 
+ */ + public enum StateType { + Start, // the state is a starting state for the NFA + Final, // the state is a final state for the NFA + Normal // the state is neither a start nor a final state + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java new file mode 100644 index 0000000..479f28a --- /dev/null +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cep.nfa; + +import org.apache.flink.api.common.functions.FilterFunction; + +import java.io.Serializable; +import java.util.Objects; + +public class StateTransition<T> implements Serializable { + private static final long serialVersionUID = -4825345749997891838L; + + private final StateTransitionAction action; + private final State<T> targetState; + private final FilterFunction<T> condition; + + public StateTransition(final StateTransitionAction action, final State<T> targetState, final FilterFunction<T> condition) { + this.action = action; + this.targetState = targetState; + this.condition = condition; + } + + public StateTransitionAction getAction() { + return action; + } + + public State<T> getTargetState() { + return targetState; + } + + public FilterFunction<T> getCondition() { + return condition; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof StateTransition) { + @SuppressWarnings("unchecked") + StateTransition<T> other = (StateTransition<T>) obj; + + return action == other.action && + targetState.getName().equals(other.targetState.getName()); + } else { + return false; + } + } + + @Override + public int hashCode() { + // we have to take the name of targetState because the transition might be reflexive + return Objects.hash(action, targetState.getName()); + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + + builder.append("StateTransition(").append(action).append(", ").append(targetState.getName()); + + if (condition != null) { + builder.append(", with filter)"); + } else { + builder.append(")"); + } + + return builder.toString(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransitionAction.java ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransitionAction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransitionAction.java new file mode 100644 index 0000000..70fc7fb --- /dev/null +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransitionAction.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.nfa; + +/** + * Set of actions when doing a state transition from a {@link State} to another. 
/**
 * Set of actions when doing a state transition from a {@link State} to another.
 */
public enum StateTransitionAction {
	/** Take the current event and assign it to the new state. */
	TAKE,

	/** Ignore the current event and do the state transition. */
	IGNORE,

	/**
	 * Do the state transition and keep the current event for further processing
	 * (epsilon transition).
	 */
	PROCEED
}
+ */ + +package org.apache.flink.cep.nfa.compiler; + +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.cep.nfa.NFA; +import org.apache.flink.cep.nfa.State; +import org.apache.flink.cep.nfa.StateTransition; +import org.apache.flink.cep.nfa.StateTransitionAction; +import org.apache.flink.cep.pattern.FollowedByPattern; +import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.streaming.api.windowing.time.Time; + +import java.io.Serializable; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; + +/** + * Compiler class containing methods to compile a {@link Pattern} into a {@link NFA} or a + * {@link NFAFactory}. + */ +public class NFACompiler { + + protected final static String BEGINNING_STATE_NAME = "$beginningState$"; + + /** + * Compiles the given pattern into a {@link NFA}. + * + * @param pattern Definition of sequence pattern + * @param inputTypeSerializer Serializer for the input type + * @param <T> Type of the input events + * @return Non-deterministic finite automaton representing the given pattern + */ + public static <T> NFA<T> compile(Pattern<T, ?> pattern, TypeSerializer<T> inputTypeSerializer) { + NFAFactory<T> factory = compileFactory(pattern, inputTypeSerializer); + + return factory.createNFA(); + } + + /** + * Compiles the given pattern into a {@link NFAFactory}. The NFA factory can be used to create + * multiple NFAs. 
+ * + * @param pattern Definition of sequence pattern + * @param inputTypeSerializer Serializer for the input type + * @param <T> Type of the input events + * @return Factory for NFAs corresponding to the given pattern + */ + @SuppressWarnings("unchecked") + public static <T> NFAFactory<T> compileFactory(Pattern<T, ?> pattern, TypeSerializer<T> inputTypeSerializer) { + if (pattern == null) { + // return a factory for empty NFAs + return new NFAFactoryImpl<T>(inputTypeSerializer, 0, Collections.<State<T>>emptyList()); + } else { + // set of all generated states + Map<String, State<T>> states = new HashMap<>(); + long windowTime; + + Pattern<T, ?> succeedingPattern; + State<T> succeedingState; + Pattern<T, ?> currentPattern = pattern; + + // we're traversing the pattern from the end to the beginning --> the first state is the final state + State<T> currentState = new State<>(currentPattern.getName(), State.StateType.Final); + + states.put(currentPattern.getName(), currentState); + + windowTime = currentPattern.getWindowTime() != null ? 
currentPattern.getWindowTime().toMilliseconds() : 0L; + + while (currentPattern.getPrevious() != null) { + succeedingPattern = currentPattern; + succeedingState = currentState; + currentPattern = currentPattern.getPrevious(); + + Time currentWindowTime = currentPattern.getWindowTime(); + + if (currentWindowTime != null && currentWindowTime.toMilliseconds() < windowTime) { + // the window time is the global minimum of all window times of each state + windowTime = currentWindowTime.toMilliseconds(); + } + + if (states.containsKey(currentPattern.getName())) { + currentState = states.get(currentPattern.getName()); + } else { + currentState = new State<>(currentPattern.getName(), State.StateType.Normal); + states.put(currentState.getName(), currentState); + } + + currentState.addStateTransition(new StateTransition<T>( + StateTransitionAction.TAKE, + succeedingState, + (FilterFunction<T>) succeedingPattern.getFilterFunction())); + + if (succeedingPattern instanceof FollowedByPattern) { + // the followed by pattern entails a reflexive ignore transition + currentState.addStateTransition(new StateTransition<T>( + StateTransitionAction.IGNORE, + currentState, + null + )); + } + } + + // add the beginning state + final State<T> beginningState; + + if (states.containsKey(BEGINNING_STATE_NAME)) { + beginningState = states.get(BEGINNING_STATE_NAME); + } else { + beginningState = new State<>(BEGINNING_STATE_NAME, State.StateType.Start); + states.put(BEGINNING_STATE_NAME, beginningState); + } + + beginningState.addStateTransition(new StateTransition<T>( + StateTransitionAction.TAKE, + currentState, + (FilterFunction<T>) currentPattern.getFilterFunction() + )); + + NFA<T> nfa = new NFA<T>(inputTypeSerializer, windowTime); + nfa.addStates(states.values()); + + return new NFAFactoryImpl<T>(inputTypeSerializer, windowTime, new HashSet<>(states.values())); + } + } + + /** + * Factory interface for {@link NFA}. 
+ * + * @param <T> Type of the input events which are processed by the NFA + */ + public interface NFAFactory<T> extends Serializable { + NFA<T> createNFA(); + } + + /** + * Implementation of the {@link NFAFactory} interface. + * <p> + * The implementation takes the input type serializer, the window time and the set of + * states and their transitions to be able to create an NFA from them. + * + * @param <T> Type of the input events which are processed by the NFA + */ + private static class NFAFactoryImpl<T> implements NFAFactory<T> { + + private static final long serialVersionUID = 8939783698296714379L; + + private final TypeSerializer<T> inputTypeSerializer; + private final long windowTime; + private final Collection<State<T>> states; + + private NFAFactoryImpl(TypeSerializer<T> inputTypeSerializer, long windowTime, Collection<State<T>> states) { + this.inputTypeSerializer = inputTypeSerializer; + this.windowTime = windowTime; + this.states = states; + } + + @Override + public NFA<T> createNFA() { + NFA<T> result = new NFA<>(inputTypeSerializer.duplicate(), windowTime); + + result.addStates(states); + + return result; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java new file mode 100644 index 0000000..a943f0d --- /dev/null +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.operator; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.cep.nfa.NFA; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import java.util.PriorityQueue; + +/** + * Base class for CEP pattern operator. The operator uses a {@link NFA} to detect complex event + * patterns. The detected event patterns are then outputted to the down stream operators. 
+ * + * @param <IN> Type of the input elements + */ +public abstract class AbstractCEPPatternOperator<IN> + extends AbstractStreamOperator<Map<String, IN>> + implements OneInputStreamOperator<IN, Map<String, IN>> { + + private static final long serialVersionUID = -4166778210774160757L; + + protected static final int INITIAL_PRIORITY_QUEUE_CAPACITY = 11; + + private final TypeSerializer<IN> inputSerializer; + private final boolean isProcessingTime; + + public AbstractCEPPatternOperator( + final TypeSerializer<IN> inputSerializer, + final boolean isProcessingTime) { + this.inputSerializer = inputSerializer; + this.isProcessingTime = isProcessingTime; + } + + public TypeSerializer<IN> getInputSerializer() { + return inputSerializer; + } + + protected abstract NFA<IN> getNFA() throws IOException; + + protected abstract PriorityQueue<StreamRecord<IN>> getPriorityQueue() throws IOException; + + @Override + public void processElement(StreamRecord<IN> element) throws Exception { + if (isProcessingTime) { + // there can be no out of order elements in processing time + NFA<IN> nfa = getNFA(); + processEvent(nfa, element.getValue(), element.getTimestamp()); + } else { + PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue(); + + // event time processing + // we have to buffer the elements until we receive the proper watermark + if (getExecutionConfig().isObjectReuseEnabled()) { + // copy the StreamRecord so that it cannot be changed + priorityQueue.offer(new StreamRecord<IN>(inputSerializer.copy(element.getValue()), element.getTimestamp())); + } else { + priorityQueue.offer(element); + } + } + } + + /** + * Process the given event by giving it to the NFA and outputting the produced set of matched + * event sequences. 
+ * + * @param nfa NFA to be used for the event detection + * @param event The current event to be processed + * @param timestamp The timestamp of the event + */ + protected void processEvent(NFA<IN> nfa, IN event, long timestamp) { + Collection<Map<String, IN>> patterns = nfa.process( + event, + timestamp); + + if (!patterns.isEmpty()) { + StreamRecord<Map<String, IN>> streamRecord = new StreamRecord<Map<String, IN>>( + null, + timestamp); + + for (Map<String, IN> pattern : patterns) { + streamRecord.replace(pattern); + output.collect(streamRecord); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java new file mode 100644 index 0000000..2ad152e --- /dev/null +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cep.operator; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.cep.nfa.NFA; +import org.apache.flink.cep.nfa.compiler.NFACompiler; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer; +import org.apache.flink.streaming.runtime.tasks.StreamTaskState; + +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.PriorityQueue; + +/** + * CEP pattern operator implementation which is used for non keyed streams. Consequently, + * the operator state only includes a single {@link NFA} and a priority queue to order out of order + * elements in case of event time processing. 
+ * + * @param <IN> Type of the input elements + */ +public class CEPPatternOperator<IN> extends AbstractCEPPatternOperator<IN> { + private static final long serialVersionUID = 7487334510746595640L; + + private final StreamRecordSerializer<IN> streamRecordSerializer; + + // global nfa for all elements + private NFA<IN> nfa; + + // queue to buffer out of order stream records + private transient PriorityQueue<StreamRecord<IN>> priorityQueue; + + public CEPPatternOperator( + TypeSerializer<IN> inputSerializer, + boolean isProcessingTime, + NFACompiler.NFAFactory<IN> nfaFactory) { + super(inputSerializer, isProcessingTime); + + this.streamRecordSerializer = new StreamRecordSerializer<>(inputSerializer); + this.nfa = nfaFactory.createNFA(); + } + + @Override + public void open() { + if (priorityQueue == null) { + priorityQueue = new PriorityQueue<StreamRecord<IN>>(INITIAL_PRIORITY_QUEUE_CAPACITY, new StreamRecordComparator<IN>()); + } + } + + @Override + protected NFA<IN> getNFA() throws IOException { + return nfa; + } + + @Override + protected PriorityQueue<StreamRecord<IN>> getPriorityQueue() throws IOException { + return priorityQueue; + } + + @Override + public void processWatermark(Watermark mark) throws Exception { + while(!priorityQueue.isEmpty() && priorityQueue.peek().getTimestamp() <= mark.getTimestamp()) { + StreamRecord<IN> streamRecord = priorityQueue.poll(); + + processEvent(nfa, streamRecord.getValue(), streamRecord.getTimestamp()); + } + } + + @Override + public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception { + StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp); + + final StateBackend.CheckpointStateOutputStream os = this.getStateBackend().createCheckpointStateOutputStream( + checkpointId, + timestamp); + + final ObjectOutputStream oos = new ObjectOutputStream(os); + final StateBackend.CheckpointStateOutputView ov = new StateBackend.CheckpointStateOutputView(os); + + 
oos.writeObject(nfa); + + ov.writeInt(priorityQueue.size()); + + for (StreamRecord<IN> streamRecord: priorityQueue) { + streamRecordSerializer.serialize(streamRecord, ov); + } + + taskState.setOperatorState(os.closeAndGetHandle()); + + return taskState; + } + + @Override + @SuppressWarnings("unchecked") + public void restoreState(StreamTaskState state, long recoveryTimestamp) throws Exception { + super.restoreState(state, recoveryTimestamp); + + StreamStateHandle stream = (StreamStateHandle)state.getOperatorState(); + + final InputStream is = stream.getState(getUserCodeClassloader()); + final ObjectInputStream ois = new ObjectInputStream(is); + final DataInputViewStreamWrapper div = new DataInputViewStreamWrapper(is); + + nfa = (NFA<IN>)ois.readObject(); + + int numberPriorityQueueEntries = div.readInt(); + + priorityQueue = new PriorityQueue<StreamRecord<IN>>(numberPriorityQueueEntries, new StreamRecordComparator<IN>()); + + for (int i = 0; i <numberPriorityQueueEntries; i++) { + priorityQueue.offer(streamRecordSerializer.deserialize(div)); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java new file mode 100644 index 0000000..03758c7 --- /dev/null +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java @@ -0,0 +1,331 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.operator; + +import org.apache.flink.api.common.state.OperatorState; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; +import org.apache.flink.cep.nfa.NFA; +import org.apache.flink.cep.nfa.compiler.NFACompiler; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer; +import org.apache.flink.streaming.runtime.tasks.StreamTaskState; + +import java.io.IOException; +import java.io.Serializable; +import java.util.HashSet; +import java.util.Objects; +import java.util.PriorityQueue; +import java.util.Set; + +/** + * CEP pattern operator implementation for a keyed input stream. For each key, the operator creates + * a {@link NFA} and a priority queue to buffer out of order elements. Both data structures are + * stored using the key value state. Additionally, the set of all seen keys is kept as part of the + * operator state. 
This is necessary to trigger the execution for all keys upon receiving a new + * watermark. + * + * @param <IN> Type of the input elements + * @param <KEY> Type of the key on which the input stream is keyed + */ +public class KeyedCEPPatternOperator<IN, KEY> extends AbstractCEPPatternOperator<IN> { + private static final long serialVersionUID = -7234999752950159178L; + + private static final String NFA_OPERATOR_STATE_NAME = "nfaOperatorState"; + private static final String PRIORIRY_QUEUE_STATE_NAME = "priorityQueueStateName"; + + // necessary to extract the key from the input elements + private final KeySelector<IN, KEY> keySelector; + + // necessary to serialize the set of seen keys + private final TypeSerializer<KEY> keySerializer; + + private final PriorityQueueFactory<StreamRecord<IN>> priorityQueueFactory = new PriorityQueueStreamRecordFactory<>(); + private final NFACompiler.NFAFactory<IN> nfaFactory; + + // stores the keys we've already seen to trigger execution upon receiving a watermark + // this can be problematic, since it is never cleared + // TODO: fix once the state refactoring is completed + private transient Set<KEY> keys; + + private transient OperatorState<NFA<IN>> nfaOperatorState; + private transient OperatorState<PriorityQueue<StreamRecord<IN>>> priorityQueueOperatorState; + + public KeyedCEPPatternOperator( + TypeSerializer<IN> inputSerializer, + boolean isProcessingTime, + KeySelector<IN, KEY> keySelector, + TypeSerializer<KEY> keySerializer, + NFACompiler.NFAFactory<IN> nfaFactory) { + super(inputSerializer, isProcessingTime); + + this.keySelector = keySelector; + this.keySerializer = keySerializer; + + this.nfaFactory = nfaFactory; + } + + @Override + @SuppressWarnings("unchecked") + public void open() throws Exception { + if (keys == null) { + keys = new HashSet<>(); + } + + if (nfaOperatorState == null) { + nfaOperatorState = this.createKeyValueState( + NFA_OPERATOR_STATE_NAME, + new KryoSerializer<NFA<IN>>((Class<NFA<IN>>) (Class<?>) 
NFA.class, getExecutionConfig()), + null); + } + + if (priorityQueueOperatorState == null) { + priorityQueueOperatorState = this.createKeyValueState( + PRIORIRY_QUEUE_STATE_NAME, + new PriorityQueueSerializer<StreamRecord<IN>>( + new StreamRecordSerializer<IN>(getInputSerializer()), + new PriorityQueueStreamRecordFactory<IN>()), + null); + } + } + + @Override + protected NFA<IN> getNFA() throws IOException { + NFA<IN> nfa = nfaOperatorState.value(); + + if (nfa == null) { + nfa = nfaFactory.createNFA(); + + nfaOperatorState.update(nfa); + } + + return nfa; + } + + @Override + protected PriorityQueue<StreamRecord<IN>> getPriorityQueue() throws IOException { + PriorityQueue<StreamRecord<IN>> priorityQueue = priorityQueueOperatorState.value(); + + if (priorityQueue == null) { + priorityQueue = priorityQueueFactory.createPriorityQueue(); + + priorityQueueOperatorState.update(priorityQueue); + } + + return priorityQueue; + } + + @Override + public void processElement(StreamRecord<IN> element) throws Exception { + keys.add(keySelector.getKey(element.getValue())); + + super.processElement(element); + } + + @Override + public void processWatermark(Watermark mark) throws Exception { + // iterate over all keys to trigger the execution of the buffered elements + for (KEY key: keys) { + setKeyContext(key); + + PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue(); + + NFA<IN> nfa = getNFA(); + + while (!priorityQueue.isEmpty() && priorityQueue.peek().getTimestamp() <= mark.getTimestamp()) { + StreamRecord<IN> streamRecord = priorityQueue.poll(); + + processEvent(nfa, streamRecord.getValue(), streamRecord.getTimestamp()); + } + } + } + + @Override + public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception { + StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp); + + StateBackend.CheckpointStateOutputView ov = getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp); + + 
ov.writeInt(keys.size()); + + for (KEY key: keys) { + keySerializer.serialize(key, ov); + } + + taskState.setOperatorState(ov.closeAndGetHandle()); + + return taskState; + } + + @Override + public void restoreState(StreamTaskState state, long recoveryTimestamp) throws Exception { + super.restoreState(state, recoveryTimestamp); + + @SuppressWarnings("unchecked") + StateHandle<DataInputView> stateHandle = (StateHandle<DataInputView>) state; + + DataInputView inputView = stateHandle.getState(getUserCodeClassloader()); + + if (keys == null) { + keys = new HashSet<>(); + } + + int numberEntries = inputView.readInt(); + + for (int i = 0; i <numberEntries; i++) { + keys.add(keySerializer.deserialize(inputView)); + } + } + + /** + * Custom type serializer implementation to serialize priority queues. + * + * @param <T> Type of the priority queue's elements + */ + private static class PriorityQueueSerializer<T> extends TypeSerializer<PriorityQueue<T>> { + + private static final long serialVersionUID = -231980397616187715L; + + private final TypeSerializer<T> elementSerializer; + private final PriorityQueueFactory<T> factory; + + public PriorityQueueSerializer(final TypeSerializer<T> elementSerializer, final PriorityQueueFactory<T> factory) { + this.elementSerializer = elementSerializer; + this.factory = factory; + } + + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public TypeSerializer<PriorityQueue<T>> duplicate() { + return new PriorityQueueSerializer<>(elementSerializer.duplicate(), factory); + } + + @Override + public PriorityQueue<T> createInstance() { + return factory.createPriorityQueue(); + } + + @Override + public PriorityQueue<T> copy(PriorityQueue<T> from) { + PriorityQueue<T> result = factory.createPriorityQueue(); + + for (T element: from) { + result.offer(elementSerializer.copy(element)); + } + + return result; + } + + @Override + public PriorityQueue<T> copy(PriorityQueue<T> from, PriorityQueue<T> reuse) { + reuse.clear(); 
+ + for (T element: from) { + reuse.offer(elementSerializer.copy(element)); + } + + return reuse; + } + + @Override + public int getLength() { + return 0; + } + + @Override + public void serialize(PriorityQueue<T> record, DataOutputView target) throws IOException { + target.writeInt(record.size()); + + for (T element: record) { + elementSerializer.serialize(element, target); + } + } + + @Override + public PriorityQueue<T> deserialize(DataInputView source) throws IOException { + PriorityQueue<T> result = factory.createPriorityQueue(); + + return deserialize(result, source); + } + + @Override + public PriorityQueue<T> deserialize(PriorityQueue<T> reuse, DataInputView source) throws IOException { + reuse.clear(); + + int numberEntries = source.readInt(); + + for (int i = 0; i < numberEntries; i++) { + reuse.offer(elementSerializer.deserialize(source)); + } + + return reuse; + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof PriorityQueueSerializer) { + @SuppressWarnings("unchecked") + PriorityQueueSerializer<T> other = (PriorityQueueSerializer<T>) obj; + + return factory.equals(other.factory) && elementSerializer.equals(other.elementSerializer); + } else { + return false; + } + } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof PriorityQueueSerializer; + } + + @Override + public int hashCode() { + return Objects.hash(factory, elementSerializer); + } + } + + private interface PriorityQueueFactory<T> extends Serializable { + PriorityQueue<T> createPriorityQueue(); + } + + private static class PriorityQueueStreamRecordFactory<T> implements PriorityQueueFactory<StreamRecord<T>> { + + private static final long serialVersionUID = 1254766984454616593L; + + @Override + public PriorityQueue<StreamRecord<T>> createPriorityQueue() { + return new PriorityQueue<StreamRecord<T>>(INITIAL_PRIORITY_QUEUE_CAPACITY, new 
StreamRecordComparator<T>()); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/StreamRecordComparator.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/StreamRecordComparator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/StreamRecordComparator.java new file mode 100644 index 0000000..b290e7b --- /dev/null +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/StreamRecordComparator.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cep.operator; + +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import java.io.Serializable; +import java.util.Comparator; + +/** + * Compares two {@link StreamRecord}s based on their timestamp + * + * @param <IN> Type of the value field of the StreamRecord + */ +public class StreamRecordComparator<IN> implements Comparator<StreamRecord<IN>>, Serializable { + private static final long serialVersionUID = 1581054988433915305L; + + @Override + public int compare(StreamRecord<IN> o1, StreamRecord<IN> o2) { + if (o1.getTimestamp() < o2.getTimestamp()) { + return -1; + } else if (o1.getTimestamp() > o2.getTimestamp()) { + return 1; + } else { + return 0; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/AndFilterFunction.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/AndFilterFunction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/AndFilterFunction.java new file mode 100644 index 0000000..d01643d --- /dev/null +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/AndFilterFunction.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.pattern; + +import org.apache.flink.api.common.functions.FilterFunction; + +/** + * A filter function which combines two filter functions with a logical and. Thus, the filter + * function only returns true, iff both filters return true. + * + * @param <T> Type of the element to filter + */ +public class AndFilterFunction<T> implements FilterFunction<T> { + private static final long serialVersionUID = -2109562093871155005L; + + private final FilterFunction<T> left; + private final FilterFunction<T> right; + + public AndFilterFunction(final FilterFunction<T> left, final FilterFunction<T> right) { + this.left = left; + this.right = right; + } + + @Override + public boolean filter(T value) throws Exception { + return left.filter(value) && right.filter(value); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/FollowedByPattern.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/FollowedByPattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/FollowedByPattern.java new file mode 100644 index 0000000..266451c --- /dev/null +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/FollowedByPattern.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.pattern; + +/** + * Pattern operator which signifies that the there is a non-strict temporal contiguity between + * itself and its preceding pattern operator. This means that there might be events in between + * two matching events. These events are then simply ignored. + * + * @param <T> Base type of the events + * @param <F> Subtype of T to which the operator is currently constrained + */ +public class FollowedByPattern<T, F extends T> extends Pattern<T, F> { + FollowedByPattern(final String name, Pattern<T, ?> previous) { + super(name, previous); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java new file mode 100644 index 0000000..76f660a --- /dev/null +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.pattern; + +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.cep.nfa.NFA; +import org.apache.flink.streaming.api.windowing.time.Time; + +/** + * Base class for a pattern definition. + * <p> + * A pattern definition is used by {@link org.apache.flink.cep.nfa.compiler.NFACompiler} to create + * a {@link NFA}. 
+ * + * <pre>{@code + * Pattern<T> pattern = Pattern.<T>begin("start") + * .next("middle").subtype(F.class) + * .followedBy("end").where(new MyFilterFunction()); + * } + * </pre> + * + * @param <T> Base type of the elements appearing in the pattern + * @param <F> Subtype of T to which the current pattern operator is constrained + */ +public class Pattern<T, F extends T> { + + // name of the pattern operator + private final String name; + + // previous pattern operator + private final Pattern<T, ?> previous; + + // filter condition for an event to be matched + private FilterFunction<F> filterFunction; + + // window length in which the pattern match has to occur + private Time windowTime; + + protected Pattern(final String name, final Pattern<T, ?> previous) { + this.name = name; + this.previous = previous; + } + + public String getName() { + return name; + } + + public Pattern<T, ?> getPrevious() { + return previous; + } + + public FilterFunction<F> getFilterFunction() { + return filterFunction; + } + + public Time getWindowTime() { + return windowTime; + } + + /** + * Specifies a filter condition which has to be fulfilled by an event in order to be matched. + * + * @param newFilterFunction Filter condition + * @return The same pattern operator where the new filter condition is set + */ + public Pattern<T, F> where(FilterFunction<F> newFilterFunction) { + ClosureCleaner.clean(newFilterFunction, true); + + if (this.filterFunction == null) { + this.filterFunction = newFilterFunction; + } else { + this.filterFunction = new AndFilterFunction<F>(this.filterFunction, newFilterFunction); + } + + return this; + } + + /** + * Applies a subtype constraint on the current pattern operator. This means that an event has + * to be of the given subtype in order to be matched. 
+ * + * @param subtypeClass Class of the subtype + * @param <S> Type of the subtype + * @return The same pattern operator with the new subtype constraint + */ + public <S extends F> Pattern<T, S> subtype(final Class<S> subtypeClass) { + if (filterFunction == null) { + this.filterFunction = new SubtypeFilterFunction<F>(subtypeClass); + } else { + this.filterFunction = new AndFilterFunction<F>(this.filterFunction, new SubtypeFilterFunction<F>(subtypeClass)); + } + + @SuppressWarnings("unchecked") + Pattern<T, S> result = (Pattern<T, S>) this; + + return result; + } + + /** + * Defines the maximum time interval for a matching pattern. This means that the time gap + * between first and the last event must not be longer than the window time. + * + * @param windowTime Time of the matching window + * @return The same pattenr operator with the new window length + */ + public Pattern<T, F> within(Time windowTime) { + if (windowTime != null) { + this.windowTime = windowTime; + } + + return this; + } + + /** + * Appends a new pattern operator to the existing one. The new pattern operator enforces strict + * temporal contiguity. This means that the whole pattern only matches if an event which matches + * this operator directly follows the preceding matching event. Thus, there cannot be any + * events in between two matching events. + * + * @param name Name of the new pattern operator + * @return A new pattern operator which is appended to this pattern operator + */ + public Pattern<T, T> next(final String name) { + return new Pattern<T, T>(name, this); + } + + /** + * Appends a new pattern operator to the existing one. The new pattern operator enforces + * non-strict temporal contiguity. This means that a matching event of this operator and the + * preceding matching event might be interleaved with other events which are ignored. 
+ * + * @param name Name of the new pattern operator + * @return A new pattern operator which is appended to this pattern operator + */ + public FollowedByPattern<T, T> followedBy(final String name) { + return new FollowedByPattern<T, T>(name, this); + } + + /** + * Starts a new pattern with the initial pattern operator whose name is provided. Furthermore, + * the base type of the event sequence is set. + * + * @param name Name of the new pattern operator + * @param <X> Base type of the event pattern + * @return The first pattern operator of a pattern + */ + public static <X> Pattern<X, X> begin(final String name) { + return new Pattern<X, X>(name, null); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/79058edb/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/SubtypeFilterFunction.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/SubtypeFilterFunction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/SubtypeFilterFunction.java new file mode 100644 index 0000000..f183f0f --- /dev/null +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/SubtypeFilterFunction.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.pattern; + +import org.apache.flink.api.common.functions.FilterFunction; + +/** + * A filter function which filters elements of the given type. A element if filtered out iff it + * is not assignable to the given subtype of T. + * + * @param <T> Type of the elements to be filtered + */ +public class SubtypeFilterFunction<T> implements FilterFunction<T> { + private static final long serialVersionUID = -2990017519957561355L; + + // subtype to filter for + private final Class<? extends T> subtype; + + public SubtypeFilterFunction(final Class<? extends T> subtype) { + this.subtype = subtype; + } + + @Override + public boolean filter(T value) throws Exception { + return subtype.isAssignableFrom(value.getClass()); + } +}