[ https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15249609#comment-15249609 ]
ASF GitHub Bot commented on FLINK-3708: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1905#discussion_r60383831 --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala --- @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.cep.scala.pattern + +import org.apache.flink.api.common.functions.FilterFunction +import org.apache.flink.cep +import org.apache.flink.cep.pattern.{Pattern => JPattern} +import org.apache.flink.streaming.api.windowing.time.Time + +import scala.reflect.ClassTag + +/** + * Base class for a pattern definition. + * <p> + * A pattern definition is used by { @link org.apache.flink.cep.nfa.compiler.NFACompiler} to create + * a { @link NFA}. 
+ * + * <pre>{ @code + * Pattern<T, F> pattern = Pattern.<T>begin("start") + * .next("middle").subtype(F.class) + * .followedBy("end").where(new MyFilterFunction()); + * } + * </pre> + * + * @param jPattern Underlying Java API Pattern + * @tparam T Base type of the elements appearing in the pattern + * @tparam F Subtype of T to which the current pattern operator is constrained + */ +class Pattern[T: ClassTag, F <: T : ClassTag](jPattern: JPattern[T, F]) { + + private[flink] def getWrappedPattern = jPattern + + + /** + * + * @return Name of the pattern operator + */ + def getName: String = jPattern.getName + + /** + * + * @return Window length in which the pattern match has to occur + */ + def getWindowTime: Option[Time] = { + val time = jPattern.getWindowTime + if (time == null) None else Some(time) + } + + /** + * + * @return Filter condition for an event to be matched + */ + def getFilterFunction: Option[FilterFunction[F]] = { + val filterFun = jPattern.getFilterFunction + if (filterFun == null) None else Some(filterFun) + } + + /** + * Applies a subtype constraint on the current pattern operator. This means that an event has + * to be of the given subtype in order to be matched. + * + * @param clazz Class of the subtype + * @tparam S Type of the subtype + * @return The same pattern operator with the new subtype constraint + */ + def subtype[S <: F : ClassTag](clazz: Class[S]): Pattern[T, S] = { + jPattern.subtype(clazz) + this.asInstanceOf[Pattern[T, S]] + } + + /** + * Defines the maximum time interval for a matching pattern. This means that the time gap + * between first and the last event must not be longer than the window time. + * + * @param windowTime Time of the matching window + * @return The same pattern operator with the new window length + */ + def within(windowTime: Time): Pattern[T, F] = { + jPattern.within(windowTime) + this + } + + /** + * Appends a new pattern operator to the existing one. 
The new pattern operator enforces strict + * temporal contiguity. This means that the whole pattern only matches if an event which matches + * this operator directly follows the preceding matching event. Thus, there cannot be any + * events in between two matching events. + * + * @param name Name of the new pattern operator + * @return A new pattern operator which is appended to this pattern operator + */ + def next(name: String): Pattern[T, T] = { + wrapPattern(jPattern.next(name)) + } + + /** + * Appends a new pattern operator to the existing one. The new pattern operator enforces + * non-strict temporal contiguity. This means that a matching event of this operator and the + * preceding matching event might be interleaved with other events which are ignored. + * + * @param name Name of the new pattern operator + * @return A new pattern operator which is appended to this pattern operator + */ + def followedBy(name: String): FollowedByPattern[T, T] = { + FollowedByPattern(jPattern.followedBy(name)) + } + + /** + * Specifies a filter condition which has to be fulfilled by an event in order to be matched. + * + * @param filter Filter condition + * @return The same pattern operator where the new filter condition is set + */ + def where(filter: FilterFunction[F]): Pattern[T, F] = { + jPattern.where(filter) + this + } + + /** + * Specifies a filter condition which has to be fulfilled by an event in order to be matched. + * + * @param filterFun Filter condition + * @return The same pattern operator where the new filter condition is set + */ + def where(filterFun: F => Boolean): Pattern[T, F] = { + val filter = new FilterFunction[F] { + val cleanFilter = cep.scala.cleanClosure(filterFun) + + override def filter(value: F): Boolean = cleanFilter(value) + } + where(filter) + } + + //TODO ask about java api change <?> -> <? extends T> and creating a new object vs caching object. equals/hashcode? 
+ /** + * + * @return The previous pattern operator + */ + def getPrevious: Option[Pattern[T, _ <: T]] = { + val prev = jPattern.getPrevious + if (prev == null) None else Some(wrapPattern(prev)) + + } + +} + +object Pattern { + + /** + * Constructs a new Pattern by wrapping a given Java API Pattern + * + * @param jPattern Underlying Java API Pattern. + * @tparam T Base type of the elements appearing in the pattern + * @tparam F Subtype of T to which the current pattern operator is constrained + * @return New wrapping Pattern object + */ + def apply[T: ClassTag, F <: T : ClassTag] + (jPattern: JPattern[T, F]) = new Pattern[T, F](jPattern) --- End diff -- Does that fit in one line? > Scala API for CEP > ----------------- > > Key: FLINK-3708 > URL: https://issues.apache.org/jira/browse/FLINK-3708 > Project: Flink > Issue Type: Improvement > Components: CEP > Affects Versions: 1.1.0 > Reporter: Till Rohrmann > Assignee: Stefan Richter > > Currently, the CEP library does not support Scala case classes, because the > {{TypeExtractor}} cannot handle them. In order to support them, it would be > necessary to offer a Scala API for the CEP library. -- This message was sent by Atlassian JIRA (v6.3.4#6332)