[ https://issues.apache.org/jira/browse/FLINK-2314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14711509#comment-14711509 ]
ASF GitHub Bot commented on FLINK-2314:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/997#discussion_r37883013

    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java ---
    @@ -120,12 +131,24 @@ public void run(SourceContext<OUT> ctx) throws Exception {
                     OUT nextElement = serializer.createInstance();
                     nextElement = format.nextRecord(nextElement);
                     if (nextElement == null && splitIterator.hasNext()) {
    -                    format.open(splitIterator.next());
    +                    InputSplit split = splitIterator.next();
    +                    splitNumber = split.getSplitNumber();
    +                    currRecord = 0l;
    +                    format.open(split);
                         continue;
                     } else if (nextElement == null) {
                         break;
                     }
    -                ctx.collect(nextElement);
    +                if(splitNumber == checkpointedSplit){
    --- End diff --

    What if you've checkpointed the second split after seeing the first and
    second splits, and the source is now re-executed starting with the first
    split? Won't records be written again, because you only save the latest
    checkpointed split number?

> Make Streaming File Sources Persistent
> --------------------------------------
>
>                 Key: FLINK-2314
>                 URL: https://issues.apache.org/jira/browse/FLINK-2314
>             Project: Flink
>          Issue Type: Improvement
>          Components: Streaming
>    Affects Versions: 0.9
>            Reporter: Stephan Ewen
>            Assignee: Sheetal Parade
>              Labels: easyfix, starter
>
> Streaming File sources should participate in the checkpointing. They should
> track the bytes they read from the file and checkpoint it.
> One can look at the sequence generating source function for an example of a
> checkpointed source.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
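
The concern raised in the review comment can be illustrated with a minimal sketch in plain Java, without any Flink types. It assumes that a restored source re-reads each split from its beginning and in the same record order. The class `SplitCheckpoint`, its fields, and the method `shouldEmit` are hypothetical names made up for illustration; they are not part of the pull request or of Flink's API. Instead of remembering only the latest split number, the snapshot keeps the set of fully read splits plus the record count of the split that was in progress:

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical snapshot of a file source's progress (illustration only, not Flink API).
 * It remembers every split that was fully read, not just the latest one.
 */
final class SplitCheckpoint {
    // Split numbers that were read to the end before the checkpoint.
    private final Set<Integer> finishedSplits;
    // Split that was being read when the checkpoint was taken, or -1 if none.
    private final int inProgressSplit;
    // Number of records of inProgressSplit that had already been emitted.
    private final long recordsEmitted;

    SplitCheckpoint(Set<Integer> finishedSplits, int inProgressSplit, long recordsEmitted) {
        this.finishedSplits = new HashSet<>(finishedSplits); // defensive copy
        this.inProgressSplit = inProgressSplit;
        this.recordsEmitted = recordsEmitted;
    }

    /**
     * Returns true if the record at the zero-based position recordIndex within
     * splitNumber was NOT emitted before the checkpoint and must be emitted now.
     */
    boolean shouldEmit(int splitNumber, long recordIndex) {
        if (finishedSplits.contains(splitNumber)) {
            return false; // whole split was already emitted
        }
        if (splitNumber == inProgressSplit) {
            return recordIndex >= recordsEmitted; // skip the already emitted prefix
        }
        return true; // split was never started before the checkpoint
    }
}
```

With a snapshot taken after split 1 was finished and three records of split 2 were emitted, a re-execution that starts with split 1 again skips all of its records, skips the first three records of split 2, and emits everything else. Whether such state fits the checkpointing interface used in the pull request is left open here.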