[ https://issues.apache.org/jira/browse/FLINK-6658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16021514#comment-16021514 ]
ASF GitHub Bot commented on FLINK-6658: --------------------------------------- Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/3963#discussion_r118055738 --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala --- @@ -296,25 +294,94 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) { * timeout events wrapped in a [[Either]] type. */ def flatSelect[L: TypeInformation, R: TypeInformation]( - patternFlatTimeoutFunction: (mutable.Map[String, JList[T]], Long, Collector[L]) => Unit) ( - patternFlatSelectFunction: (mutable.Map[String, JList[T]], Collector[R]) => Unit) + patternFlatTimeoutFunction: (Map[String, Iterable[T]], Long, Collector[L]) => Unit) ( + patternFlatSelectFunction: (Map[String, Iterable[T]], Collector[R]) => Unit) : DataStream[Either[L, R]] = { val cleanSelectFun = cleanClosure(patternFlatSelectFunction) val cleanTimeoutFun = cleanClosure(patternFlatTimeoutFunction) val patternFlatSelectFun = new PatternFlatSelectFunction[T, R] { - override def flatSelect(pattern: JMap[String, JList[T]], out: Collector[R]): Unit = { - cleanSelectFun(pattern.asScala, out) + override def flatSelect(pattern: JMap[String, JList[T]], out: Collector[R]): Unit = + cleanSelectFun(mapToScala(pattern), out) + } + + val patternFlatTimeoutFun = new PatternFlatTimeoutFunction[T, L] { + override def timeout( + pattern: JMap[String, JList[T]], + timeoutTimestamp: Long, out: Collector[L]) + : Unit = { + cleanTimeoutFun(mapToScala(pattern), timeoutTimestamp, out) } } --- End diff -- Yes, I am also referring to those methods. :) > Use scala Collections in scala CEP API > -------------------------------------- > > Key: FLINK-6658 > URL: https://issues.apache.org/jira/browse/FLINK-6658 > Project: Flink > Issue Type: Bug > Components: CEP > Affects Versions: 1.3.0 > Reporter: Dawid Wysakowicz > Assignee: Dawid Wysakowicz > -- This message was sent by Atlassian JIRA (v6.3.15#6346)