[ https://issues.apache.org/jira/browse/FLINK-22497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ChangjiGuo updated FLINK-22497:
-------------------------------
    Environment: hadoop-2.8.4, Flink-1.11.2  (was: hadoop-2.8.4)

> When using DefaultRollingPolicy in StreamingFileSink, the file will be finished delayed
> ---------------------------------------------------------------------------------------
>
>                 Key: FLINK-22497
>                 URL: https://issues.apache.org/jira/browse/FLINK-22497
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / FileSystem
>    Affects Versions: 1.11.2
>         Environment: hadoop-2.8.4, Flink-1.11.2
>            Reporter: ChangjiGuo
>            Priority: Major
>         Attachments: 1.png
>
>
> I have a question that came up while testing StreamingFileSink:
> The default 60s rollover interval of DefaultRollingPolicy is only checked
> when the processing-time timer (procTimeService) fires. If the interval has
> not yet elapsed at one firing, the roll is postponed until the next firing,
> 60s later, so the file is not finished promptly. For example, if the
> checkpoint interval is set to 60s, the file should be finished at the second
> checkpoint, but it is delayed until the third checkpoint.
> See the attached picture for details.
> If we add rollingPolicy.shouldRollOnProcessingTime to the if condition of the
> Bucket.write method, the file is finished at the second checkpoint, as
> expected.
> {code:java}
> void write(IN element, long currentTime) throws IOException {
>     if (inProgressPart == null
>             || rollingPolicy.shouldRollOnEvent(inProgressPart, element)
>             // proposed addition: also apply the time-based check on every write
>             || rollingPolicy.shouldRollOnProcessingTime(inProgressPart, currentTime)) {
>         if (LOG.isDebugEnabled()) {
>             LOG.debug("Subtask {} closing in-progress part file for bucket id={} due to element {}.",
>                     subtaskIndex, bucketId, element);
>         }
>         rollPartFile(currentTime);
>     }
>     inProgressPart.write(element, currentTime);
> }
> {code}
>
> Is my understanding correct? And would this change be acceptable?
> Thanks! ^_^
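To make the timing argument concrete, here is a toy simulation. It is not Flink code and does not use any Flink API; every value in it is a hypothetical assumption chosen to match the scenario in the description: one element every 10s starting at t=5s (the first element opens the part file), a 60s rollover interval, a processing-time timer that fires every 60s but 2s after each checkpoint (t=62, 122, ...), and checkpoints at t=60, 120, 180. It assumes a file rolled at time r is committed by the first checkpoint strictly after r. Only the age part of the check is modeled; the real DefaultRollingPolicy also considers the inactivity interval and the part size.

```java
/**
 * Toy model (NOT Flink code) of when a part file is rolled and committed.
 * All timings are hypothetical, in seconds:
 *  - an element arrives every 10s starting at t=5; the first opens the part file
 *  - rollover interval 60s (only the age check is modeled)
 *  - the timer fires at t=62, 122, ... (2s after each checkpoint)
 *  - checkpoints at t=60, 120, 180, ...
 */
class RollTimingSketch {

    static final long ROLLOVER_INTERVAL = 60;
    static final long TIMER_INTERVAL = 60;
    static final long TIMER_OFFSET = 2;
    static final long CHECKPOINT_INTERVAL = 60;

    /** Age-based check, similar in spirit to the rollover part of shouldRollOnProcessingTime. */
    static boolean shouldRoll(long creationTime, long now) {
        return now - creationTime >= ROLLOVER_INTERVAL;
    }

    /** Returns the time at which the first part file is rolled. */
    static long rollTime(boolean checkOnWrite) {
        long creationTime = 5; // the first element opens the part file
        for (long t = creationTime + 1; ; t++) {
            boolean elementArrives = t % 10 == 5;
            boolean timerFires = t >= TIMER_OFFSET + TIMER_INTERVAL
                    && (t - TIMER_OFFSET) % TIMER_INTERVAL == 0;
            // Proposed behavior: the age check also runs when an element is written
            if (elementArrives && checkOnWrite && shouldRoll(creationTime, t)) {
                return t;
            }
            // Current behavior: the age check only runs when the timer fires
            if (timerFires && shouldRoll(creationTime, t)) {
                return t;
            }
        }
    }

    /** 1-based index of the first checkpoint strictly after the roll, which commits the file. */
    static long committingCheckpoint(long rollTime) {
        return rollTime / CHECKPOINT_INTERVAL + 1;
    }

    public static void main(String[] args) {
        long timerOnly = rollTime(false);
        long withWriteCheck = rollTime(true);
        System.out.println("timer only:     rolled at " + timerOnly
                + "s, committed by checkpoint #" + committingCheckpoint(timerOnly));
        System.out.println("check on write: rolled at " + withWriteCheck
                + "s, committed by checkpoint #" + committingCheckpoint(withWriteCheck));
    }
}
```

Under these assumptions the timer-only path misses the deadline at t=62 (age 57s) and rolls at t=122, so the file is committed by checkpoint #3; with the extra check in write, the element arriving at t=65 triggers the roll, and checkpoint #2 commits the file. The exact numbers depend entirely on the assumed offsets and arrival rate. Also, the write-time check only helps when elements keep arriving after the interval has elapsed.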