[ https://issues.apache.org/jira/browse/FLINK-5056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15667328#comment-15667328 ]

ASF GitHub Bot commented on FLINK-5056:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2797#discussion_r88019203

    --- Diff: flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFaultToleranceITCase.java ---
    @@ -161,6 +164,15 @@ public void postSubmit() throws Exception {
                         Matcher matcher = messageRegex.matcher(line);
                         if (matcher.matches()) {
                             numRead++;
    +                        uniqMessagesRead.add(line);
    +
    +                        // check that in the committed files there are no duplicates
    +                        if (!file.getPath().toString().endsWith(".in-progress") && !file.getPath().toString().endsWith(".pending")) {
    --- End diff --

    ".in-progress" and ".pending" should use the constants defined in the BucketingSink.

> BucketingSink deletes valid data when checkpoint notification is slow.
> ----------------------------------------------------------------------
>
>                 Key: FLINK-5056
>                 URL: https://issues.apache.org/jira/browse/FLINK-5056
>             Project: Flink
>          Issue Type: Bug
>          Components: filesystem-connector
>   Affects Versions: 1.1.3
>            Reporter: Kostas Kloudas
>            Assignee: Kostas Kloudas
>             Fix For: 1.2.0
>
>
> Currently if BucketingSink receives no data after a checkpoint and then a
> notification about a previous checkpoint arrives, it clears its state. This can
> lead to not committing valid data about intermediate checkpoints for whom
> a notification has not arrived yet. As a simple sequence that illustrates the
> problem:
> -> input data
> -> snapshot(0)
> -> input data
> -> snapshot(1)
> -> no data
> -> notifyCheckpointComplete(0)
> the last will clear the state of the Sink without committing as final the data
> that arrived for checkpoint 1.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
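
For readers following the thread, the sequence in the issue description can be replayed with a small toy model. This is only a sketch under stated assumptions: the class `PendingFilesSketch`, its methods, and its suffix constants are hypothetical and are not the BucketingSink code or its API. The "buggy" variant clears all tracked state when a notification arrives and no data has been written since the last snapshot, which is a simplified reading of the behaviour the issue describes. The "fixed" variant only drops the checkpoints it has actually committed. The suffix check is expressed through named constants, in the spirit of the review comment above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of per-checkpoint pending files. Hypothetical; not Flink's BucketingSink. */
class PendingFilesSketch {
    // Hypothetical constants for this sketch; the real sink defines its own.
    static final String IN_PROGRESS_SUFFIX = ".in-progress";
    static final String PENDING_SUFFIX = ".pending";

    // Pending files recorded at each snapshot, keyed by checkpoint id.
    private final TreeMap<Long, List<String>> pendingPerCheckpoint = new TreeMap<>();
    // Pending files written since the last snapshot.
    private final List<String> pendingSinceLastSnapshot = new ArrayList<>();
    // Files that have been finalized (suffix removed).
    private final List<String> committed = new ArrayList<>();

    void write(String name) {
        pendingSinceLastSnapshot.add(name + PENDING_SUFFIX);
    }

    void snapshot(long checkpointId) {
        pendingPerCheckpoint.put(checkpointId, new ArrayList<>(pendingSinceLastSnapshot));
        pendingSinceLastSnapshot.clear();
    }

    /** Simplified model of the reported bug: with no new data, all state is cleared. */
    void notifyCheckpointCompleteBuggy(long checkpointId) {
        commitUpTo(checkpointId);
        if (pendingSinceLastSnapshot.isEmpty()) {
            pendingPerCheckpoint.clear(); // also drops checkpoints not yet notified
        }
    }

    /** Variant that keeps state for checkpoints whose notification has not arrived. */
    void notifyCheckpointCompleteFixed(long checkpointId) {
        commitUpTo(checkpointId);
    }

    private void commitUpTo(long checkpointId) {
        // View of all checkpoints with id <= checkpointId.
        Map<Long, List<String>> done = pendingPerCheckpoint.headMap(checkpointId, true);
        for (List<String> files : done.values()) {
            for (String file : files) {
                committed.add(file.substring(0, file.length() - PENDING_SUFFIX.length()));
            }
        }
        done.clear(); // removes only the committed checkpoints from the backing map
    }

    /** A path counts as committed if it carries neither suffix. */
    static boolean isCommittedPath(String path) {
        return !path.endsWith(IN_PROGRESS_SUFFIX) && !path.endsWith(PENDING_SUFFIX);
    }

    List<String> committedFiles() {
        return committed;
    }
}
```

Replaying write, snapshot(0), write, snapshot(1), then notifying checkpoint 0 and later checkpoint 1: in this model the buggy variant finalizes only the first file, while the fixed variant finalizes both.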