Can StreamingFileSink be used instead of BucketingSink (https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/filesystem_sink.html)? It looks like it could.
For example, this code:

    StreamingFileSink
        .forRowFormat(new Path(PATH), new SimpleStringEncoder<KafkaRecord>())
        .withBucketAssigner(new KafkaRecordBucketAssigner(DEFAULT_FORMAT_STRING, ZONE_ID))
        .withRollingPolicy(new RollingPolicy<KafkaRecord, String>() {

            @Override
            public boolean shouldRollOnCheckpoint(PartFileInfo<String> partFileState) throws IOException {
                // Never roll just because a checkpoint is taken
                return false;
            }

            @Override
            public boolean shouldRollOnEvent(PartFileInfo<String> partFileState, KafkaRecord element) throws IOException {
                // Roll once the part file grows past 1 GiB
                return partFileState.getSize() > 1024L * 1024 * 1024;
            }

            @Override
            public boolean shouldRollOnProcessingTime(PartFileInfo<String> partFileState, long currentTime) throws IOException {
                // Roll if the file has had no writes for 10 minutes, or has been open for 2 hours
                return currentTime - partFileState.getLastUpdateTime() > 10L * 60 * 1000
                        || currentTime - partFileState.getCreationTime() > 120L * 60 * 1000;
            }
        })
        .build();

There are a few things I see but do not quite follow about the new StreamingFileSink compared with BucketingSink:

1. I never see the in-progress file move to the pending state (i.e. get renamed as pending), as was the case with BucketingSink. I would assume it becomes pending and is then finalized on checkpoint, for exactly-once semantics?
2. I see dangling in-progress files at the end of the day. Since withBucketCheckInterval defaults to 1 minute, I would assume shouldRollOnProcessingTime should kick in?
3. The in-progress files are named like .part-4-45.inprogress.3ed08b67-2b56-4d31-a233-b1c146cfcf14. What is that additional suffix?

I have the following setup on the env:

    env.enableCheckpointing(10 * 60000); // checkpoint every 10 minutes
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.setRestartStrategy(fixedDelayRestart(4, org.apache.flink.api.common.time.Time.minutes(1)));
    StateBackend stateBackEnd = new MemoryStateBackend();
    env.setStateBackend(stateBackEnd);
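To make the thresholds in the rolling policy easier to reason about (particularly for question 2), here is a minimal standalone sketch with no Flink dependency. `RollThresholds` and `firstRollMinute` are hypothetical names, not Flink API. The sketch assumes the processing-time check fires once per bucket check interval (1 minute by default, as stated in the question) and only models the arithmetic of the `shouldRollOnEvent` / `shouldRollOnProcessingTime` conditions; it says nothing about how Flink renames or commits the files afterwards.

```java
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical, Flink-free restatement of the roll conditions used in the
 * anonymous RollingPolicy, so the thresholds can be checked in isolation.
 */
public final class RollThresholds {

    static final long MAX_PART_SIZE_BYTES = 1024L * 1024 * 1024;          // 1 GiB
    static final long MAX_INACTIVITY_MS = TimeUnit.MINUTES.toMillis(10);  // no writes for 10 min
    static final long MAX_PART_AGE_MS = TimeUnit.MINUTES.toMillis(120);   // open for 2 h

    private RollThresholds() {}

    /** Same condition as shouldRollOnEvent: roll once the part file exceeds 1 GiB. */
    static boolean rollOnEvent(long partSizeBytes) {
        return partSizeBytes > MAX_PART_SIZE_BYTES;
    }

    /** Same condition as shouldRollOnProcessingTime: roll if idle too long or open too long. */
    static boolean rollOnProcessingTime(long now, long lastUpdateTime, long creationTime) {
        return now - lastUpdateTime > MAX_INACTIVITY_MS
                || now - creationTime > MAX_PART_AGE_MS;
    }

    /**
     * Assumption: a periodic check fires every checkIntervalMin minutes, starting
     * at t = checkIntervalMin. Returns the minute of the first check that asks for a roll.
     * Always terminates, because the age condition eventually becomes true.
     */
    static long firstRollMinute(long createdMin, long lastWriteMin, long checkIntervalMin) {
        long created = TimeUnit.MINUTES.toMillis(createdMin);
        long lastWrite = TimeUnit.MINUTES.toMillis(lastWriteMin);
        long step = TimeUnit.MINUTES.toMillis(checkIntervalMin);
        long t = step;
        while (!rollOnProcessingTime(t, lastWrite, created)) {
            t += step;
        }
        return TimeUnit.MILLISECONDS.toMinutes(t);
    }

    public static void main(String[] args) {
        // File created at minute 0, last written at minute 5, checked every minute
        System.out.println("first roll at minute " + firstRollMinute(0, 5, 1));
        // prints "first roll at minute 16"
    }
}
```

With the last write at minute 5, the strict `>` inactivity test is first true at minute 16, so under these assumptions the policy itself would ask for an idle part file to be rolled about 11 minutes after its last write, long before the 2-hour age limit.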