My apologies for not seeing your use case properly. The constraint on the
rolling policy applies only to bulk formats such as Parquet, as
highlighted in the docs.

As for your questions, I'll have to defer to others more familiar with it.
I mostly just use bulk formats such as Avro and Parquet.

Tim


On Sun, Feb 10, 2019, 12:40 PM Vishal Santoshi <vishal.santo...@gmail.com
wrote:

> That said, in the DefaultRollingPolicy it seems the check is on the
> file size (it mimics the check in shouldRollOnEvent()).
>
> I guess the question is
>
> Is the call to shouldRollOnCheckpoint done by the checkpointing thread?
>
> Are the calls to the other 2 methods, shouldRollOnEvent and
> shouldRollOnProcessingTime, done on the execution thread, as in inlined?
>
>
>
>
>
> On Sun, Feb 10, 2019 at 1:17 PM Vishal Santoshi <vishal.santo...@gmail.com>
> wrote:
>
>> Thanks for the quick reply.
>>
>> I am confused. If this were a more full-featured BucketingSink, I would
>> imagine that, based on shouldRollOnEvent and shouldRollOnProcessingTime,
>> an in-progress file could go into the pending phase, and on checkpoint
>> the pending part file would be finalized. For exactly-once, any
>> in-progress file would have its length snapshotted to the checkpoint.
>> If a resume from a checkpoint were to happen, that length would be used
>> to truncate the file (if truncate is supported), or would be written out
>> as a part-length file (if truncate is not supported), to indicate what
>> part of the finalized file (finalized when resumed) was valid. I had
>> always assumed (and there is no doc saying otherwise) that
>> shouldRollOnCheckpoint would be similar to the other 2, apart from the
>> fact that it does the roll and the finalize in a single step on a
>> checkpoint.
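
To make the lifecycle assumed in the paragraph above concrete, here is a minimal plain-Java sketch of it: a part file starts in-progress, moves to pending when a roll is triggered, and is finalized when a checkpoint completes. This is only a model of the assumption as stated, not Flink code. The class and method names (PartFileLifecycleSketch, roll, onCheckpointComplete) are hypothetical, and the truncate / part-length handling on restore is left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the assumed part-file lifecycle; not the Flink implementation. */
final class PartFileLifecycleSketch {

    enum State { IN_PROGRESS, PENDING, FINISHED }

    /** Hypothetical stand-in for a part file tracked by the sink. */
    static final class PartFile {
        final String name;
        State state = State.IN_PROGRESS;

        PartFile(String name) {
            this.name = name;
        }
    }

    private final List<PartFile> files = new ArrayList<>();

    /** Opens a new in-progress part file. */
    PartFile open(String name) {
        PartFile file = new PartFile(name);
        files.add(file);
        return file;
    }

    /** A rolling check fired: the in-progress file is closed and waits for a checkpoint. */
    void roll(PartFile file) {
        if (file.state == State.IN_PROGRESS) {
            file.state = State.PENDING;
        }
    }

    /** A checkpoint completed: every pending file is finalized; in-progress files stay open. */
    void onCheckpointComplete() {
        for (PartFile file : files) {
            if (file.state == State.PENDING) {
                file.state = State.FINISHED;
            }
        }
    }

    public static void main(String[] args) {
        PartFileLifecycleSketch sink = new PartFileLifecycleSketch();
        PartFile rolled = sink.open("part-0-0");
        PartFile open = sink.open("part-0-1");

        sink.roll(rolled);            // rolled: IN_PROGRESS -> PENDING
        sink.onCheckpointComplete();  // rolled: PENDING -> FINISHED

        System.out.println(rolled.name + " " + rolled.state);
        System.out.println(open.name + " " + open.state);
    }
}
```

In this model a file that never rolls stays in-progress across checkpoints, which is the situation the length-snapshot and truncate step described above is meant to cover.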
>>
>>
>> Am I better off using BucketingSink? When to use BucketingSink and when
>> to use RollingSink is not clear at all, even though on the surface it sure
>> looks like RollingSink is a better version of BucketingSink (or not).
>>
>> Regards.
>>
>>
>>
>> On Sun, Feb 10, 2019 at 12:09 PM Timothy Victor <vict...@gmail.com>
>> wrote:
>>
>>> I think the only rolling policy that can be used is
>>> CheckpointRollingPolicy to ensure exactly once.
>>>
>>> Tim
>>>
>>> On Sun, Feb 10, 2019, 9:13 AM Vishal Santoshi <vishal.santo...@gmail.com
>>> wrote:
>>>
>>>> Can StreamingFileSink be used instead of
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/filesystem_sink.html ?
>>>> It looks like it could.
>>>>
>>>>
>>>> This code for example
>>>>
>>>>
>>>>         StreamingFileSink
>>>>                 .forRowFormat(new Path(PATH),
>>>>                         new SimpleStringEncoder<KafkaRecord>())
>>>>                 .withBucketAssigner(
>>>>                         new KafkaRecordBucketAssigner(DEFAULT_FORMAT_STRING, ZONE_ID))
>>>>                 .withRollingPolicy(new RollingPolicy<KafkaRecord, String>() {
>>>>                     @Override
>>>>                     public boolean shouldRollOnCheckpoint(
>>>>                             PartFileInfo<String> partFileState) throws IOException {
>>>>                         return false;
>>>>                     }
>>>>
>>>>                     @Override
>>>>                     public boolean shouldRollOnEvent(
>>>>                             PartFileInfo<String> partFileState,
>>>>                             KafkaRecord element) throws IOException {
>>>>                         return partFileState.getSize() > 1024 * 1024 * 1024L;
>>>>                     }
>>>>
>>>>                     @Override
>>>>                     public boolean shouldRollOnProcessingTime(
>>>>                             PartFileInfo<String> partFileState,
>>>>                             long currentTime) throws IOException {
>>>>                         return currentTime - partFileState.getLastUpdateTime() > 10 * 60 * 1000L
>>>>                                 || currentTime - partFileState.getCreationTime() > 120 * 60 * 1000L;
>>>>                     }
>>>>                 })
>>>>                 .build();
>>>>
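
For reference, the three checks in the policy quoted above can be exercised without a cluster. The sketch below is plain Java with no Flink dependency: PartFile is a hypothetical stand-in for PartFileInfo, and only the thresholds (1 GiB, 10 minutes of inactivity, 120 minutes of age) are taken from the quoted code. It says nothing about which thread calls each method or how often the processing-time check actually runs.

```java
import java.time.Duration;

/** Plain-Java model of the three rolling checks; not the Flink API. */
final class RollingChecksSketch {

    // Thresholds copied from the quoted policy.
    static final long MAX_PART_SIZE_BYTES = 1024L * 1024L * 1024L;        // 1 GiB
    static final long INACTIVITY_MS = Duration.ofMinutes(10).toMillis();  // no writes for 10 min
    static final long MAX_AGE_MS = Duration.ofMinutes(120).toMillis();    // open for 120 min

    /** Hypothetical stand-in for the part-file state that a policy inspects. */
    static final class PartFile {
        final long sizeBytes;
        final long creationTimeMs;
        final long lastUpdateTimeMs;

        PartFile(long sizeBytes, long creationTimeMs, long lastUpdateTimeMs) {
            this.sizeBytes = sizeBytes;
            this.creationTimeMs = creationTimeMs;
            this.lastUpdateTimeMs = lastUpdateTimeMs;
        }
    }

    /** The quoted policy never rolls just because a checkpoint happens. */
    static boolean shouldRollOnCheckpoint(PartFile file) {
        return false;
    }

    /** Rolls once the part file has grown past the size threshold. */
    static boolean shouldRollOnEvent(PartFile file) {
        return file.sizeBytes > MAX_PART_SIZE_BYTES;
    }

    /** Rolls when the file has been idle too long or has been open too long. */
    static boolean shouldRollOnProcessingTime(PartFile file, long nowMs) {
        return nowMs - file.lastUpdateTimeMs > INACTIVITY_MS
                || nowMs - file.creationTimeMs > MAX_AGE_MS;
    }

    public static void main(String[] args) {
        long now = Duration.ofMinutes(11).toMillis();

        PartFile idle = new PartFile(512L, 0L, 0L);
        PartFile active = new PartFile(512L, 0L, Duration.ofMinutes(10).toMillis());
        PartFile large = new PartFile(2L * MAX_PART_SIZE_BYTES, 0L, now);

        System.out.println(shouldRollOnProcessingTime(idle, now));   // true: idle for 11 min
        System.out.println(shouldRollOnProcessingTime(active, now)); // false: written 1 min ago
        System.out.println(shouldRollOnEvent(large));                // true: above 1 GiB
    }
}
```

With these thresholds, a file that stopped receiving records should satisfy the processing-time check after 10 minutes, so leftover in-progress files would suggest the check is either not being invoked or its result is not leading to a visible rename. That is a guess from the model, not something confirmed here.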
>>>>
>>>> A few things I see and am not sure I follow about the new
>>>> StreamingFileSink vis-a-vis BucketingSink:
>>>>
>>>>
>>>> 1. I do not ever see the in-progress file go to the pending state, as in
>>>> renamed as pending, as was the case in BucketingSink. I would assume
>>>> that it would be pending and then finalized on checkpoint for
>>>> exactly-once semantics?
>>>>
>>>>
>>>> 2. I see dangling in-progress files at the end of the day. I would assume
>>>> that, with withBucketCheckInterval set to 1 minute by default,
>>>> shouldRollOnProcessingTime should kick in?
>>>>
>>>> 3. The in-progress files are named like
>>>> .part-4-45.inprogress.3ed08b67-2b56-4d31-a233-b1c146cfcf14 . What is that
>>>> additional suffix?
>>>>
>>>>
>>>>
>>>>
>>>> I have the following set up on the env
>>>>
>>>> env.enableCheckpointing(10 * 60000);
>>>> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>>>> env.setRestartStrategy(fixedDelayRestart(4, 
>>>> org.apache.flink.api.common.time.Time.minutes(1)));
>>>> StateBackend stateBackEnd = new MemoryStateBackend();
>>>> env.setStateBackend(stateBackEnd);
>>>>
>>>>
>>>> Regards.
>>>>
>>>>
>>>>
>>>>
>>>>
