[ 
https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15249608#comment-15249608
 ] 

ASF GitHub Bot commented on FLINK-3708:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1905#discussion_r60383704
  
    --- Diff: 
flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
 ---
    @@ -0,0 +1,186 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.cep.scala.pattern
    +
    +import org.apache.flink.api.common.functions.FilterFunction
    +import org.apache.flink.cep
    +import org.apache.flink.cep.pattern.{Pattern => JPattern}
    +import org.apache.flink.streaming.api.windowing.time.Time
    +
    +import scala.reflect.ClassTag
    +
    +/**
    +  * Base class for a pattern definition.
    +  * <p>
    +  * A pattern definition is used by { @link 
org.apache.flink.cep.nfa.compiler.NFACompiler} to create
    +  * a { @link NFA}.
    +  *
    +  * <pre>{ @code
    +  * Pattern<T, F> pattern = Pattern.<T>begin("start")
    +  * .next("middle").subtype(F.class)
    +  * .followedBy("end").where(new MyFilterFunction());
    +  * }
    +  * </pre>
    +  *
    +  * @param jPattern Underlying Java API Pattern
    +  * @tparam T Base type of the elements appearing in the pattern
    +  * @tparam F Subtype of T to which the current pattern operator is 
constrained
    +  */
    +class Pattern[T: ClassTag, F <: T : ClassTag](jPattern: JPattern[T, F]) {
    +
    +  private[flink] def getWrappedPattern = jPattern
    +
    +
    +  /**
    +    *
    +    * @return Name of the pattern operator
    +    */
    +  def getName: String = jPattern.getName
    +
    +  /**
    +    *
    +    * @return Window length in which the pattern match has to occur
    +    */
    +  def getWindowTime: Option[Time] = {
    +    val time = jPattern.getWindowTime
    +    if (time == null) None else Some(time)
    +  }
    +
    +  /**
    +    *
    +    * @return Filter condition for an event to be matched
    +    */
    +  def getFilterFunction: Option[FilterFunction[F]] = {
    +    val filterFun = jPattern.getFilterFunction
    +    if (filterFun == null) None else Some(filterFun)
    +  }
    +
    +  /**
    +    * Applies a subtype constraint on the current pattern operator. This 
means that an event has
    +    * to be of the given subtype in order to be matched.
    +    *
    +    * @param clazz Class of the subtype
    +    * @tparam S Type of the subtype
    +    * @return The same pattern operator with the new subtype constraint
    +    */
    +  def subtype[S <: F : ClassTag](clazz: Class[S]): Pattern[T, S] = {
    +    jPattern.subtype(clazz)
    +    this.asInstanceOf[Pattern[T, S]]
    +  }
    +
    +  /**
    +    * Defines the maximum time interval for a matching pattern. This means 
that the time gap
    +    * between first and the last event must not be longer than the window 
time.
    +    *
    +    * @param windowTime Time of the matching window
    +    * @return The same pattern operator with the new window length
    +    */
    +  def within(windowTime: Time): Pattern[T, F] = {
    +    jPattern.within(windowTime)
    +    this
    +  }
    +
    +  /**
    +    * Appends a new pattern operator to the existing one. The new pattern 
operator enforces strict
    +    * temporal contiguity. This means that the whole pattern only matches 
if an event which matches
    +    * this operator directly follows the preceding matching event. Thus, 
there cannot be any
    +    * events in between two matching events.
    +    *
    +    * @param name Name of the new pattern operator
    +    * @return A new pattern operator which is appended to this pattern 
operator
    +    */
    +  def next(name: String): Pattern[T, T] = {
    +    wrapPattern(jPattern.next(name))
    +  }
    +
    +  /**
    +    * Appends a new pattern operator to the existing one. The new pattern 
operator enforces
    +    * non-strict temporal contiguity. This means that a matching event of 
this operator and the
    +    * preceding matching event might be interleaved with other events 
which are ignored.
    +    *
    +    * @param name Name of the new pattern operator
    +    * @return A new pattern operator which is appended to this pattern 
operator
    +    */
    +  def followedBy(name: String): FollowedByPattern[T, T] = {
    +    FollowedByPattern(jPattern.followedBy(name))
    +  }
    +
    +  /**
    +    * Specifies a filter condition which has to be fulfilled by an event 
in order to be matched.
    +    *
    +    * @param filter Filter condition
    +    * @return The same pattern operator where the new filter condition is 
set
    +    */
    +  def where(filter: FilterFunction[F]): Pattern[T, F] = {
    +    jPattern.where(filter)
    +    this
    +  }
    +
    +  /**
    +    * Specifies a filter condition which has to be fulfilled by an event 
in order to be matched.
    +    *
    +    * @param filterFun Filter condition
    +    * @return The same pattern operator where the new filter condition is 
set
    +    */
    +  def where(filterFun: F => Boolean): Pattern[T, F] = {
    +    val filter = new FilterFunction[F] {
    +      val cleanFilter = cep.scala.cleanClosure(filterFun)
    +
    +      override def filter(value: F): Boolean = cleanFilter(value)
    +    }
    +    where(filter)
    +  }
    +
    +  //TODO ask about java api change <?> -> <? extends T> and creating a new 
object vs caching object. equals/hashcode?
    +  /**
    +    *
    +    * @return The previous pattern operator
    +    */
    +  def getPrevious: Option[Pattern[T, _ <: T]] = {
    +    val prev = jPattern.getPrevious
    +    if (prev == null) None else Some(wrapPattern(prev))
    --- End diff --
    
    The explicit null check can be written as `Option(jPattern.getPrevious)`, which yields `None` when the Java call returns `null`.


> Scala API for CEP
> -----------------
>
>                 Key: FLINK-3708
>                 URL: https://issues.apache.org/jira/browse/FLINK-3708
>             Project: Flink
>          Issue Type: Improvement
>          Components: CEP
>    Affects Versions: 1.1.0
>            Reporter: Till Rohrmann
>            Assignee: Stefan Richter
>
> Currently, the CEP library does not support Scala case classes, because the 
> {{TypeExtractor}} cannot handle them. In order to support them, it would be 
> necessary to offer a Scala API for the CEP library.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to