[ https://issues.apache.org/jira/browse/FLINK-6808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16041168#comment-16041168 ]
ASF GitHub Bot commented on FLINK-6808:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4052

> Stream join fails when checkpointing is enabled
> -----------------------------------------------
>
>                 Key: FLINK-6808
>                 URL: https://issues.apache.org/jira/browse/FLINK-6808
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.3.0
>            Reporter: Francisco Rosa
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>              Labels: flink-rel-1.3.1-blockers
>             Fix For: 1.3.1
>
>
> Joining two streams with checkpointing enabled fails in 1.3.0. The same job
> worked with the previous 1.2 version. Code example that reproduces the failure:
> {code:title=Example|borderStyle=solid}
> import org.apache.flink.api.common.functions.JoinFunction;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
> import org.apache.flink.streaming.api.windowing.time.Time;
>
> // Hypothetical class name; the original snippet showed only main().
> public class StreamJoinCheckpointIssue {
>
>     public static void main(String[] args) throws Exception {
>         final StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>
>         // enable checkpoints every 5000 ms
>         env.enableCheckpointing(5000);
>
>         // create two streams
>         DataStreamSource<Long> one = env.generateSequence(0, 5000);
>         DataStreamSource<Long> two = env.generateSequence(2000, 15000);
>
>         // process both, sleeping per record so that a checkpoint happens while the job runs
>         DataStream<String> oneProcessed = one
>                 .map(oneValue -> {
>                     Thread.sleep(1000);
>                     return "val-" + oneValue;
>                 });
>
>         DataStream<String> twoProcessed = two
>                 .map(twoValue -> {
>                     Thread.sleep(1000);
>                     return "val-" + twoValue;
>                 });
>
>         // join the two streams on string equality in 5-second processing-time windows
>         DataStream<String> joinedStreams = oneProcessed
>                 .join(twoProcessed)
>                 .where(String::toString)
>                 .equalTo(String::toString)
>                 .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
>                 .apply(new JoinFunction<String, String, String>() {
>                     @Override
>                     public String join(String oneValue, String twoValue) {
>                         // nothing really relevant, just concatenate the strings
>                         return oneValue + "+" + twoValue;
>                     }
>                 });
>
>         // output results
>         joinedStreams.print();
>
>         env.execute("Issue with stream join and checkpoints");
>     }
> }
> {code}
> Stack trace:
> {noformat}
> java.lang.Exception: Could not perform checkpoint 1 for operator TriggerWindow(TumblingProcessingTimeWindows(5000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@3769cce0}, ProcessingTimeTrigger(), WindowedStream.apply(CoGroupedStreams.java:300)) -> Sink: Unnamed (1/1).
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:550)
> 	at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
> 	at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:281)
> 	at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
> 	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
> 	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not complete snapshot 1 for operator TriggerWindow(TumblingProcessingTimeWindows(5000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@3769cce0}, ProcessingTimeTrigger(), WindowedStream.apply(CoGroupedStreams.java:300)) -> Sink: Unnamed (1/1).
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:406)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1157)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1089)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:653)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:589)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:542)
> 	... 8 more
> Caused by: java.lang.UnsupportedOperationException: This serializer is not registered for managed state.
> 	at org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionSerializer.snapshotConfiguration(CoGroupedStreams.java:555)
> 	at org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.<init>(CompositeTypeSerializerConfigSnapshot.java:53)
> 	at org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot.<init>(CollectionSerializerConfigSnapshot.java:39)
> 	at org.apache.flink.runtime.state.ArrayListSerializer.snapshotConfiguration(ArrayListSerializer.java:149)
> 	at org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.snapshot(RegisteredKeyedBackendStateMetaInfo.java:71)
> 	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.snapshot(HeapKeyedStateBackend.java:267)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:396)
> 	... 13 more
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
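The innermost `Caused by` in the stack trace shows the failure mechanism: `ArrayListSerializer.snapshotConfiguration` builds a `CollectionSerializerConfigSnapshot`, whose `CompositeTypeSerializerConfigSnapshot` constructor asks each nested serializer for its own configuration snapshot, and `CoGroupedStreams$UnionSerializer` throws `UnsupportedOperationException` at that point, which aborts the whole checkpoint. The sketch below is a simplified, hypothetical model of that call chain in plain Java with no Flink dependency; every class and method name in it (`SnapshotChainSketch`, `ListLikeSerializer`, `UnionLikeSerializer`, and so on) is an illustrative stand-in, not Flink's API, and it does not show how the linked pull request fixes the problem.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the call chain in the stack trace.
// These are NOT Flink classes; the names only mirror the roles seen in the trace.
public class SnapshotChainSketch {

    /** Stand-in for a serializer that can snapshot its own configuration. */
    interface Serializer {
        String snapshotConfiguration();
    }

    /** Leaf serializer that supports configuration snapshots. */
    static final class StringLikeSerializer implements Serializer {
        @Override
        public String snapshotConfiguration() {
            return "String";
        }
    }

    /** Plays the role of CoGroupedStreams$UnionSerializer in 1.3.0: snapshots are unsupported. */
    static final class UnionLikeSerializer implements Serializer {
        @Override
        public String snapshotConfiguration() {
            throw new UnsupportedOperationException(
                    "This serializer is not registered for managed state.");
        }
    }

    /** Plays the role of a composite (list) serializer that snapshots its element serializer. */
    static final class ListLikeSerializer implements Serializer {
        private final Serializer elementSerializer;

        ListLikeSerializer(Serializer elementSerializer) {
            this.elementSerializer = elementSerializer;
        }

        @Override
        public String snapshotConfiguration() {
            // The composite snapshot is built eagerly from the nested one,
            // so an exception from the element serializer propagates upward.
            return "List(" + elementSerializer.snapshotConfiguration() + ")";
        }
    }

    /** Plays the role of the keyed state backend snapshotting meta info for each registered state. */
    static List<String> snapshotStateMetaInfo(List<? extends Serializer> stateSerializers) {
        List<String> snapshots = new ArrayList<>();
        for (Serializer serializer : stateSerializers) {
            snapshots.add(serializer.snapshotConfiguration());
        }
        return snapshots;
    }

    public static void main(String[] args) {
        List<Serializer> working = new ArrayList<>();
        working.add(new ListLikeSerializer(new StringLikeSerializer()));
        System.out.println(snapshotStateMetaInfo(working)); // prints [List(String)]

        List<Serializer> failing = new ArrayList<>();
        failing.add(new ListLikeSerializer(new UnionLikeSerializer()));
        try {
            snapshotStateMetaInfo(failing);
        } catch (UnsupportedOperationException e) {
            System.out.println("snapshot failed: " + e.getMessage());
        }
    }
}
```

Running `main` prints `[List(String)]` and then `snapshot failed: This serializer is not registered for managed state.`: a single nested serializer without snapshot support is enough to fail the composite snapshot, matching the innermost `Caused by` above.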