Awesome, thanks! Will open a new thread. But yes, the inprogress file was
helpful.

On Thu, Feb 14, 2019, 7:50 AM Kostas Kloudas <k.klou...@ververica.com> wrote:

> Hi Vishal,
>
> For the StreamingFileSink vs Rolling/BucketingSink:
>  - you can use the StreamingFileSink instead of the Rolling/BucketingSink.
> You can see the StreamingFileSink as an evolution of the previous two.
>
> In the StreamingFileSink the files in Pending state are not renamed, but
> they keep their "*in-progress*" name. This is the reason why you do not see
> .pending files anymore.
>
> What Timothy said about bulk formats is correct: they only support the
> "onCheckpoint" rolling policy.
>
> Now for the second issue, about deployment, I would recommend opening a new
> thread so that people can see from the title whether they can help or not.
> In addition, it is good for the community when the title indicates the
> content of the topic. The mailing list is indexed by search engines, so if
> someone has a similar question, the title will help them find the relevant
> thread.
>
> Cheers,
> Kostas
>
>
> On Thu, Feb 14, 2019 at 12:09 PM Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> Thanks Fabian,
>>
>> More questions:
>>
>> 1. On a k8s standalone job cluster I had
>> env.getCheckpointConfig().setFailOnCheckpointingErrors(true) // the default
>> The job failed on a checkpoint, and I would have imagined that under HA
>> the job would restore from the last checkpoint, but it did not (the UI
>> showed the job had restarted without a restore). The state was wiped out
>> and the job was relaunched with no state.
>>
>> 2. I still had the inprogress files from that failed instance, which is
>> consistent with no state having been restored.
>>
>> Thus there are a few questions:
>>
>> 1. On k8s with a standalone job cluster, have we tested the scenario of
>> the *container failing* (while the pod remains intact) followed by a
>> restore? In this case the pod remained up and running, but it was
>> definitely a clean relaunch of the container the pod was executing.
>>
>>
>> 2. Was I missing any configuration, given the setup below?
>>
>> StreamExecutionEnvironment env =
>>         StreamExecutionEnvironment.getExecutionEnvironment();
>> env.enableCheckpointing(30 * 60000);
>> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>> env.getCheckpointConfig().setFailOnCheckpointingErrors(false);
>> env.setRestartStrategy(
>>         fixedDelayRestart(4, org.apache.flink.api.common.time.Time.minutes(1)));
>> StateBackend stateBackEnd = new FsStateBackend(
>>         new org.apache.flink.core.fs.Path(
>>                 "........"));
>> env.setStateBackend(stateBackEnd);
>>
>>
>> 3. What is the nature of RollingFileSink? How does it enable exactly-once
>> semantics (or does it not)?
>>
>> Any help will be appreciated.
>>
>> Regards.
>>
>> On Mon, Feb 11, 2019 at 5:00 AM Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi Vishal,
>>>
>>> Kostas (in CC) should be able to help here.
>>>
>>> Best, Fabian
>>>
>>> On Mon, Feb 11, 2019 at 12:05 AM Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
>>>> Anyone?
>>>>
>>>> On Sun, Feb 10, 2019 at 2:07 PM Vishal Santoshi <
>>>> vishal.santo...@gmail.com> wrote:
>>>>
>>>>> You don't have to. Thank you for the input.
>>>>>
>>>>> On Sun, Feb 10, 2019 at 1:56 PM Timothy Victor <vict...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> My apologies for not seeing your use case properly. The constraint
>>>>>> on the rolling policy only applies to bulk formats such as Parquet, as
>>>>>> highlighted in the docs.
>>>>>>
>>>>>> As for your questions, I'll have to defer to others more familiar
>>>>>> with it. I mostly just use bulk formats such as Avro and Parquet.
>>>>>>
>>>>>> Tim
>>>>>>
>>>>>>
>>>>>> On Sun, Feb 10, 2019, 12:40 PM Vishal Santoshi <
>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>
>>>>>>> That said, in the DefaultRollingPolicy it seems the check is on
>>>>>>> the file size (it mimics the check in shouldRollOnEvent()).
>>>>>>>
>>>>>>> I guess the questions are:
>>>>>>>
>>>>>>> Is the call to shouldRollOnCheckpoint done by the checkpointing
>>>>>>> thread?
>>>>>>>
>>>>>>> Are the calls to the other two methods, shouldRollOnEvent and
>>>>>>> shouldRollOnProcessingTime, done on the execution thread, as in inlined?
>>>>>>>
>>>>>>> On Sun, Feb 10, 2019 at 1:17 PM Vishal Santoshi <
>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thanks for the quick reply.
>>>>>>>>
>>>>>>>> I am confused. If this were a more full-featured BucketingSink, I
>>>>>>>> would imagine that, based on shouldRollOnEvent and
>>>>>>>> shouldRollOnProcessingTime, an in-progress file could go into the
>>>>>>>> pending phase, and on checkpoint the pending part file would be
>>>>>>>> finalized. For exactly-once, the length of any in-progress file
>>>>>>>> would be snapshotted to the checkpoint. If a resume from a checkpoint
>>>>>>>> were to happen, that length would be used to truncate the file (if
>>>>>>>> truncate is supported) or dropped as a part-length file (if truncate
>>>>>>>> is not supported), to indicate what part of the finalized file
>>>>>>>> (finalized when resumed) was valid. I had always assumed (and no doc
>>>>>>>> says otherwise) that shouldRollOnCheckpoint would be similar to the
>>>>>>>> other two, apart from the fact that it does the roll and the finalize
>>>>>>>> in a single step on a checkpoint.
>>>>>>>>
>>>>>>>>
>>>>>>>> Am I better off using BucketingSink? When to use BucketingSink
>>>>>>>> and when to use RollingSink is not clear at all, even though on the
>>>>>>>> surface it sure looks like RollingSink is a better version of
>>>>>>>> BucketingSink (or not).
>>>>>>>>
>>>>>>>> Regards.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Sun, Feb 10, 2019 at 12:09 PM Timothy Victor <vict...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I think the only rolling policy that can be used is
>>>>>>>>> CheckpointRollingPolicy to ensure exactly once.
>>>>>>>>>
>>>>>>>>> Tim
>>>>>>>>>
>>>>>>>>> On Sun, Feb 10, 2019, 9:13 AM Vishal Santoshi <
>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Can StreamingFileSink be used instead of the sink described at
>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/filesystem_sink.html ?
>>>>>>>>>> It looks like it could.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> This code for example
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>         StreamingFileSink
>>>>>>>>>>                 .forRowFormat(new Path(PATH),
>>>>>>>>>>                         new SimpleStringEncoder<KafkaRecord>())
>>>>>>>>>>                 .withBucketAssigner(
>>>>>>>>>>                         new KafkaRecordBucketAssigner(DEFAULT_FORMAT_STRING, ZONE_ID))
>>>>>>>>>>                 .withRollingPolicy(new RollingPolicy<KafkaRecord, String>() {
>>>>>>>>>>                     @Override
>>>>>>>>>>                     public boolean shouldRollOnCheckpoint(
>>>>>>>>>>                             PartFileInfo<String> partFileState) throws IOException {
>>>>>>>>>>                         return false;
>>>>>>>>>>                     }
>>>>>>>>>>
>>>>>>>>>>                     @Override
>>>>>>>>>>                     public boolean shouldRollOnEvent(
>>>>>>>>>>                             PartFileInfo<String> partFileState,
>>>>>>>>>>                             KafkaRecord element) throws IOException {
>>>>>>>>>>                         return partFileState.getSize() > 1024 * 1024 * 1024L;
>>>>>>>>>>                     }
>>>>>>>>>>
>>>>>>>>>>                     @Override
>>>>>>>>>>                     public boolean shouldRollOnProcessingTime(
>>>>>>>>>>                             PartFileInfo<String> partFileState,
>>>>>>>>>>                             long currentTime) throws IOException {
>>>>>>>>>>                         return currentTime - partFileState.getLastUpdateTime() > 10 * 60 * 1000L
>>>>>>>>>>                                 || currentTime - partFileState.getCreationTime() > 120 * 60 * 1000L;
>>>>>>>>>>                     }
>>>>>>>>>>                 })
>>>>>>>>>>                 .build();
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> A few things I see, and am not sure I follow, about the new
>>>>>>>>>> RollingFileSink vis-a-vis BucketingSink:
>>>>>>>>>>
>>>>>>>>>> 1. I never see the inprogress file go to the pending state, as in
>>>>>>>>>> renamed as pending, as was the case in BucketingSink. I would assume
>>>>>>>>>> that it would be pending and then finalized on checkpoint for
>>>>>>>>>> exactly-once semantics?
>>>>>>>>>>
>>>>>>>>>> 2. I see dangling inprogress files at the end of the day. I would
>>>>>>>>>> assume that, with withBucketCheckInterval set to 1 minute by default,
>>>>>>>>>> shouldRollOnProcessingTime should kick in?
>>>>>>>>>>
>>>>>>>>>> 3. The inprogress files are named like
>>>>>>>>>> .part-4-45.inprogress.3ed08b67-2b56-4d31-a233-b1c146cfcf14. What is
>>>>>>>>>> that additional suffix?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> I have the following set up on the env
>>>>>>>>>>
>>>>>>>>>> env.enableCheckpointing(10 * 60000);
>>>>>>>>>> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>>>>>>>>>> env.setRestartStrategy(
>>>>>>>>>>         fixedDelayRestart(4, org.apache.flink.api.common.time.Time.minutes(1)));
>>>>>>>>>> StateBackend stateBackEnd = new MemoryStateBackend();
>>>>>>>>>> env.setStateBackend(stateBackEnd);
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Regards.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>

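For anyone reading the custom RollingPolicy quoted at the bottom of this thread, the three roll checks it implements can be sketched in plain Java without any Flink dependency. This is only an illustration under stated assumptions: `RollDecisionSketch` and `PartFile` are hypothetical stand-ins (the real interface works on Flink's `PartFileInfo`), and the thresholds are simply copied from the quoted code. It shows which condition fires when; it does not show how or on which thread Flink invokes each method.

```java
// Illustrative stand-in only: none of these types are Flink classes.
public class RollDecisionSketch {

    // Hypothetical snapshot of a part file, loosely mirroring the getters
    // the quoted policy reads (size, creation time, last update time).
    static final class PartFile {
        final long sizeBytes;
        final long creationTimeMs;
        final long lastUpdateTimeMs;

        PartFile(long sizeBytes, long creationTimeMs, long lastUpdateTimeMs) {
            this.sizeBytes = sizeBytes;
            this.creationTimeMs = creationTimeMs;
            this.lastUpdateTimeMs = lastUpdateTimeMs;
        }
    }

    // Thresholds copied from the quoted policy.
    static final long MAX_PART_SIZE_BYTES = 1024L * 1024L * 1024L; // 1 GiB
    static final long INACTIVITY_MS = 10L * 60L * 1000L;           // 10 minutes
    static final long MAX_AGE_MS = 120L * 60L * 1000L;             // 120 minutes

    // The quoted policy never rolls just because a checkpoint happened.
    static boolean shouldRollOnCheckpoint(PartFile file) {
        return false;
    }

    // Size check, evaluated when an element arrives.
    static boolean shouldRollOnEvent(PartFile file) {
        return file.sizeBytes > MAX_PART_SIZE_BYTES;
    }

    // Time check: roll if the file has been idle too long or is too old.
    static boolean shouldRollOnProcessingTime(PartFile file, long nowMs) {
        return nowMs - file.lastUpdateTimeMs > INACTIVITY_MS
                || nowMs - file.creationTimeMs > MAX_AGE_MS;
    }

    public static void main(String[] args) {
        long now = 200L * 60L * 1000L;
        // Small file, created 30 minutes ago, last written 11 minutes ago.
        PartFile idle = new PartFile(10L, now - 30L * 60_000L, now - 11L * 60_000L);
        // Small file, created 30 minutes ago, last written 1 minute ago.
        PartFile active = new PartFile(10L, now - 30L * 60_000L, now - 60_000L);

        System.out.println("idle rolls on time:   " + shouldRollOnProcessingTime(idle, now));
        System.out.println("active rolls on time: " + shouldRollOnProcessingTime(active, now));
        System.out.println("idle rolls on event:  " + shouldRollOnEvent(idle));
    }
}
```

With these thresholds, a file that keeps receiving records and stays under 1 GiB only rolls once it is more than 120 minutes old, which is one way in-progress files can still be around long after they were opened.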