That said, in the DefaultRollingPolicy it seems the shouldRollOnCheckpoint()
check is on the file size (it mimics the check in shouldRollOnEvent()).
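Here is a minimal plain-Java sketch of that size check. It assumes, based only on my reading above, that DefaultRollingPolicy's shouldRollOnCheckpoint() applies the same size predicate as shouldRollOnEvent(). PartFileStub and SizeOnlyPolicy are hypothetical stand-ins, not Flink classes, so the sketch runs without Flink on the classpath.

```java
// Hypothetical stand-in for Flink's PartFileInfo, reduced to the one field this sketch needs.
final class PartFileStub {
    final long sizeBytes;

    PartFileStub(long sizeBytes) {
        this.sizeBytes = sizeBytes;
    }
}

// Hypothetical size-driven policy: the checkpoint hook reuses the event hook's size check,
// which is how I read DefaultRollingPolicy (an assumption, not a copy of the Flink source).
final class SizeOnlyPolicy {
    private final long maxPartSizeBytes;

    SizeOnlyPolicy(long maxPartSizeBytes) {
        this.maxPartSizeBytes = maxPartSizeBytes;
    }

    boolean shouldRollOnEvent(PartFileStub part) {
        return part.sizeBytes > maxPartSizeBytes;
    }

    boolean shouldRollOnCheckpoint(PartFileStub part) {
        // Same predicate as shouldRollOnEvent: a checkpoint alone does not force a roll.
        return shouldRollOnEvent(part);
    }

    public static void main(String[] args) {
        SizeOnlyPolicy policy = new SizeOnlyPolicy(128L * 1024 * 1024); // example limit: 128 MiB
        System.out.println(policy.shouldRollOnCheckpoint(new PartFileStub(1024)));               // false
        System.out.println(policy.shouldRollOnCheckpoint(new PartFileStub(200L * 1024 * 1024))); // true
    }
}
```

Under that assumption, a checkpoint on its own only closes a part file that is already over the size limit.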

I guess the questions are:

Is the call to shouldRollOnCheckpoint() made by the checkpointing thread?

Are the calls to the other two methods, shouldRollOnEvent() and
shouldRollOnProcessingTime(), made on the execution thread, as in inlined?





On Sun, Feb 10, 2019 at 1:17 PM Vishal Santoshi <vishal.santo...@gmail.com>
wrote:

> Thanks for the quick reply.
>
> I am confused. If this were a more full-featured BucketingSink, I would
> imagine that, based on shouldRollOnEvent and shouldRollOnProcessingTime, an
> in-progress file could go into the pending phase, and on checkpoint the
> pending part file would be finalized. For exactly-once, any in-progress file
> would have its length snapshotted to the checkpoint, and on resume from that
> checkpoint the length would be used to truncate the file (if truncate is
> supported) or written out as a separate part-length file (if truncate is not
> supported), to indicate which part of the file (finalized on resume) was
> valid. I had always assumed (and there is no doc saying otherwise) that
> shouldRollOnCheckpoint would work like the other two, apart from the fact
> that it does the roll and finalize in a single step on a checkpoint.
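A minimal sketch of the "snapshot the length, truncate on resume" idea described above. It uses plain java.nio rather than Flink's or Hadoop's file system APIs. ValidLengthRecovery and its methods are hypothetical names. The fallback for filesystems without truncate (writing the valid length to a separate file) is only mentioned in a comment, not implemented.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper: snapshot an in-progress file's length at checkpoint time and
// cut the file back to that length when resuming. Plain java.nio, not Flink or Hadoop APIs.
final class ValidLengthRecovery {

    // At checkpoint time: the bytes written so far are the "valid" prefix.
    static long snapshotLength(Path file) throws IOException {
        return Files.size(file);
    }

    // On resume: discard everything written after the snapshot.
    // Where truncate is unavailable, the alternative described above is to keep the
    // file as-is and record validLength in a separate length file (not shown here).
    static void restore(Path file, long validLength) throws IOException {
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.WRITE)) {
            channel.truncate(validLength);
        }
    }

    static void append(Path file, String text) throws IOException {
        Files.write(file, text.getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("part-0-0", ".inprogress");
        append(file, "record-1\nrecord-2\n");
        long valid = snapshotLength(file); // checkpoint taken here
        append(file, "record-3\n");        // written after the checkpoint, then the job fails
        restore(file, valid);              // resume from the checkpoint
        // prints record-1 and record-2 only
        System.out.print(new String(Files.readAllBytes(file), StandardCharsets.UTF_8));
        Files.delete(file);
    }
}
```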
>
>
> Am I better off using BucketingSink? When to use BucketingSink and when to
> use RollingSink is not clear at all, even though on the surface it sure
> looks like RollingSink is a better version of BucketingSink (or not).
>
> Regards.
>
>
>
> On Sun, Feb 10, 2019 at 12:09 PM Timothy Victor <vict...@gmail.com> wrote:
>
>> I think the only rolling policy that can be used is
>> CheckpointRollingPolicy to ensure exactly once.
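A small simulation of what a checkpoint-only policy means in practice. SimpleRollingPolicy is a hypothetical, simplified stand-in for the three RollingPolicy hooks (no bucket id, no PartFileInfo, no IOException). RollOnCheckpointOnly is an illustration of the idea, not the Flink class Tim mentions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for the three hooks of a rolling policy.
interface SimpleRollingPolicy {
    boolean shouldRollOnCheckpoint(long partSize);
    boolean shouldRollOnEvent(long partSize);
    boolean shouldRollOnProcessingTime(long partSize, long ageMillis);
}

// Checkpoint-only rolling: the open part file is closed at every checkpoint and at no other time.
final class RollOnCheckpointOnly implements SimpleRollingPolicy {
    public boolean shouldRollOnCheckpoint(long partSize) { return true; }
    public boolean shouldRollOnEvent(long partSize) { return false; }
    public boolean shouldRollOnProcessingTime(long partSize, long ageMillis) { return false; }
}

final class RollingDemo {
    // Replays a script of 'e' (write one 1-byte record) and 'c' (checkpoint)
    // and returns the sizes of the part files closed along the way.
    static List<Long> closedParts(SimpleRollingPolicy policy, String script) {
        List<Long> closed = new ArrayList<>();
        long open = 0;
        for (char op : script.toCharArray()) {
            if (op == 'e') {
                // Ask the event hook before writing into an already open part.
                if (open > 0 && policy.shouldRollOnEvent(open)) {
                    closed.add(open);
                    open = 0;
                }
                open++;
            } else if (op == 'c' && open > 0 && policy.shouldRollOnCheckpoint(open)) {
                closed.add(open);
                open = 0;
            }
        }
        return closed;
    }

    public static void main(String[] args) {
        // Two records, checkpoint, one record, checkpoint, one record (left open).
        System.out.println(closedParts(new RollOnCheckpointOnly(), "eecece")); // [2, 1]
    }
}
```

With such a policy, each finished part file covers exactly the records written between two checkpoints.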
>>
>> Tim
>>
>> On Sun, Feb 10, 2019, 9:13 AM Vishal Santoshi <vishal.santo...@gmail.com>
>> wrote:
>>
>>> Can StreamingFileSink be used instead of the BucketingSink described in
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/filesystem_sink.html?
>>> It looks like it could.
>>>
>>>
>>> Take this code, for example:
>>>
>>>
>>>         StreamingFileSink
>>>                 .forRowFormat(new Path(PATH),
>>>                         new SimpleStringEncoder<KafkaRecord>())
>>>                 .withBucketAssigner(
>>>                         new KafkaRecordBucketAssigner(DEFAULT_FORMAT_STRING, ZONE_ID))
>>>                 .withRollingPolicy(new RollingPolicy<KafkaRecord, String>() {
>>>                     @Override
>>>                     public boolean shouldRollOnCheckpoint(
>>>                             PartFileInfo<String> partFileState) throws IOException {
>>>                         // never roll just because a checkpoint happens
>>>                         return false;
>>>                     }
>>>
>>>                     @Override
>>>                     public boolean shouldRollOnEvent(
>>>                             PartFileInfo<String> partFileState,
>>>                             KafkaRecord element) throws IOException {
>>>                         // roll once the part file exceeds 1 GiB
>>>                         return partFileState.getSize() > 1024L * 1024 * 1024;
>>>                     }
>>>
>>>                     @Override
>>>                     public boolean shouldRollOnProcessingTime(
>>>                             PartFileInfo<String> partFileState,
>>>                             long currentTime) throws IOException {
>>>                         // roll after more than 10 minutes without writes,
>>>                         // or once the part file is more than 2 hours old
>>>                         return currentTime - partFileState.getLastUpdateTime() > 10 * 60 * 1000L
>>>                                 || currentTime - partFileState.getCreationTime() > 120 * 60 * 1000L;
>>>                     }
>>>                 })
>>>                 .build();
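A plain-Java mirror of the thresholds in the policy above, useful for checking the arithmetic without running a job. CustomThresholds is a hypothetical helper; its long parameters stand in for PartFileInfo's getSize(), getLastUpdateTime() and getCreationTime(). Both comparisons are strict (>), so a file idle for exactly 10 minutes is not rolled until a later check.

```java
// Hypothetical helper mirroring the custom policy's thresholds without Flink types.
final class CustomThresholds {
    static final long MAX_SIZE = 1024L * 1024 * 1024; // 1 GiB
    static final long INACTIVITY = 10 * 60 * 1000L;   // 10 minutes in ms
    static final long MAX_AGE = 120 * 60 * 1000L;     // 2 hours in ms

    static boolean rollOnEvent(long size) {
        return size > MAX_SIZE;
    }

    static boolean rollOnProcessingTime(long now, long lastUpdate, long created) {
        return now - lastUpdate > INACTIVITY || now - created > MAX_AGE;
    }

    public static void main(String[] args) {
        long min = 60 * 1000L;
        // Created at t=0, last written at t=50 min, checked at t=60 min: idle exactly 10 min.
        System.out.println(rollOnProcessingTime(60 * min, 50 * min, 0)); // false
        // Same file checked one minute later: idle 11 min.
        System.out.println(rollOnProcessingTime(61 * min, 50 * min, 0)); // true
        // Busy file, written 1 ms ago, but 121 minutes old.
        System.out.println(rollOnProcessingTime(121 * min, 121 * min - 1, 0)); // true
    }
}
```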
>>>
>>>
>>> A few things I see and am not sure I follow about the new RollingFileSink
>>> vis-à-vis BucketingSink:
>>>
>>>
>>> 1. I never see the in-progress file go to the pending state (as in renamed
>>>    as pending), as was the case in BucketingSink. I would assume that it
>>>    would become pending and then be finalized on checkpoint for exactly-once
>>>    semantics?
>>>
>>>
>>> 2. I see dangling in-progress files at the end of the day. With
>>>    withBucketCheckInterval set to 1 minute by default, I would assume that
>>>    shouldRollOnProcessingTime should kick in?
>>>
>>> 3. The in-progress files are named like
>>>    .part-4-45.inprogress.3ed08b67-2b56-4d31-a233-b1c146cfcf14. What is that
>>>    additional suffix?
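A sketch of the in-progress, pending, finished lifecycle that point 1 assumes. It is roughly modeled on how BucketingSink tracks pending files per checkpoint and finalizes them in notifyCheckpointComplete. All class and method names here are hypothetical, and it does not claim to describe StreamingFileSink's internals.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of the assumed part-file lifecycle.
final class PartLifecycle {
    enum State { IN_PROGRESS, PENDING, FINISHED }

    static final class Part {
        final String name;
        State state = State.IN_PROGRESS;
        Part(String name) { this.name = name; }
    }

    // Pending parts grouped by the checkpoint that captured them.
    private final TreeMap<Long, List<Part>> pendingPerCheckpoint = new TreeMap<>();
    private final List<Part> pendingSinceLastCheckpoint = new ArrayList<>();

    // Rolling closes the in-progress part; it is not final yet.
    void roll(Part part) {
        part.state = State.PENDING;
        pendingSinceLastCheckpoint.add(part);
    }

    // Snapshot: remember which pending parts belong to this checkpoint.
    void snapshot(long checkpointId) {
        pendingPerCheckpoint.put(checkpointId, new ArrayList<>(pendingSinceLastCheckpoint));
        pendingSinceLastCheckpoint.clear();
    }

    // Only once a checkpoint is confirmed are its parts (and older ones) finalized.
    void notifyCheckpointComplete(long checkpointId) {
        Map<Long, List<Part>> done = pendingPerCheckpoint.headMap(checkpointId, true);
        for (List<Part> parts : done.values()) {
            for (Part p : parts) {
                p.state = State.FINISHED;
            }
        }
        done.clear(); // removes the entries from the backing map
    }

    public static void main(String[] args) {
        PartLifecycle sink = new PartLifecycle();
        Part p = new Part("part-4-45");
        sink.roll(p);
        System.out.println(p.state); // PENDING
        sink.snapshot(1);
        System.out.println(p.state); // PENDING
        sink.notifyCheckpointComplete(1);
        System.out.println(p.state); // FINISHED
    }
}
```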
>>>
>>>
>>>
>>>
>>> I have the following setup on the env:
>>>
>>> env.enableCheckpointing(10 * 60000);
>>> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>>> env.setRestartStrategy(fixedDelayRestart(4,
>>>         org.apache.flink.api.common.time.Time.minutes(1)));
>>> StateBackend stateBackEnd = new MemoryStateBackend();
>>> env.setStateBackend(stateBackEnd);
>>>
>>>
>>> Regards.