[ 
https://issues.apache.org/jira/browse/FLINK-6658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16021184#comment-16021184
 ] 

ASF GitHub Bot commented on FLINK-6658:
---------------------------------------

Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3963#discussion_r117989704
  
    --- Diff: 
flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
 ---
    @@ -296,25 +294,94 @@ class PatternStream[T](jPatternStream: 
JPatternStream[T]) {
         *         timeout events wrapped in a [[Either]] type.
         */
       def flatSelect[L: TypeInformation, R: TypeInformation](
    -      patternFlatTimeoutFunction: (mutable.Map[String, JList[T]], Long, 
Collector[L]) => Unit) (
    -      patternFlatSelectFunction: (mutable.Map[String, JList[T]], 
Collector[R]) => Unit)
    +      patternFlatTimeoutFunction: (Map[String, Iterable[T]], Long, 
Collector[L]) => Unit) (
    +      patternFlatSelectFunction: (Map[String, Iterable[T]], Collector[R]) 
=> Unit)
         : DataStream[Either[L, R]] = {
     
         val cleanSelectFun = cleanClosure(patternFlatSelectFunction)
         val cleanTimeoutFun = cleanClosure(patternFlatTimeoutFunction)
     
         val patternFlatSelectFun = new PatternFlatSelectFunction[T, R] {
    -      override def flatSelect(pattern: JMap[String, JList[T]], out: 
Collector[R]): Unit = {
    -        cleanSelectFun(pattern.asScala, out)
    +      override def flatSelect(pattern: JMap[String, JList[T]], out: 
Collector[R]): Unit =
    +        cleanSelectFun(mapToScala(pattern), out)
    +    }
    +
    +    val patternFlatTimeoutFun = new PatternFlatTimeoutFunction[T, L] {
    +      override def timeout(
    +        pattern: JMap[String, JList[T]],
    +        timeoutTimestamp: Long, out: Collector[L])
    +      : Unit = {
    +        cleanTimeoutFun(mapToScala(pattern), timeoutTimestamp, out)
           }
         }
     
    --- End diff --
    
    Why are the following two methods added?


> Use scala Collections in scala CEP API
> --------------------------------------
>
>                 Key: FLINK-6658
>                 URL: https://issues.apache.org/jira/browse/FLINK-6658
>             Project: Flink
>          Issue Type: Bug
>          Components: CEP
>    Affects Versions: 1.3.0
>            Reporter: Dawid Wysakowicz
>            Assignee: Dawid Wysakowicz
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to