[ 
https://issues.apache.org/jira/browse/FLINK-6844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16056044#comment-16056044
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-6844:
--------------------------------------------

[~shashank734]

I've tested CEP with Scala collections as the event type (which makes the 
CEP operator use the {{TraversableSerializer}} internally), using the 
following code:
{code}
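import org.apache.flink.cep.pattern.conditions.SimpleCondition
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._

object FlinkCEPTest {
  def main(args: Array[String]): Unit = {
    // set up the streaming execution environment, checkpointing every 5 seconds
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(5000)

    // each event is a single-element List holding the first comma-separated field
    val source: DataStream[List[String]] =
      env.socketTextStream("localhost", 9999).map(x => List(x.split(",")(0)))

    // four "a" events (combinations allowed), followed by a "b" event
    val pattern = Pattern.begin[List[String]]("start").where(new SimpleCondition[List[String]] {
      override def filter(t: List[String]): Boolean = t.head.equals("a")
    }).times(4).allowCombinations().followedBy("end").where(new SimpleCondition[List[String]] {
      override def filter(t: List[String]): Boolean = t.head.equals("b")
    })

    CEP.pattern(source, pattern).select(_.toString()).print()

    // execute program
    env.execute("Flink CEP test")
  }
}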
{code}

I can confirm that this works correctly without any errors on branch 
{{release-1.3}}. Checkpointing and restoring from savepoints also work correctly.
As I've mentioned, simply applying the commit for this JIRA on top of 
{{release-1.3.0}} may not work, because the complete fix includes other commits as well.

Please let me know if you think otherwise!
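
For reference, the event shape that the test job feeds into CEP can be reproduced in plain Scala without any Flink dependency. The following is only a rough sketch: the object name, the helper names and the sample input lines are made up for illustration, and it says nothing about how the CEP operator or the {{TraversableSerializer}} behave internally.

```scala
object EventShapeSketch {
  // Same transformation as the map() in the test job: keep only the first
  // comma-separated field and wrap it in a single-element List.
  def toEvent(line: String): List[String] = List(line.split(",")(0))

  // Same predicates as the two SimpleCondition instances in the test job.
  def isStart(event: List[String]): Boolean = event.head.equals("a")
  def isEnd(event: List[String]): Boolean = event.head.equals("b")

  def main(args: Array[String]): Unit = {
    // Hypothetical lines as they might be typed into the socket on port 9999.
    val lines = Seq("a,1", "a,2", "b,3")
    val events = lines.map(toEvent)
    events.foreach(e => println(s"$e start=${isStart(e)} end=${isEnd(e)}"))
  }
}
```

Note that a line consisting only of commas would make {{split(",")(0)}} throw, since {{String.split}} drops trailing empty strings and returns an empty array in that case.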

> TraversableSerializer should implement compatibility methods
> ------------------------------------------------------------
>
>                 Key: FLINK-6844
>                 URL: https://issues.apache.org/jira/browse/FLINK-6844
>             Project: Flink
>          Issue Type: Bug
>          Components: Type Serialization System
>    Affects Versions: 1.3.0
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>              Labels: flink-rel-1.3.1-blockers
>             Fix For: 1.3.1, 1.4.0
>
>
> The {{TraversableSerializer}} may be used as a serializer for managed state 
> and takes part in checkpointing, and therefore should implement the 
> compatibility methods.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
