I think the only rolling policy that can be used to ensure exactly-once
is OnCheckpointRollingPolicy.
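
For what it's worth, here is a minimal stand-alone sketch (plain Java, no
Flink dependency) of the two kinds of rolling decisions being compared: the
size and time thresholds from your snippet versus a policy that simply rolls
on every checkpoint. RollingPolicySketch and PartFile are hypothetical names
made up for illustration; they only mimic the three callbacks of the rolling
policy interface and are not Flink classes.

```java
import java.time.Duration;

/** Stand-alone model of the rolling rules discussed in this thread; not Flink code. */
final class RollingPolicySketch {
    // Thresholds copied from the quoted snippet.
    static final long MAX_PART_SIZE_BYTES = 1024L * 1024L * 1024L;       // 1 GiB
    static final long INACTIVITY_MS = Duration.ofMinutes(10).toMillis();  // no writes for 10 min
    static final long MAX_AGE_MS = Duration.ofMinutes(120).toMillis();    // open for 120 min

    /** Hypothetical stand-in for the part-file state handed to a rolling policy. */
    static final class PartFile {
        final long creationTime;
        final long lastUpdateTime;
        final long size;

        PartFile(long creationTime, long lastUpdateTime, long size) {
            this.creationTime = creationTime;
            this.lastUpdateTime = lastUpdateTime;
            this.size = size;
        }
    }

    /** Size-based rule: roll once the part file exceeds the size threshold. */
    static boolean shouldRollOnEvent(PartFile file) {
        return file.size > MAX_PART_SIZE_BYTES;
    }

    /** Time-based rule: roll when the file has been idle or open for too long. */
    static boolean shouldRollOnProcessingTime(PartFile file, long now) {
        return now - file.lastUpdateTime > INACTIVITY_MS
                || now - file.creationTime > MAX_AGE_MS;
    }

    /** On-checkpoint variant: always roll when a checkpoint is taken. */
    static boolean shouldRollOnCheckpoint(PartFile file) {
        return true;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        // Created at t=0, last written at minute 5, 10 bytes so far.
        PartFile file = new PartFile(0L, 5 * minute, 10L);
        System.out.println(shouldRollOnProcessingTime(file, 14 * minute));
        System.out.println(shouldRollOnProcessingTime(file, 16 * minute));
    }
}
```

With a part file last written at minute 5, the time-based check stays false
at minute 14 and turns true at minute 16, since only then has the 10-minute
inactivity threshold been exceeded.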

Tim

On Sun, Feb 10, 2019, 9:13 AM Vishal Santoshi <vishal.santo...@gmail.com
wrote:

> Can StreamingFileSink be used instead of the BucketingSink described at
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/filesystem_sink.html ?
> It looks like it could.
>
>
> This code for example
>
>
>         StreamingFileSink
>                 .forRowFormat(new Path(PATH), new SimpleStringEncoder<KafkaRecord>())
>                 .withBucketAssigner(new KafkaRecordBucketAssigner(DEFAULT_FORMAT_STRING, ZONE_ID))
>                 .withRollingPolicy(new RollingPolicy<KafkaRecord, String>() {
>                     @Override
>                     public boolean shouldRollOnCheckpoint(PartFileInfo<String> partFileState) throws IOException {
>                         return false;
>                     }
>
>                     @Override
>                     public boolean shouldRollOnEvent(PartFileInfo<String> partFileState, KafkaRecord element) throws IOException {
>                         return partFileState.getSize() > 1024 * 1024 * 1024L;
>                     }
>
>                     @Override
>                     public boolean shouldRollOnProcessingTime(PartFileInfo<String> partFileState, long currentTime) throws IOException {
>                         return currentTime - partFileState.getLastUpdateTime() > 10 * 60 * 1000L
>                                 || currentTime - partFileState.getCreationTime() > 120 * 60 * 1000L;
>                     }
>                 })
>                 .build();
>
>
> There are a few things I see and am not sure I follow about the new
> StreamingFileSink vis-a-vis BucketingSink:
>
>
> 1. I never see the in-progress file move to the pending state (that is,
>    get renamed as pending), as was the case with BucketingSink. I would
>    assume that it would become pending and then be finalized on checkpoint
>    for exactly-once semantics?
>
>
> 2. I see dangling in-progress files at the end of the day. Since
>    withBucketCheckInterval is set to 1 minute by default, I would assume
>    that shouldRollOnProcessingTime should kick in?
>
> 3. The in-progress files are named like
>    .part-4-45.inprogress.3ed08b67-2b56-4d31-a233-b1c146cfcf14 . What is
>    that additional suffix?
>
>
>
>
> I have the following set up on the env
>
> env.enableCheckpointing(10 * 60000);
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> env.setRestartStrategy(fixedDelayRestart(4, org.apache.flink.api.common.time.Time.minutes(1)));
> StateBackend stateBackEnd = new MemoryStateBackend();
> env.setStateBackend(stateBackEnd);
>
>
> Regards.
>
>
>
>
>
