That said, in the DefaultRollingPolicy it seems the check is on the file size (it mimics the check in shouldRollOnEvent()).
I guess the question is: is the call to shouldRollOnCheckpoint done by the checkpointing thread? Are the calls to the other 2 methods, shouldRollOnEvent and shouldRollOnProcessingTime, done on the execution thread, as in inlined?

On Sun, Feb 10, 2019 at 1:17 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:

> Thanks for the quick reply.
>
> I am confused. If this was a more full-featured BucketingSink, I would
> imagine that based on shouldRollOnEvent and shouldRollOnProcessingTime, an
> in-progress file could go into the pending phase and on checkpoint the
> pending part file would be finalized. For exactly once, any in-progress
> file will have its length snapshotted to the checkpoint. If a resume from
> a checkpoint was to happen, that length is used to truncate the file (if
> supported) or is dropped as a part-length file (if truncate is not
> supported), to indicate what part of the finalized file (finalized when
> resumed) was valid. And I had always assumed (and there is no doc
> otherwise) that shouldRollOnCheckpoint would be similar to the other 2,
> apart from the fact that it does the roll and finalize in a single step
> on a checkpoint.
>
> Am I better off using BucketingSink? When to use BucketingSink and when
> to use RollingSink is not clear at all, even though on the surface it sure
> looks like RollingSink is a better version of BucketingSink (or not).
>
> Regards.
>
> On Sun, Feb 10, 2019 at 12:09 PM Timothy Victor <vict...@gmail.com> wrote:
>
>> I think the only rolling policy that can be used is
>> CheckpointRollingPolicy to ensure exactly once.
>>
>> Tim
>>
>> On Sun, Feb 10, 2019, 9:13 AM Vishal Santoshi <vishal.santo...@gmail.com
>> wrote:
>>
>>> Can StreamingFileSink be used instead of
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/filesystem_sink.html ?
>>> It looks like it could.
>>>
>>> This code, for example:
>>>
>>> StreamingFileSink
>>>     .forRowFormat(new Path(PATH), new SimpleStringEncoder<KafkaRecord>())
>>>     .withBucketAssigner(new KafkaRecordBucketAssigner(DEFAULT_FORMAT_STRING, ZONE_ID))
>>>     .withRollingPolicy(new RollingPolicy<KafkaRecord, String>() {
>>>         // Never roll just because a checkpoint happens.
>>>         @Override
>>>         public boolean shouldRollOnCheckpoint(PartFileInfo<String> partFileState) throws IOException {
>>>             return false;
>>>         }
>>>
>>>         // Roll once the part file grows past 1 GiB.
>>>         @Override
>>>         public boolean shouldRollOnEvent(PartFileInfo<String> partFileState, KafkaRecord element) throws IOException {
>>>             return partFileState.getSize() > 1024 * 1024 * 1024L;
>>>         }
>>>
>>>         // Roll after 10 minutes without writes or 120 minutes since creation.
>>>         @Override
>>>         public boolean shouldRollOnProcessingTime(PartFileInfo<String> partFileState, long currentTime) throws IOException {
>>>             return currentTime - partFileState.getLastUpdateTime() > 10 * 60 * 1000L
>>>                 || currentTime - partFileState.getCreationTime() > 120 * 60 * 1000L;
>>>         }
>>>     })
>>>     .build();
>>>
>>> A few things I see and am not sure I follow about the new RollingFileSink
>>> vis-a-vis BucketingSink:
>>>
>>> 1. I do not ever see the in-progress file go to the pending state, as in
>>> renamed as pending, as was the case in BucketingSink. I would assume that
>>> it would be pending and then finalized on checkpoint for exactly-once
>>> semantics?
>>>
>>> 2. I see dangling in-progress files at the end of the day. I would assume
>>> that, with withBucketCheckInterval set to 1 minute by default,
>>> shouldRollOnProcessingTime should kick in?
>>>
>>> 3. The in-progress files are named like
>>> .part-4-45.inprogress.3ed08b67-2b56-4d31-a233-b1c146cfcf14 . What is that
>>> additional suffix?
>>>
>>> I have the following set up on the env:
>>>
>>> env.enableCheckpointing(10 * 60000);
>>> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>>> env.setRestartStrategy(fixedDelayRestart(4, org.apache.flink.api.common.time.Time.minutes(1)));
>>> StateBackend stateBackEnd = new MemoryStateBackend();
>>> env.setStateBackend(stateBackEnd);
>>>
>>> Regards.
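
For reference, the three roll decisions in the quoted policy can be modeled in plain Java, without Flink on the classpath, to see which of them would fire for a given part file. This is only a sketch under stated assumptions: RollingDecisionSketch and PartFileSnapshot are hypothetical names (not Flink classes), the thresholds are copied from the quoted code, and it says nothing about which thread Flink calls each method on.

```java
// Plain-Java model of the three roll decisions in the quoted policy.
// RollingDecisionSketch and PartFileSnapshot are hypothetical, not Flink APIs.
final class RollingDecisionSketch {

    /** Hypothetical stand-in for the size and timestamps a part file exposes. */
    static final class PartFileSnapshot {
        final long sizeBytes;
        final long creationTimeMs;
        final long lastUpdateTimeMs;

        PartFileSnapshot(long sizeBytes, long creationTimeMs, long lastUpdateTimeMs) {
            this.sizeBytes = sizeBytes;
            this.creationTimeMs = creationTimeMs;
            this.lastUpdateTimeMs = lastUpdateTimeMs;
        }
    }

    // Thresholds copied from the quoted policy.
    static final long MAX_PART_SIZE_BYTES = 1024L * 1024L * 1024L; // 1 GiB
    static final long INACTIVITY_MS = 10L * 60L * 1000L;           // 10 minutes
    static final long ROLLOVER_MS = 120L * 60L * 1000L;            // 120 minutes

    // The quoted policy never rolls on checkpoint.
    static boolean shouldRollOnCheckpoint(PartFileSnapshot file) {
        return false;
    }

    // Size-based check, evaluated when an element arrives.
    static boolean shouldRollOnEvent(PartFileSnapshot file) {
        return file.sizeBytes > MAX_PART_SIZE_BYTES;
    }

    // Time-based check: idle for too long, or open for too long.
    static boolean shouldRollOnProcessingTime(PartFileSnapshot file, long nowMs) {
        return nowMs - file.lastUpdateTimeMs > INACTIVITY_MS
            || nowMs - file.creationTimeMs > ROLLOVER_MS;
    }

    public static void main(String[] args) {
        // A small file created and last written at t = 0.
        PartFileSnapshot idle = new PartFileSnapshot(512L, 0L, 0L);
        long elevenMinutesMs = 11L * 60L * 1000L;

        System.out.println(shouldRollOnEvent(idle));                           // false
        System.out.println(shouldRollOnProcessingTime(idle, elevenMinutesMs)); // true
        System.out.println(shouldRollOnCheckpoint(idle));                      // false
    }
}
```

In this model a small file that has been idle for 11 minutes is rolled only by the processing-time check, which is the case question 2 above is asking about.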