Repository: flink Updated Branches: refs/heads/master 2bd6212a3 -> e29ac036a
[FLINK-3708] [cep] Add Scala CEP API Added missing test dependency to pom. Pattern in Scala API now uses Option to shield users against null values from the Java API Added test-jar for build phase to pom. Removed necessary dependency entry and an overlooked comment. Added missing comments on FollowedByPattern. CEPIT test update. Replace CEP test with a simple test for Scala <-> Java interoperability. This closes #1905. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e29ac036 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e29ac036 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e29ac036 Branch: refs/heads/master Commit: e29ac036a76cc78ea608ffe1f0784ec15e351c60 Parents: 2bd6212 Author: Stefan Richter <stefanrichte...@gmail.com> Authored: Mon Apr 18 13:10:09 2016 +0200 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Tue Apr 26 18:47:39 2016 +0200 ---------------------------------------------------------------------- flink-libraries/flink-cep-scala/pom.xml | 114 +++++++++ .../scala/org/apache/flink/cep/scala/CEP.scala | 44 ++++ .../apache/flink/cep/scala/PatternStream.scala | 129 ++++++++++ .../org/apache/flink/cep/scala/package.scala | 46 ++++ .../cep/scala/pattern/FollowedByPattern.scala | 44 ++++ .../flink/cep/scala/pattern/Pattern.scala | 176 +++++++++++++ .../flink/cep/scala/pattern/package.scala | 39 +++ .../cep/scala/CEPScalaAPICompletenessTest.scala | 46 ++++ ...nStreamScalaJavaAPIInteroperabiliyTest.scala | 87 +++++++ .../PatternScalaAPICompletenessTest.scala | 44 ++++ .../flink/cep/scala/pattern/PatternTest.scala | 248 +++++++++++++++++++ flink-libraries/flink-cep/pom.xml | 120 +++++---- .../org/apache/flink/cep/PatternStream.java | 39 ++- .../flink/cep/pattern/AndFilterFunction.java | 8 + .../org/apache/flink/cep/pattern/Pattern.java | 6 +- flink-libraries/pom.xml | 1 + flink-streaming-scala/pom.xml | 13 + .../ScalaAPICompletenessTestBase.scala | 4 + 18 files 
changed, 1150 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e29ac036/flink-libraries/flink-cep-scala/pom.xml ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep-scala/pom.xml b/flink-libraries/flink-cep-scala/pom.xml new file mode 100644 index 0000000..1b0caee --- /dev/null +++ b/flink-libraries/flink-cep-scala/pom.xml @@ -0,0 +1,114 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-libraries</artifactId> + <version>1.1-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + <artifactId>flink-cep-scala_2.10</artifactId> + <name>flink-cep-scala</name> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-cep_2.10</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-scala_2.10</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <!-- We need to add this explicitly because through shading the dependency on asm seems + to go away. TODO --> + <dependency> + <groupId>org.ow2.asm</groupId> + <artifactId>asm</artifactId> + <version>${asm.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-tests_2.10</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils_2.10</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_2.10</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-scala_2.10</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + 
<artifactId>flink-cep_2.10</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + </dependencies> + + <build> + <plugins> + <!-- Scala Compiler --> + <plugin> + <groupId>net.alchim31.maven</groupId> + <artifactId>scala-maven-plugin</artifactId> + <version>3.2.2</version> + <executions> + <execution> + <goals> + <goal>compile</goal> + <goal>testCompile</goal> + </goals> + </execution> + </executions> + </plugin> + <!-- Scala Code Style, most of the configuration done via plugin management --> + <plugin> + <groupId>org.scalastyle</groupId> + <artifactId>scalastyle-maven-plugin</artifactId> + <configuration> + <configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation> + </configuration> + </plugin> + </plugins> + </build> + +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/e29ac036/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/CEP.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/CEP.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/CEP.scala new file mode 100644 index 0000000..f28aee2 --- /dev/null +++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/CEP.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.cep.scala + +import org.apache.flink.cep.scala.pattern.Pattern +import org.apache.flink.cep.{CEP => JCEP} +import org.apache.flink.streaming.api.scala.DataStream + +/** + * Utility method to transform a [[DataStream]] into a [[PatternStream]] to do CEP. + */ + +object CEP { + /** + * Transforms a [[DataStream]] into a [[PatternStream]] in the Scala API. + * See [[org.apache.flink.cep.CEP]] for a more detailed description of how the underlying + * Java API works. + * + * @param input DataStream containing the input events + * @param pattern Pattern specification which shall be detected + * @tparam T Type of the input events + * @return Resulting pattern stream + */ + def pattern[T](input: DataStream[T], + pattern: Pattern[T, _]): PatternStream[T] = { + wrapPatternStream(JCEP.pattern(input.javaStream, pattern.wrappedPattern)) + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/e29ac036/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala new file mode 100644 index 0000000..22b105c --- /dev/null +++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.cep.scala + +import java.util.{Map => JMap} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.cep.{PatternFlatSelectFunction, PatternSelectFunction, +PatternStream => JPatternStream} +import org.apache.flink.streaming.api.scala.DataStream +import org.apache.flink.streaming.api.scala.asScalaStream +import org.apache.flink.util.Collector +import scala.collection.JavaConverters._ +import scala.collection.mutable + +/** + * Stream abstraction for CEP pattern detection. A pattern stream is a stream which emits detected + * pattern sequences as a map of events associated with their names. The pattern is detected using + * a [[org.apache.flink.cep.nfa.NFA]]. In order to process the detected sequences, the user has to + * specify a [[PatternSelectFunction]] or a [[PatternFlatSelectFunction]]. + * + * @param jPatternStream Underlying pattern stream from Java API + * @tparam T Type of the events + */ +class PatternStream[T](jPatternStream: JPatternStream[T]) { + + private[flink] def wrappedPatternStream = jPatternStream + + /** + * Applies a select function to the detected pattern sequence. For each pattern sequence the + * provided [[PatternSelectFunction]] is called. 
The pattern select function can produce + * exactly one resulting element. + * + * @param patternSelectFunction The pattern select function which is called for each detected + * pattern sequence. + * @tparam R Type of the resulting elements + * @return [[DataStream]] which contains the resulting elements from the pattern select function. + */ + def select[R: TypeInformation](patternSelectFunction: PatternSelectFunction[T, R]) + : DataStream[R] = { + asScalaStream(jPatternStream.select(patternSelectFunction, implicitly[TypeInformation[R]])) + } + + /** + * Applies a flat select function to the detected pattern sequence. For each pattern sequence + * the provided [[PatternFlatSelectFunction]] is called. The pattern flat select function can + * produce an arbitrary number of resulting elements. + * + * @param patternFlatSelectFunction The pattern flat select function which is called for each + * detected pattern sequence. + * @tparam R Type of the resulting elements + * @return [[DataStream]] which contains the resulting elements from the pattern flat select + * function. + */ + def flatSelect[R: TypeInformation](patternFlatSelectFunction: PatternFlatSelectFunction[T, R]) + : DataStream[R] = { + asScalaStream(jPatternStream + .flatSelect(patternFlatSelectFunction, implicitly[TypeInformation[R]])) + } + + /** + * Applies a select function to the detected pattern sequence. For each pattern sequence the + * provided [[PatternSelectFunction]] is called. The pattern select function can produce exactly + * one resulting element. + * + * @param patternSelectFun The pattern select function which is called for each detected + * pattern sequence. + * @tparam R Type of the resulting elements + * @return [[DataStream]] which contains the resulting elements from the pattern select function. 
+ */ + def select[R: TypeInformation](patternSelectFun: mutable.Map[String, T] => R): DataStream[R] = { + val patternSelectFunction: PatternSelectFunction[T, R] = new PatternSelectFunction[T, R] { + val cleanFun = cleanClosure(patternSelectFun) + + def select(in: JMap[String, T]): R = cleanFun(in.asScala) + } + select(patternSelectFunction) + } + + /** + * Applies a flat select function to the detected pattern sequence. For each pattern sequence + * the provided [[PatternFlatSelectFunction]] is called. The pattern flat select function + * can produce an arbitrary number of resulting elements. + * + * @param patternFlatSelectFun The pattern flat select function which is called for each + * detected pattern sequence. + * @tparam R Type of the resulting elements + * @return [[DataStream]] which contains the resulting elements from the pattern flat select + * function. + */ + def flatSelect[R: TypeInformation](patternFlatSelectFun: (mutable.Map[String, T], + Collector[R]) => Unit): DataStream[R] = { + val patternFlatSelectFunction: PatternFlatSelectFunction[T, R] = + new PatternFlatSelectFunction[T, R] { + val cleanFun = cleanClosure(patternFlatSelectFun) + + def flatSelect(pattern: JMap[String, T], out: Collector[R]): Unit = + cleanFun(pattern.asScala, out) + } + flatSelect(patternFlatSelectFunction) + } + +} + +object PatternStream { + /** + * + * @param jPatternStream Underlying pattern stream from Java API + * @tparam T Type of the events + * @return A new pattern stream wrapping the pattern stream from Java API + */ + def apply[T](jPatternStream: JPatternStream[T]) = { + new PatternStream[T](jPatternStream) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e29ac036/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/package.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/package.scala
b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/package.scala new file mode 100644 index 0000000..179174b --- /dev/null +++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/package.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.cep + +import org.apache.flink.api.scala.ClosureCleaner +import org.apache.flink.cep.{PatternStream => JPatternStream} + +package object scala { + + /** + * Utility method to wrap [[org.apache.flink.cep.PatternStream]] for usage with the Scala API. 
+ * + * @param javaPatternStream The underlying pattern stream from the Java API + * @tparam T Type of the events + * @return A pattern stream from the Scala API which wraps a pattern stream from the Java API + */ + private[flink] def wrapPatternStream[T](javaPatternStream: JPatternStream[T]) + : scala.PatternStream[T] = { + Option(javaPatternStream) match { + case Some(p) => PatternStream[T](p) + case None => + throw new IllegalArgumentException("PatternStream from Java API must not be null.") + } + } + + private[flink] def cleanClosure[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = { + ClosureCleaner.clean(f, checkSerializable) + return f + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/e29ac036/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/FollowedByPattern.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/FollowedByPattern.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/FollowedByPattern.scala new file mode 100644 index 0000000..4bda08f --- /dev/null +++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/FollowedByPattern.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.cep.scala.pattern + +import org.apache.flink.cep.pattern.{FollowedByPattern => JFollowedByPattern} + +object FollowedByPattern { + /** + * Constructs a new Pattern by wrapping a given Java API Pattern + * + * @param jfbPattern Underlying Java API Pattern. + * @tparam T Base type of the elements appearing in the pattern + * @tparam F Subtype of T to which the current pattern operator is constrained + * @return New wrapping FollowedByPattern object + */ + def apply[T, F <: T](jfbPattern: JFollowedByPattern[T, F]) = + new FollowedByPattern[T, F](jfbPattern) +} + +/** + * Pattern operator which signifies that there is a non-strict temporal contiguity between + * itself and its preceding pattern operator. This means that there might be events in between + * two matching events. These events are then simply ignored.
+ * + * @tparam T Base type of the events + * @tparam F Subtype of T to which the operator is currently constrained + */ +class FollowedByPattern[T, F <: T](jfbPattern: JFollowedByPattern[T, F]) + extends Pattern[T, F](jfbPattern) http://git-wip-us.apache.org/repos/asf/flink/blob/e29ac036/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala new file mode 100644 index 0000000..592599c --- /dev/null +++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.cep.scala.pattern + +import org.apache.flink.api.common.functions.FilterFunction +import org.apache.flink.cep +import org.apache.flink.cep.pattern.{Pattern => JPattern} +import org.apache.flink.streaming.api.windowing.time.Time + +/** + * Base class for a pattern definition. 
+ * + * A pattern definition is used by [[org.apache.flink.cep.nfa.compiler.NFACompiler]] to create + * a [[org.apache.flink.cep.nfa.NFA]]. + * + * {{{ + * Pattern<T, F> pattern = Pattern.<T>begin("start") + * .next("middle").subtype(F.class) + * .followedBy("end").where(new MyFilterFunction()); + * } + * }}} + * + * @param jPattern Underlying Java API Pattern + * @tparam T Base type of the elements appearing in the pattern + * @tparam F Subtype of T to which the current pattern operator is constrained + */ +class Pattern[T , F <: T](jPattern: JPattern[T, F]) { + + private[flink] def wrappedPattern = jPattern + + /** + * + * @return Name of the pattern operator + */ + def getName(): String = jPattern.getName() + + /** + * + * @return Window length in which the pattern match has to occur + */ + def getWindowTime(): Option[Time] = { + Option(jPattern.getWindowTime()) + } + + /** + * + * @return Filter condition for an event to be matched + */ + def getFilterFunction(): Option[FilterFunction[F]] = { + Option(jPattern.getFilterFunction()) + } + + /** + * Applies a subtype constraint on the current pattern operator. This means that an event has + * to be of the given subtype in order to be matched. + * + * @param clazz Class of the subtype + * @tparam S Type of the subtype + * @return The same pattern operator with the new subtype constraint + */ + def subtype[S <: F](clazz: Class[S]): Pattern[T, S] = { + jPattern.subtype(clazz) + this.asInstanceOf[Pattern[T, S]] + } + + /** + * Defines the maximum time interval for a matching pattern. This means that the time gap + * between first and the last event must not be longer than the window time. + * + * @param windowTime Time of the matching window + * @return The same pattern operator with the new window length + */ + def within(windowTime: Time): Pattern[T, F] = { + jPattern.within(windowTime) + this + } + + /** + * Appends a new pattern operator to the existing one. 
The new pattern operator enforces strict + * temporal contiguity. This means that the whole pattern only matches if an event which matches + * this operator directly follows the preceding matching event. Thus, there cannot be any + * events in between two matching events. + * + * @param name Name of the new pattern operator + * @return A new pattern operator which is appended to this pattern operator + */ + def next(name: String): Pattern[T, T] = { + Pattern[T, T](jPattern.next(name)) + } + + /** + * Appends a new pattern operator to the existing one. The new pattern operator enforces + * non-strict temporal contiguity. This means that a matching event of this operator and the + * preceding matching event might be interleaved with other events which are ignored. + * + * @param name Name of the new pattern operator + * @return A new pattern operator which is appended to this pattern operator + */ + def followedBy(name: String): FollowedByPattern[T, T] = { + FollowedByPattern(jPattern.followedBy(name)) + } + + /** + * Specifies a filter condition which has to be fulfilled by an event in order to be matched. + * + * @param filter Filter condition + * @return The same pattern operator where the new filter condition is set + */ + def where(filter: FilterFunction[F]): Pattern[T, F] = { + jPattern.where(filter) + this + } + + /** + * Specifies a filter condition which has to be fulfilled by an event in order to be matched. 
+ * + * @param filterFun Filter condition + * @return The same pattern operator where the new filter condition is set + */ + def where(filterFun: F => Boolean): Pattern[T, F] = { + val filter = new FilterFunction[F] { + val cleanFilter = cep.scala.cleanClosure(filterFun) + + override def filter(value: F): Boolean = cleanFilter(value) + } + where(filter) + } + + /** + * + * @return The previous pattern operator + */ + def getPrevious(): Option[Pattern[T, _ <: T]] = { + wrapPattern(jPattern.getPrevious()) + } + +} + +object Pattern { + + /** + * Constructs a new Pattern by wrapping a given Java API Pattern + * + * @param jPattern Underlying Java API Pattern. + * @tparam T Base type of the elements appearing in the pattern + * @tparam F Subtype of T to which the current pattern operator is constrained + * @return New wrapping Pattern object + */ + def apply[T, F <: T](jPattern: JPattern[T, F]) = new Pattern[T, F](jPattern) + + /** + * Starts a new pattern with the initial pattern operator whose name is provided. Furthermore, + * the base type of the event sequence is set. 
+ * + * @param name Name of the new pattern operator + * @tparam X Base type of the event pattern + * @return The first pattern operator of a pattern + */ + def begin[X](name: String): Pattern[X, X] = Pattern(JPattern.begin(name)) + +} http://git-wip-us.apache.org/repos/asf/flink/blob/e29ac036/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/package.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/package.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/package.scala new file mode 100644 index 0000000..382c160 --- /dev/null +++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/package.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.cep.scala + +import org.apache.flink.cep.pattern.{FollowedByPattern => JFollowedByPattern, Pattern => JPattern} + +package object pattern { + /** + * Utility method to wrap [[org.apache.flink.cep.pattern.Pattern]] and its subclasses + * for usage with the Scala API. 
+ * + * @param javaPattern The underlying pattern from the Java API + * @tparam T Base type of the elements appearing in the pattern + * @tparam F Subtype of T to which the current pattern operator is constrained + * @return A pattern from the Scala API which wraps the pattern from the Java API + */ + private[flink] def wrapPattern[T, F <: T](javaPattern: JPattern[T, F]) + : Option[Pattern[T, F]] = javaPattern match { + case f: JFollowedByPattern[T, F] => Some(FollowedByPattern[T, F](f)) + case p: JPattern[T, F] => Some(Pattern[T, F](p)) + case _ => None + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/e29ac036/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/CEPScalaAPICompletenessTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/CEPScalaAPICompletenessTest.scala b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/CEPScalaAPICompletenessTest.scala new file mode 100644 index 0000000..560d99a --- /dev/null +++ b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/CEPScalaAPICompletenessTest.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.cep.scala + +import java.lang.reflect.Method + +import org.apache.flink.api.scala.completeness.ScalaAPICompletenessTestBase +import org.apache.flink.cep.{PatternStream => JPatternStream} +import org.junit.Test + +import scala.language.existentials + +/** + * This checks whether the CEP Scala API is up to feature parity with the Java API. + * Implements the [[ScalaAPICompletenessTestBase]] for CEP. + */ +class CEPScalaAPICompletenessTest extends ScalaAPICompletenessTestBase { + + override def isExcludedByName(method: Method): Boolean = { + val name = method.getDeclaringClass().getName() + "." + method.getName() + val excludedNames = Seq() + excludedNames.contains(name) + } + + @Test + override def testCompleteness(): Unit = { + checkMethods("PatternStream", "PatternStream", + classOf[JPatternStream[_]], + classOf[PatternStream[_]]) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e29ac036/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabiliyTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabiliyTest.scala b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabiliyTest.scala new file mode 100644 index 0000000..7daebfe --- /dev/null +++ b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabiliyTest.scala @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.cep.scala + +import org.apache.flink.api.common.functions.util.ListCollector +import org.apache.flink.cep.scala.pattern.Pattern +import org.apache.flink.streaming.api.operators.{StreamFlatMap, StreamMap} +import org.apache.flink.streaming.api.scala._ +import org.apache.flink.streaming.api.transformations.OneInputTransformation +import org.apache.flink.util.{Collector, TestLogger} + +import scala.collection.JavaConverters._ +import scala.collection.mutable +import org.junit.Assert._ +import org.junit.Test + +class PatternStreamScalaJavaAPIInteroperabiliyTest extends TestLogger { + + @Test + @throws[Exception] + def testScalaJavaAPISelectFunForwarding { + val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment + val dummyDataStream: DataStream[(Int, Int)] = env.fromElements() + val pattern: Pattern[(Int, Int), _] = Pattern.begin[(Int, Int)]("dummy") + val pStream: PatternStream[(Int, Int)] = CEP.pattern(dummyDataStream, pattern) + val param = mutable.Map("begin" ->(1, 2)).asJava + val result: DataStream[(Int, Int)] = pStream + .select((pattern: mutable.Map[String, (Int, Int)]) => { + //verifies input parameter forwarding + assertEquals(param, pattern.asJava) + param.get("begin") + }) + val out = extractUserFunction[StreamMap[java.util.Map[String, (Int, Int)], (Int, Int)]](result) + .getUserFunction.map(param) + //verifies output 
parameter forwarding + assertEquals(param.get("begin"), out) + } + + @Test + @throws[Exception] + def testScalaJavaAPIFlatSelectFunForwarding { + val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment + val dummyDataStream: DataStream[List[Int]] = env.fromElements() + val pattern: Pattern[List[Int], _] = Pattern.begin[List[Int]]("dummy") + val pStream: PatternStream[List[Int]] = CEP.pattern(dummyDataStream, pattern) + val inList = List(1, 2, 3) + val inParam = mutable.Map("begin" -> inList).asJava + val outList = new java.util.ArrayList[List[Int]] + val outParam = new ListCollector[List[Int]](outList) + + val result: DataStream[List[Int]] = pStream + + .flatSelect((pattern: mutable.Map[String, List[Int]], out: Collector[List[Int]]) => { + //verifies input parameter forwarding + assertEquals(inParam, pattern.asJava) + out.collect(pattern.get("begin").get) + }) + + extractUserFunction[StreamFlatMap[java.util.Map[String, List[Int]], List[Int]]](result). + getUserFunction.flatMap(inParam, outParam) + //verify output parameter forwarding and that flatMap function was actually called + assertEquals(inList, outList.get(0)) + } + + def extractUserFunction[T](dataStream: DataStream[_]) = { + dataStream.javaStream + .getTransformation + .asInstanceOf[OneInputTransformation[_, _]] + .getOperator + .asInstanceOf[T] + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e29ac036/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternScalaAPICompletenessTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternScalaAPICompletenessTest.scala b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternScalaAPICompletenessTest.scala new file mode 100644 index 0000000..ac72bdf --- /dev/null +++ 
b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternScalaAPICompletenessTest.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.cep.scala.pattern + +import java.lang.reflect.Method + +import org.apache.flink.api.scala.completeness.ScalaAPICompletenessTestBase +import org.apache.flink.cep.pattern.{Pattern => JPattern} +import org.junit.Test + +import scala.language.existentials + +/** + * This checks whether the CEP Scala API is up to feature parity with the Java API. + * Implements the [[ScalaAPICompletenessTestBase]] for CEP. + */ +class PatternScalaAPICompletenessTest extends ScalaAPICompletenessTestBase { + + override def isExcludedByName(method: Method): Boolean = { + val name = method.getDeclaringClass.getName + "." 
+ method.getName + val excludedNames = Seq() + excludedNames.contains(name) + } + + @Test + override def testCompleteness(): Unit = { + checkMethods("Pattern", "Pattern", classOf[JPattern[_, _]], classOf[Pattern[_, _]]) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e29ac036/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala new file mode 100644 index 0000000..5f49031 --- /dev/null +++ b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.cep.scala.pattern + +import org.apache.flink.api.common.functions.FilterFunction +import org.apache.flink.cep.pattern.{AndFilterFunction, SubtypeFilterFunction, Pattern => JPattern} +import org.junit.Assert._ +import org.junit.Test +import org.apache.flink.cep.Event +import org.apache.flink.cep.SubEvent + +class PatternTest { + + /** + * These tests simply check that the pattern construction completes without failure and that the + * Scala API pattern is synchronous with its wrapped Java API pattern. + */ + + @Test + def testStrictContiguity: Unit = { + val pattern = Pattern.begin[Event]("start").next("next").next("end") + val jPattern = JPattern.begin[Event]("start").next("next").next("end") + + + assertTrue(checkCongruentRepresentations(pattern, jPattern)) + assertTrue(checkCongruentRepresentations(wrapPattern(jPattern).get, jPattern)) + + assertTrue(checkCongruentRepresentations(pattern, pattern.wrappedPattern)) + val previous = pattern.getPrevious.orNull + val preprevious = previous.getPrevious.orNull + + assertTrue(pattern.getPrevious.isDefined) + assertTrue(previous.getPrevious.isDefined) + assertFalse(preprevious.getPrevious.isDefined) + + assertEquals(pattern.getName, "end") + assertEquals(previous.getName, "next") + assertEquals(preprevious.getName, "start") + } + + + @Test + def testNonStrictContiguity: Unit = { + val pattern = Pattern.begin[Event]("start").followedBy("next").followedBy("end") + val jPattern = JPattern.begin[Event]("start").followedBy("next").followedBy("end") + + assertTrue(checkCongruentRepresentations(pattern, jPattern)) + assertTrue(checkCongruentRepresentations(wrapPattern(jPattern).get, jPattern)) + val previous = pattern.getPrevious.orNull + val preprevious = previous.getPrevious.orNull + + assertTrue(pattern.getPrevious.isDefined) + assertTrue(previous.getPrevious.isDefined) + assertFalse(preprevious.getPrevious.isDefined) + + assertTrue(pattern.isInstanceOf[FollowedByPattern[_, _]]) + 
assertTrue(previous.isInstanceOf[FollowedByPattern[_, _]]) + + assertEquals(pattern.getName, "end") + assertEquals(previous.getName, "next") + assertEquals(preprevious.getName, "start") + } + + @Test + def testStrictContiguityWithCondition: Unit = { + val pattern = Pattern.begin[Event]("start") + .next("next") + .where((value: Event) => value.getName() == "foobar") + .next("end") + .where((value: Event) => value.getId() == 42) + + val jPattern = JPattern.begin[Event]("start") + .next("next") + .where(new FilterFunction[Event]() { + @throws[Exception] + def filter(value: Event): Boolean = { + return value.getName() == "foobar" + } + }).next("end") + .where(new FilterFunction[Event]() { + @throws[Exception] + def filter(value: Event): Boolean = { + return value.getId() == 42 + } + }) + + assertTrue(checkCongruentRepresentations(pattern, jPattern)) + assertTrue(checkCongruentRepresentations(wrapPattern(jPattern).get, jPattern)) + + val previous = pattern.getPrevious.orNull + val preprevious = previous.getPrevious.orNull + + assertTrue(pattern.getPrevious.isDefined) + assertTrue(previous.getPrevious.isDefined) + assertFalse(preprevious.getPrevious.isDefined) + + assertTrue(pattern.getFilterFunction.isDefined) + assertTrue(previous.getFilterFunction.isDefined) + assertFalse(preprevious.getFilterFunction.isDefined) + + assertEquals(pattern.getName, "end") + assertEquals(previous.getName, "next") + assertEquals(preprevious.getName, "start") + } + + @Test + def testPatternWithSubtyping: Unit = { + val pattern = Pattern.begin[Event]("start") + .next("subevent") + .subtype(classOf[SubEvent]) + .followedBy("end") + + val jPattern = JPattern.begin[Event]("start") + .next("subevent") + .subtype(classOf[SubEvent]) + .followedBy("end") + + assertTrue(checkCongruentRepresentations(pattern, jPattern)) + assertTrue(checkCongruentRepresentations(wrapPattern(jPattern).get, jPattern)) + + val previous = pattern.getPrevious.orNull + val preprevious = previous.getPrevious.orNull + + 
assertTrue(pattern.getPrevious.isDefined) + assertTrue(previous.getPrevious.isDefined) + assertFalse(preprevious.getPrevious.isDefined) + + assertTrue(previous.getFilterFunction.isDefined) + assertTrue(previous.getFilterFunction.get.isInstanceOf[SubtypeFilterFunction[_]]) + + assertEquals(pattern.getName, "end") + assertEquals(previous.getName, "subevent") + assertEquals(preprevious.getName, "start") + } + + @Test + def testPatternWithSubtypingAndFilter: Unit = { + val pattern = Pattern.begin[Event]("start") + .next("subevent") + .subtype(classOf[SubEvent]) + .where((value: SubEvent) => false) + .followedBy("end") + + val jpattern = JPattern.begin[Event]("start") + .next("subevent") + .subtype(classOf[SubEvent]) + .where(new FilterFunction[SubEvent]() { + @throws[Exception] + def filter(value: SubEvent): Boolean = { + return false + } + }).followedBy("end") + + assertTrue(checkCongruentRepresentations(pattern, jpattern)) + assertTrue(checkCongruentRepresentations(wrapPattern(jpattern).get, jpattern)) + + + val previous = pattern.getPrevious.orNull + val preprevious = previous.getPrevious.orNull + + assertTrue(pattern.getPrevious.isDefined) + assertTrue(previous.getPrevious.isDefined) + assertFalse(preprevious.getPrevious.isDefined) + + assertTrue(pattern.isInstanceOf[FollowedByPattern[_, _]]) + assertTrue(previous.getFilterFunction.isDefined) + + assertEquals(pattern.getName, "end") + assertEquals(previous.getName, "subevent") + assertEquals(preprevious.getName, "start") + } + + def checkCongruentRepresentations[T, _ <: T](pattern: Pattern[T, _ <: T], + jPattern: JPattern[T, _ <: T]): Boolean = { + ((pattern == null && jPattern == null) + || (pattern != null && jPattern != null) + //check equal pattern names + && threeWayEquals( + pattern.getName, + pattern.wrappedPattern.getName, + jPattern.getName()) + //check equal time windows + && threeWayEquals( + pattern.getWindowTime.orNull, + pattern.wrappedPattern.getWindowTime, + jPattern.getWindowTime()) + //check 
congruent class names / types + && threeWayEquals( + pattern.getClass.getSimpleName, + pattern.wrappedPattern.getClass.getSimpleName, + jPattern.getClass().getSimpleName()) + //best effort to confirm congruent filter functions + && compareFilterFunctions( + pattern.getFilterFunction.orNull, + jPattern.getFilterFunction()) + //recursively check previous patterns + && checkCongruentRepresentations( + pattern.getPrevious.orNull, + jPattern.getPrevious())) + } + + def threeWayEquals(a: AnyRef, b: AnyRef, c: AnyRef): Boolean = { + a == b && b == c + } + + def compareFilterFunctions(sFilter: FilterFunction[_], jFilter: FilterFunction[_]): Boolean = { + /** + * We would like to simply compare the filter functions like this: + * + * {{{(pattern.getFilterFunction.orNull == jPattern.getFilterFunction)}}} + * + * However, the closure cleaning makes comparing filter functions by reference impossible. + * Testing for functional equivalence is an undecidable problem. Thus, we make a best effort by + * simply matching the presence/absence and known classes of filter functions in the patterns. 
+ */ + (sFilter, jFilter) match { + //matching types: and-filter; branch and recurse for inner filters + case (saf: AndFilterFunction[_], jaf: AndFilterFunction[_]) + => (compareFilterFunctions(saf.getLeft(), jaf.getLeft()) + && compareFilterFunctions(saf.getRight(), jaf.getRight())) + //matching types: subtype-filter + case (saf: SubtypeFilterFunction[_], jaf: SubtypeFilterFunction[_]) => true + //mismatch: one-sided and/subtype-filter + case (_: AndFilterFunction[_] | _: SubtypeFilterFunction[_], _) => false + case (_, _: AndFilterFunction[_] | _: SubtypeFilterFunction[_]) => false + //from here we can only check mutual presence or absence of a function + case (s: FilterFunction[_], j: FilterFunction[_]) => true + case (null, null) => true + case _ => false + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/e29ac036/flink-libraries/flink-cep/pom.xml ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/pom.xml b/flink-libraries/flink-cep/pom.xml index dee7254..7932e14 100644 --- a/flink-libraries/flink-cep/pom.xml +++ b/flink-libraries/flink-cep/pom.xml @@ -17,63 +17,79 @@ KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. 
--> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - <modelVersion>4.0.0</modelVersion> + <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.flink</groupId> - <artifactId>flink-libraries</artifactId> - <version>1.1-SNAPSHOT</version> - <relativePath>..</relativePath> - </parent> - - <artifactId>flink-cep_2.10</artifactId> - <name>flink-cep</name> + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-libraries</artifactId> + <version>1.1-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> - <dependencies> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-core</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-java_2.10</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - <version>${guava.version}</version> - </dependency> + <artifactId>flink-cep_2.10</artifactId> + <name>flink-cep</name> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-core</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_2.10</artifactId> + 
<version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>${guava.version}</version> + </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-java_2.10</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-test-utils_2.10</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - </dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_2.10</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> - <packaging>jar</packaging> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils_2.10</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + + <packaging>jar</packaging> </project> http://git-wip-us.apache.org/repos/asf/flink/blob/e29ac036/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java index 88505a4..57c5a9b 100644 --- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java @@ -61,7 +61,8 @@ public class PatternStream<T> { public <R> DataStream<R> select(final PatternSelectFunction<T, R> patternSelectFunction) { // we have to extract the output type from the provided pattern selection function manually // because the TypeExtractor cannot do that if the method is wrapped in a MapFunction - TypeInformation<R> outTypeInfo = TypeExtractor.getUnaryOperatorReturnType( + + TypeInformation<R> returnType = TypeExtractor.getUnaryOperatorReturnType( patternSelectFunction, PatternSelectFunction.class, 1, @@ -70,8 +71,24 @@ public class PatternStream<T> { null, false); + return select(patternSelectFunction, returnType); + } + + /** + * Applies a select function to the detected pattern sequence. For each pattern sequence the + * provided {@link PatternSelectFunction} is called. The pattern select function can produce + * exactly one resulting element. + * + * @param patternSelectFunction The pattern select function which is called for each detected + * pattern sequence. + * @param <R> Type of the resulting elements + * @param outTypeInfo Explicit specification of output type. + * @return {@link DataStream} which contains the resulting elements from the pattern select + * function. + */ + public <R> DataStream<R> select(final PatternSelectFunction<T, R> patternSelectFunction, TypeInformation<R> outTypeInfo) { return patternStream.map( - new PatternSelectMapper<T, R>( + new PatternSelectMapper<>( patternStream.getExecutionEnvironment().clean(patternSelectFunction))) .returns(outTypeInfo); } @@ -99,8 +116,24 @@ public class PatternStream<T> { null, false); + return flatSelect(patternFlatSelectFunction, outTypeInfo); + } + + /** + * Applies a flat select function to the detected pattern sequence. For each pattern sequence + * the provided {@link PatternFlatSelectFunction} is called. 
The pattern flat select function + * can produce an arbitrary number of resulting elements. + * + * @param patternFlatSelectFunction The pattern flat select function which is called for each + * detected pattern sequence. + * @param <R> Type of the resulting elements + * @param outTypeInfo Explicit specification of output type. + * @return {@link DataStream} which contains the resulting elements from the pattern flat select + * function. + */ + public <R> DataStream<R> flatSelect(final PatternFlatSelectFunction<T, R> patternFlatSelectFunction, TypeInformation<R> outTypeInfo) { return patternStream.flatMap( - new PatternFlatSelectMapper<T, R>( + new PatternFlatSelectMapper<>( patternStream.getExecutionEnvironment().clean(patternFlatSelectFunction) )).returns(outTypeInfo); } http://git-wip-us.apache.org/repos/asf/flink/blob/e29ac036/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/AndFilterFunction.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/AndFilterFunction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/AndFilterFunction.java index d01643d..ecaee07 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/AndFilterFunction.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/AndFilterFunction.java @@ -41,4 +41,12 @@ public class AndFilterFunction<T> implements FilterFunction<T> { public boolean filter(T value) throws Exception { return left.filter(value) && right.filter(value); } + + public FilterFunction<T> getLeft() { + return left; + } + + public FilterFunction<T> getRight() { + return right; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/e29ac036/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java index 560ea0c..696518e 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java @@ -45,7 +45,7 @@ public class Pattern<T, F extends T> { private final String name; // previous pattern operator - private final Pattern<T, ?> previous; + private final Pattern<T, ? extends T> previous; // filter condition for an event to be matched private FilterFunction<F> filterFunction; @@ -53,7 +53,7 @@ public class Pattern<T, F extends T> { // window length in which the pattern match has to occur private Time windowTime; - protected Pattern(final String name, final Pattern<T, ?> previous) { + protected Pattern(final String name, final Pattern<T, ? extends T> previous) { this.name = name; this.previous = previous; } @@ -62,7 +62,7 @@ public class Pattern<T, F extends T> { return name; } - public Pattern<T, ?> getPrevious() { + public Pattern<T, ? extends T> getPrevious() { return previous; } http://git-wip-us.apache.org/repos/asf/flink/blob/e29ac036/flink-libraries/pom.xml ---------------------------------------------------------------------- diff --git a/flink-libraries/pom.xml b/flink-libraries/pom.xml index 4f26754..b39cbd5 100644 --- a/flink-libraries/pom.xml +++ b/flink-libraries/pom.xml @@ -41,5 +41,6 @@ under the License. 
<module>flink-table</module> <module>flink-ml</module> <module>flink-cep</module> + <module>flink-cep-scala</module> </modules> </project> http://git-wip-us.apache.org/repos/asf/flink/blob/e29ac036/flink-streaming-scala/pom.xml ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/pom.xml b/flink-streaming-scala/pom.xml index 4310852..7358624 100644 --- a/flink-streaming-scala/pom.xml +++ b/flink-streaming-scala/pom.xml @@ -215,6 +215,19 @@ under the License. <configLocation>${project.basedir}/../tools/maven/scalastyle-config.xml</configLocation> </configuration> </plugin> + + <!-- Generate the test-jar --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> </plugins> </build> http://git-wip-us.apache.org/repos/asf/flink/blob/e29ac036/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/ScalaAPICompletenessTestBase.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/ScalaAPICompletenessTestBase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/ScalaAPICompletenessTestBase.scala index d1055d0..907ad9f 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/ScalaAPICompletenessTestBase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/ScalaAPICompletenessTestBase.scala @@ -82,6 +82,10 @@ abstract class ScalaAPICompletenessTestBase extends TestLogger { } } + protected def checkEquality(scalaInstance: AnyRef, extractJavaFun : ((AnyRef) => AnyRef)) { + val javaInstance = extractJavaFun(scalaInstance) + } + /** * Tests to be performed to ensure API completeness. */