You don't have to. Thank you for the input.

On Sun, Feb 10, 2019 at 1:56 PM Timothy Victor <vict...@gmail.com> wrote:

> My apologies for not seeing your use case properly. The constraint on the
> rolling policy applies only to bulk formats such as Parquet, as
> highlighted in the docs.
>
> As for your questions, I'll have to defer to others more familiar with
> it. I mostly just use bulk formats such as Avro and Parquet.
>
> Tim
>
>
> On Sun, Feb 10, 2019, 12:40 PM Vishal Santoshi <vishal.santo...@gmail.com
> wrote:
>
>> That said, in the DefaultRollingPolicy it seems the check is on the
>> file size (mimicking the check in shouldRollOnEvent()).
>>
>> I guess the questions are:
>>
>> Is the call to shouldRollOnCheckpoint done by the checkpointing thread?
>>
>> Are the calls to the other 2 methods, shouldRollOnEvent and
>> shouldRollOnProcessingTime, done on the execution thread, as in inlined?
>>
>>
>>
>>
>>
>> On Sun, Feb 10, 2019 at 1:17 PM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> Thanks for the quick reply.
>>>
>>> I am confused. If this were a more full-featured BucketingSink, I would
>>> imagine that, based on shouldRollOnEvent and shouldRollOnProcessingTime,
>>> an in-progress file could go into the pending phase, and on checkpoint the
>>> pending part file would be finalized. For exactly once, any in-progress
>>> file would have its length snapshotted to the checkpoint. If a resume
>>> from a checkpoint were to happen, that length would be used either to
>>> truncate the file (if truncate is supported) or to drop a part-length
>>> file next to it (if truncate is not supported), to indicate what part of
>>> the finalized file (finalized when resumed) was valid. I had always
>>> assumed (and there is no doc saying otherwise) that shouldRollOnCheckpoint
>>> would be similar to the other 2, apart from the fact that it does the
>>> roll and the finalize in a single step on a checkpoint.
>>>
>>>
>>> Am I better off using BucketingSink? When to use BucketingSink and
>>> when to use StreamingFileSink is not clear at all, even though on the
>>> surface it sure looks like StreamingFileSink is a better version of
>>> BucketingSink (or not).
>>>
>>> Regards.
>>>
>>>
>>>
>>> On Sun, Feb 10, 2019 at 12:09 PM Timothy Victor <vict...@gmail.com>
>>> wrote:
>>>
>>>> I think the only rolling policy that can be used is
>>>> CheckpointRollingPolicy to ensure exactly once.
>>>>
>>>> Tim
>>>>
>>>> On Sun, Feb 10, 2019, 9:13 AM Vishal Santoshi <
>>>> vishal.santo...@gmail.com wrote:
>>>>
>>>>> Can StreamingFileSink be used instead of the sink documented at the
>>>>> link below? It looks like it could.
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/filesystem_sink.html
>>>>>
>>>>>
>>>>> This code for example
>>>>>
>>>>>
>>>>>         StreamingFileSink
>>>>>                 .forRowFormat(new Path(PATH),
>>>>>                         new SimpleStringEncoder<KafkaRecord>())
>>>>>                 .withBucketAssigner(new KafkaRecordBucketAssigner(DEFAULT_FORMAT_STRING, ZONE_ID))
>>>>>                 .withRollingPolicy(new RollingPolicy<KafkaRecord, String>() {
>>>>>                     @Override
>>>>>                     public boolean shouldRollOnCheckpoint(PartFileInfo<String> partFileState) throws IOException {
>>>>>                         return false;
>>>>>                     }
>>>>>
>>>>>                     @Override
>>>>>                     public boolean shouldRollOnEvent(PartFileInfo<String> partFileState,
>>>>>                                                      KafkaRecord element) throws IOException {
>>>>>                         return partFileState.getSize() > 1024 * 1024 * 1024L;
>>>>>                     }
>>>>>
>>>>>                     @Override
>>>>>                     public boolean shouldRollOnProcessingTime(PartFileInfo<String> partFileState,
>>>>>                                                               long currentTime) throws IOException {
>>>>>                         return currentTime - partFileState.getLastUpdateTime() > 10 * 60 * 1000L
>>>>>                                 || currentTime - partFileState.getCreationTime() > 120 * 60 * 1000L;
>>>>>                     }
>>>>>                 })
>>>>>                 .build();
>>>>>
>>>>>
>>>>> A few things I see and am not sure I follow about the new
>>>>> StreamingFileSink vis-a-vis BucketingSink:
>>>>>
>>>>>
>>>>> 1. I never see the in-progress file go to the pending state, as in
>>>>> renamed as pending, as was the case in BucketingSink. I would assume
>>>>> that it would be pending and then finalized on checkpoint for
>>>>> exactly-once semantics?
>>>>>
>>>>>
>>>>> 2. I see dangling in-progress files at the end of the day. I would assume
>>>>> that, with withBucketCheckInterval set to 1 minute by default,
>>>>> shouldRollOnProcessingTime should kick in?
>>>>>
>>>>> 3. The in-progress files are named like
>>>>> .part-4-45.inprogress.3ed08b67-2b56-4d31-a233-b1c146cfcf14
>>>>> What is that additional suffix?
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> I have the following set up on the env
>>>>>
>>>>> env.enableCheckpointing(10 * 60000);
>>>>> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>>>>> env.setRestartStrategy(fixedDelayRestart(4, org.apache.flink.api.common.time.Time.minutes(1)));
>>>>> StateBackend stateBackEnd = new MemoryStateBackend();
>>>>> env.setStateBackend(stateBackEnd);
>>>>>
>>>>>
>>>>> Regards.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
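
For readers following the thread, the three decisions made by the custom
rolling policy quoted above can be exercised without Flink on the classpath.
The sketch below is only an illustration under stated assumptions: PartInfo
and RollingPolicySketch are hypothetical stand-ins, not Flink classes, and
they copy the thresholds from the quoted code (1 GiB part size, 10 minutes
of inactivity, 120 minutes since creation). It says nothing about which
thread Flink calls these methods on.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the part-file state; NOT Flink's PartFileInfo. */
final class PartInfo {
    final long sizeBytes;
    final long creationTimeMs;
    final long lastUpdateTimeMs;

    PartInfo(long sizeBytes, long creationTimeMs, long lastUpdateTimeMs) {
        this.sizeBytes = sizeBytes;
        this.creationTimeMs = creationTimeMs;
        this.lastUpdateTimeMs = lastUpdateTimeMs;
    }
}

/** Mirrors the three checks of the quoted policy as plain static methods. */
final class RollingPolicySketch {
    // Thresholds copied from the quoted code.
    static final long MAX_PART_SIZE_BYTES = 1024L * 1024L * 1024L;
    static final long INACTIVITY_MS = TimeUnit.MINUTES.toMillis(10);
    static final long ROLLOVER_MS = TimeUnit.MINUTES.toMillis(120);

    // The quoted policy never rolls just because a checkpoint happened.
    static boolean shouldRollOnCheckpoint(PartInfo part) {
        return false;
    }

    // Roll once the part file has grown past the size threshold.
    static boolean shouldRollOnEvent(PartInfo part) {
        return part.sizeBytes > MAX_PART_SIZE_BYTES;
    }

    // Roll when the part has been idle too long or has been open too long.
    static boolean shouldRollOnProcessingTime(PartInfo part, long nowMs) {
        return nowMs - part.lastUpdateTimeMs > INACTIVITY_MS
                || nowMs - part.creationTimeMs > ROLLOVER_MS;
    }

    public static void main(String[] args) {
        long now = TimeUnit.MINUTES.toMillis(200);
        // Small part, created 15 minutes ago, last written 11 minutes ago.
        PartInfo idle = new PartInfo(10L,
                now - TimeUnit.MINUTES.toMillis(15),
                now - TimeUnit.MINUTES.toMillis(11));
        System.out.println("roll on event: " + shouldRollOnEvent(idle));
        System.out.println("roll on processing time: "
                + shouldRollOnProcessingTime(idle, now));
        System.out.println("roll on checkpoint: " + shouldRollOnCheckpoint(idle));
    }
}
```

Under these assumptions, a part that is still receiving records and is
younger than 120 minutes is never rolled by the time check, whatever the
check interval is.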
