[ 
https://issues.apache.org/jira/browse/FLINK-6844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16055854#comment-16055854
 ] 

Shashank Agarwal commented on FLINK-6844:
-----------------------------------------

[~tzulitai]

I am using Kafka as the source, and there is no issue with that. I am also 
using Flink CEP. This code worked fine with 1.2.0 and 1.2.1; I have since 
updated my applications to 1.3.0.

The applications that do not use CEP work fine. The application that uses CEP 
throws the following exception, and checkpointing stops altogether.

{code}
java.lang.Exception: Could not perform checkpoint 1 for operator KeyedCEPPatternOperator -> Map (6/6).
        at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:550)
        at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
        at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:281)
        at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Could not complete snapshot 1 for operator KeyedCEPPatternOperator -> Map (6/6).
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:406)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1157)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1089)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:653)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:589)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:542)
        ... 8 more
Caused by: java.lang.UnsupportedOperationException
        at org.apache.flink.api.scala.typeutils.TraversableSerializer.snapshotConfiguration(TraversableSerializer.scala:155)
        at org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.<init>(CompositeTypeSerializerConfigSnapshot.java:53)
        at org.apache.flink.api.scala.typeutils.OptionSerializer$OptionSerializerConfigSnapshot.<init>(OptionSerializer.scala:139)
        at org.apache.flink.api.scala.typeutils.OptionSerializer.snapshotConfiguration(OptionSerializer.scala:104)
        at org.apache.flink.api.scala.typeutils.OptionSerializer.snapshotConfiguration(OptionSerializer.scala:28)
        at org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.<init>(CompositeTypeSerializerConfigSnapshot.java:53)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerConfigSnapshot.<init>(TupleSerializerConfigSnapshot.java:45)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.snapshotConfiguration(TupleSerializerBase.java:132)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.snapshotConfiguration(TupleSerializerBase.java:39)
        at org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.<init>(CompositeTypeSerializerConfigSnapshot.java:53)
        at org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot.<init>(CollectionSerializerConfigSnapshot.java:39)
        at org.apache.flink.api.common.typeutils.base.ListSerializer.snapshotConfiguration(ListSerializer.java:183)
        at org.apache.flink.api.common.typeutils.base.ListSerializer.snapshotConfiguration(ListSerializer.java:47)
        at org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.<init>(CompositeTypeSerializerConfigSnapshot.java:53)
        at org.apache.flink.api.common.typeutils.base.MapSerializerConfigSnapshot.<init>(MapSerializerConfigSnapshot.java:38)
        at org.apache.flink.runtime.state.HashMapSerializer.snapshotConfiguration(HashMapSerializer.java:210)
        at org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.snapshot(RegisteredKeyedBackendStateMetaInfo.java:71)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.snapshot(HeapKeyedStateBackend.java:267)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:396)
        ... 13 more
{code}

Then I applied your patch on top of the release-1.3.0 tag and ran this code 
against that build. The application without CEP still works fine, but the CEP 
application prints no error logs and no checkpointing happens. It shows:

{code}
Triggered: 1
In Progress: 0
ID: 1
Failure Time: 18:48:41
Cause: Checkpoint was declined (tasks not ready)
{code}

Even after restarting the job, it neither triggers checkpointing nor prints 
the logs I mentioned above. To reproduce, I think you just need to use CEP 
with FsStateBackend.


> TraversableSerializer should implement compatibility methods
> ------------------------------------------------------------
>
>                 Key: FLINK-6844
>                 URL: https://issues.apache.org/jira/browse/FLINK-6844
>             Project: Flink
>          Issue Type: Bug
>          Components: Type Serialization System
>    Affects Versions: 1.3.0
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>              Labels: flink-rel-1.3.1-blockers
>             Fix For: 1.3.1, 1.4.0
>
>
> The {{TraversableSerializer}} may be used as a serializer for managed state 
> and takes part in checkpointing, therefore should implement the compatibility 
> methods.
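
The stack trace in the comment shows each composite serializer (HashMapSerializer, ListSerializer, TupleSerializerBase, OptionSerializer) calling snapshotConfiguration on its nested serializer until TraversableSerializer throws. The sketch below is a toy model of that delegation, not Flink code: the Serializer interface and every class in it are hypothetical stand-ins, and it only illustrates why one nested serializer without the method can fail the snapshot of the whole state serializer.

```java
import java.util.Arrays;
import java.util.List;

public class NestedSnapshotSketch {

    /** Hypothetical stand-in for a serializer; this is not Flink's TypeSerializer. */
    public interface Serializer {
        String snapshotConfiguration();
    }

    /** Leaf serializer that can describe its own configuration. */
    public static final class LeafSerializer implements Serializer {
        private final String name;
        public LeafSerializer(String name) { this.name = name; }
        @Override public String snapshotConfiguration() { return name; }
    }

    /** Leaf serializer that lacks the method, as TraversableSerializer does in the trace. */
    public static final class UnsupportedSerializer implements Serializer {
        @Override public String snapshotConfiguration() {
            throw new UnsupportedOperationException();
        }
    }

    /** Composite serializer: its snapshot embeds the snapshot of every nested serializer. */
    public static final class CompositeSerializer implements Serializer {
        private final String name;
        private final List<Serializer> nested;
        public CompositeSerializer(String name, Serializer... nested) {
            this.name = name;
            this.nested = Arrays.asList(nested);
        }
        @Override public String snapshotConfiguration() {
            StringBuilder sb = new StringBuilder(name).append('[');
            for (int i = 0; i < nested.size(); i++) {
                if (i > 0) sb.append(',');
                // Delegation: an exception thrown here propagates to the outermost caller.
                sb.append(nested.get(i).snapshotConfiguration());
            }
            return sb.append(']').toString();
        }
    }

    /** Builds the nesting order seen in the trace: Map -> List -> Tuple -> Option -> innermost. */
    public static Serializer stateSerializer(Serializer innermost) {
        return new CompositeSerializer("Map",
            new CompositeSerializer("List",
                new CompositeSerializer("Tuple",
                    new LeafSerializer("Int"),
                    new CompositeSerializer("Option", innermost))));
    }

    /** Returns true if the whole nested snapshot succeeds, false if any level is unsupported. */
    public static boolean trySnapshot(Serializer s) {
        try {
            s.snapshotConfiguration();
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(stateSerializer(new LeafSerializer("String")).snapshotConfiguration());
        System.out.println(trySnapshot(stateSerializer(new UnsupportedSerializer())));
    }
}
```

In this model the first call prints the full nested description and the second prints false, which mirrors how the failure surfaces at the checkpoint level even though only the innermost serializer lacks the method.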



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
