I think the only rolling policy that can be used to ensure exactly-once semantics is CheckpointRollingPolicy.
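Here is a toy Java model of the reasoning. The classes are hypothetical and are not Flink's API. It shows a part file's lifecycle when the policy rolls on every checkpoint. It assumes that taking a checkpoint closes the open file and marks it pending under that checkpoint id. Pending files are only published once that checkpoint completes. Under that assumption, data written before a completed checkpoint ends up in finished files, and data written after it does not.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of part files under roll-on-checkpoint; not Flink's API. */
public class PartFileLifecycle {
    enum State { IN_PROGRESS, PENDING, FINISHED }

    static final class PartFile {
        final String name;
        State state = State.IN_PROGRESS;
        long pendingForCheckpoint = -1; // checkpoint id that closed this file
        PartFile(String name) { this.name = name; }
    }

    private final List<PartFile> files = new ArrayList<>();
    private PartFile current;
    private int counter = 0;

    /** Opens a new in-progress file if none is open; record contents are ignored in this model. */
    void write(String record) {
        if (current == null) {
            current = new PartFile("part-0-" + counter++);
            files.add(current);
        }
    }

    /** Roll on checkpoint: the open file is closed and becomes pending for this checkpoint. */
    void snapshot(long checkpointId) {
        if (current != null) {
            current.state = State.PENDING;
            current.pendingForCheckpoint = checkpointId;
            current = null;
        }
    }

    /** Pending files closed at or before the completed checkpoint are published. */
    void notifyCheckpointComplete(long checkpointId) {
        for (PartFile f : files) {
            if (f.state == State.PENDING && f.pendingForCheckpoint <= checkpointId) {
                f.state = State.FINISHED;
            }
        }
    }

    List<PartFile> files() { return files; }

    public static void main(String[] args) {
        PartFileLifecycle sink = new PartFileLifecycle();
        sink.write("a");
        sink.snapshot(1);
        sink.write("b");                  // lands in a new in-progress file
        sink.notifyCheckpointComplete(1);
        for (PartFile f : sink.files()) {
            System.out.println(f.name + " " + f.state);
        }
        // prints "part-0-0 FINISHED" then "part-0-1 IN_PROGRESS"
    }
}
```

The design point is that file publication is driven only by checkpoint completion. A failure before completion would therefore roll back to a state where none of the unpublished files count.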
Tim

On Sun, Feb 10, 2019, 9:13 AM Vishal Santoshi <vishal.santo...@gmail.com> wrote:

> Can StreamingFileSink be used instead of the sink described at
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/filesystem_sink.html ?
> It looks like it could.
>
> This code, for example:
>
>     StreamingFileSink
>         .forRowFormat(new Path(PATH), new SimpleStringEncoder<KafkaRecord>())
>         .withBucketAssigner(new KafkaRecordBucketAssigner(DEFAULT_FORMAT_STRING, ZONE_ID))
>         .withRollingPolicy(new RollingPolicy<KafkaRecord, String>() {
>             @Override
>             public boolean shouldRollOnCheckpoint(PartFileInfo<String> partFileState)
>                     throws IOException {
>                 return false;
>             }
>
>             @Override
>             public boolean shouldRollOnEvent(PartFileInfo<String> partFileState,
>                                              KafkaRecord element) throws IOException {
>                 return partFileState.getSize() > 1024 * 1024 * 1024L;
>             }
>
>             @Override
>             public boolean shouldRollOnProcessingTime(PartFileInfo<String> partFileState,
>                                                       long currentTime) throws IOException {
>                 return currentTime - partFileState.getLastUpdateTime() > 10 * 60 * 1000L
>                         || currentTime - partFileState.getCreationTime() > 120 * 60 * 1000L;
>             }
>         })
>         .build();
>
> There are a few things I see and am not sure I follow about the new
> RollingFileSink vis-à-vis BucketingSink:
>
> 1. I never see the in-progress file go to the pending state (that is,
>    renamed as pending), as was the case in BucketingSink. I would assume
>    that it would be pending and then finalized on checkpoint for
>    exactly-once semantics?
>
> 2. I see dangling in-progress files at the end of the day. I would assume
>    that, with withBucketCheckInterval set to 1 minute by default,
>    shouldRollOnProcessingTime should kick in?
>
> 3. The in-progress files look like
>    .part-4-45.inprogress.3ed08b67-2b56-4d31-a233-b1c146cfcf14 . What is that
>    additional suffix?
>
> I have the following setup on the env:
>
>     env.enableCheckpointing(10 * 60000);
>     env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>     env.setRestartStrategy(fixedDelayRestart(4,
>             org.apache.flink.api.common.time.Time.minutes(1)));
>     StateBackend stateBackEnd = new MemoryStateBackend();
>     env.setStateBackend(stateBackEnd);
>
> Regards.
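A rough timing model can help with question 2. The sketch below uses a hypothetical class that is not Flink's API. It reuses the predicate from the quoted shouldRollOnProcessingTime and the intervals from the quoted setup. It makes three simplifying assumptions:

* Bucket checks fire at exact multiples of the 1-minute check interval.
* Checkpoints complete instantly every 10 minutes.
* A rolled (pending) file is committed at the first checkpoint that completes after the roll.

Under these assumptions, a file that stops receiving data at minute 0 is rolled at the minute-11 check and committed at the minute-20 checkpoint.

```java
/**
 * Hypothetical timing model (not Flink's API) for the quoted rolling policy.
 * Assumes bucket checks fire at exact multiples of CHECK_INTERVAL_MS, checkpoints
 * complete instantly at exact multiples of CHECKPOINT_INTERVAL_MS, and
 * creationTime <= lastUpdateTime (no writes after lastUpdateTime).
 */
public class RollTiming {
    static final long INACTIVITY_MS = 10 * 60 * 1000L;       // 10 minutes without writes
    static final long MAX_AGE_MS = 120 * 60 * 1000L;         // 2 hours since creation
    static final long CHECK_INTERVAL_MS = 60 * 1000L;        // 1-minute bucket check, as stated in the thread
    static final long CHECKPOINT_INTERVAL_MS = 10 * 60000L;  // env.enableCheckpointing(10 * 60000)

    /** Same condition as the quoted shouldRollOnProcessingTime. */
    static boolean shouldRoll(long now, long creationTime, long lastUpdateTime) {
        return now - lastUpdateTime > INACTIVITY_MS || now - creationTime > MAX_AGE_MS;
    }

    /** First bucket-check time strictly after the last write at which the idle file is rolled. */
    static long firstRollTime(long creationTime, long lastUpdateTime) {
        long t = (lastUpdateTime / CHECK_INTERVAL_MS + 1) * CHECK_INTERVAL_MS;
        while (!shouldRoll(t, creationTime, lastUpdateTime)) {
            t += CHECK_INTERVAL_MS;
        }
        return t;
    }

    /** First checkpoint completing strictly after the roll, when the pending file is committed. */
    static long commitTime(long rollTime) {
        return (rollTime / CHECKPOINT_INTERVAL_MS + 1) * CHECKPOINT_INTERVAL_MS;
    }

    public static void main(String[] args) {
        long roll = firstRollTime(0L, 0L);
        long commit = commitTime(roll);
        System.out.println("rolled at minute " + roll / 60000 + ", committed at minute " + commit / 60000);
    }
}
```

Running main prints `rolled at minute 11, committed at minute 20`. The strict `>` comparisons in the quoted policy mean that a file idle for exactly 10 minutes is not yet rolled. The roll therefore only happens at the next 1-minute check.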