Awesome, thanks! I will open a new thread. But yes, the in-progress file was helpful.
On Thu, Feb 14, 2019, 7:50 AM Kostas Kloudas <k.klou...@ververica.com> wrote:

> Hi Vishal,
>
> For the StreamingFileSink vs Rolling/BucketingSink:
> - you can use the StreamingFileSink instead of the Rolling/BucketingSink.
> You can see the StreamingFileSink as an evolution of the previous two.
>
> In the StreamingFileSink the files in Pending state are not renamed, but
> they keep their "*in-progress*" name. This is the reason why you do not see
> .pending files anymore.
>
> What Timothy said about bulk formats is correct. They only support the
> "onCheckpoint" rolling policy.
>
> As for the second issue, about deployment, I would recommend opening a new
> thread so that people can tell from the title whether they can help.
> In addition, it is good for the community to have a title that indicates
> the content of the topic. The mailing list is searchable by search engines,
> so if someone has a similar question, the title will help them find the
> relevant thread.
>
> Cheers,
> Kostas
>
>
> On Thu, Feb 14, 2019 at 12:09 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>
>> Thanks Fabian,
>>
>> More questions:
>>
>> 1. On a k8s standalone job I had
>> env.getCheckpointConfig().setFailOnCheckpointingErrors(true); // the default
>> The job failed on a checkpoint, and I would have imagined that under HA
>> the job would restore from the last checkpoint, but it did not. The UI
>> showed the job had restarted without a restore: the state was wiped out
>> and the job was relaunched with no state.
>>
>> 2. I had the in-progress files from that failed instance, which is
>> consistent with no restored state.
>>
>> Thus there are a few questions:
>>
>> 1. In k8s with a standalone job cluster, have we tested the scenario
>> of the *container failing* (the pod remained intact) and restore?
>> In this case the pod remained up and running, but it was definitely a
>> clean relaunch of the container the pod was executing.
>>
>> 2. Am I missing any configuration, given the below?
>>
>> StreamExecutionEnvironment env =
>>     StreamExecutionEnvironment.getExecutionEnvironment();
>> env.enableCheckpointing(30 * 60000);
>> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>> env.getCheckpointConfig().setFailOnCheckpointingErrors(false);
>> env.setRestartStrategy(fixedDelayRestart(4,
>>     org.apache.flink.api.common.time.Time.minutes(1)));
>> StateBackend stateBackEnd = new FsStateBackend(
>>     new org.apache.flink.core.fs.Path("........"));
>> env.setStateBackend(stateBackEnd);
>>
>> 3. What is the nature of RollingFileSink? How does it enable exactly-once
>> semantics (or does it not)?
>>
>> Any help will be appreciated.
>>
>> Regards.
>>
>> On Mon, Feb 11, 2019 at 5:00 AM Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi Vishal,
>>>
>>> Kostas (in CC) should be able to help here.
>>>
>>> Best, Fabian
>>>
>>> On Mon, Feb 11, 2019 at 00:05, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>
>>>> Anyone?
>>>>
>>>> On Sun, Feb 10, 2019 at 2:07 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>
>>>>> You don't have to. Thank you for the input.
>>>>>
>>>>> On Sun, Feb 10, 2019 at 1:56 PM Timothy Victor <vict...@gmail.com> wrote:
>>>>>
>>>>>> My apologies for not seeing your use case properly. The constraint
>>>>>> on the rolling policy only applies to bulk formats such as Parquet, as
>>>>>> highlighted in the docs.
>>>>>>
>>>>>> As for your questions, I'll have to defer to others more familiar
>>>>>> with it. I mostly just use bulk formats such as Avro and Parquet.
>>>>>>
>>>>>> Tim
>>>>>>
>>>>>> On Sun, Feb 10, 2019, 12:40 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>
>>>>>>> That said, in the DefaultRollingPolicy it seems the check is on
>>>>>>> the file size (it mimics the check in shouldRollOnEvent()).
>>>>>>>
>>>>>>> I guess the questions are:
>>>>>>>
>>>>>>> Is the call to shouldRollOnCheckpoint done by the checkpointing
>>>>>>> thread?
>>>>>>>
>>>>>>> Are the calls to the other two methods, shouldRollOnEvent and
>>>>>>> shouldRollOnProcessingTime, done on the execution thread, as in inlined?
>>>>>>>
>>>>>>> On Sun, Feb 10, 2019 at 1:17 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thanks for the quick reply.
>>>>>>>>
>>>>>>>> I am confused. If this was a more full-featured BucketingSink, I
>>>>>>>> would imagine that, based on shouldRollOnEvent and
>>>>>>>> shouldRollOnProcessingTime, an in-progress file could go into the
>>>>>>>> pending phase, and on checkpoint the pending part file would be
>>>>>>>> finalized. For exactly-once, any in-progress file would have its
>>>>>>>> length snapshotted to the checkpoint. If a resume from a checkpoint
>>>>>>>> were to happen, that length would be used to truncate the file (if
>>>>>>>> truncate is supported), or written out as a part-length file (if
>>>>>>>> truncate is not supported), to indicate which part of the finalized
>>>>>>>> file (finalized when resumed) was valid. And I had always assumed
>>>>>>>> (and there is no doc otherwise) that shouldRollOnCheckpoint would be
>>>>>>>> similar to the other two, apart from the fact that it does the roll
>>>>>>>> and finalize steps as a single step on a checkpoint.
>>>>>>>>
>>>>>>>> Am I better off using BucketingSink? When to use BucketingSink
>>>>>>>> and when to use RollingSink is not clear at all, even though on the
>>>>>>>> surface it sure looks like RollingSink is a better version of
>>>>>>>> BucketingSink (or not).
>>>>>>>>
>>>>>>>> Regards.
>>>>>>>>
>>>>>>>> On Sun, Feb 10, 2019 at 12:09 PM Timothy Victor <vict...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I think the only rolling policy that can be used to ensure
>>>>>>>>> exactly-once is CheckpointRollingPolicy.
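The BucketingSink-style scheme Vishal describes (rolled files wait as pending until a checkpoint completes; on resume, the in-progress file is cut back to a length recorded in the checkpoint) can be made concrete with a small model. The sketch below is a toy, in-memory Java class with hypothetical names (`ValidLengthModel`, `Snapshot`); it is not Flink's implementation and ignores buckets, parallelism and the real file system. It also assumes no roll happens between the restored checkpoint and the failure.

```java
import java.util.ArrayList;
import java.util.List;

// Toy, in-memory model of the exactly-once scheme described in the thread.
// All names are hypothetical; this is NOT Flink code.
//  - records are appended to an "in-progress" file;
//  - a rolled file becomes "pending" and is finalized only when a checkpoint
//    that covers it completes;
//  - on restore, the in-progress file is cut back to the length stored in the
//    checkpoint, so records replayed by the source are not duplicated.
public class ValidLengthModel {

    // What a checkpoint remembers about the sink.
    static final class Snapshot {
        final int validLength; // in-progress bytes covered by the checkpoint
        final int rolledUpTo;  // number of files rolled before the checkpoint
        Snapshot(int validLength, int rolledUpTo) {
            this.validLength = validLength;
            this.rolledUpTo = rolledUpTo;
        }
    }

    final StringBuilder inProgress = new StringBuilder(); // stands in for the file on disk
    final List<String> pending = new ArrayList<>();       // rolled, not yet final (FIFO)
    final List<String> finished = new ArrayList<>();      // visible to readers
    private int rolled = 0;                                // files rolled so far

    void write(String record) {
        inProgress.append(record);
    }

    // Close the current in-progress file; it becomes pending, not final.
    void roll() {
        if (inProgress.length() > 0) {
            pending.add(inProgress.toString());
            inProgress.setLength(0);
            rolled++;
        }
    }

    // Checkpoint barrier: record how much data this checkpoint covers.
    Snapshot snapshot() {
        return new Snapshot(inProgress.length(), rolled);
    }

    // Finalize every pending file rolled before the completed checkpoint.
    // Idempotent: calling it again for the same snapshot does nothing.
    void notifyCheckpointComplete(Snapshot s) {
        while (finished.size() < s.rolledUpTo) {
            finished.add(pending.remove(0));
        }
    }

    // Simulated restart from the last completed checkpoint s.
    // Simplification: assumes no roll happened between s and the failure.
    void restore(Snapshot s) {
        notifyCheckpointComplete(s); // s completed, so its rolled files are final
        if (inProgress.length() < s.validLength) {
            throw new IllegalStateException("in-progress file shorter than valid length");
        }
        inProgress.setLength(s.validLength); // drop bytes written after s
        pending.clear();                     // empty under the assumption above
        rolled = s.rolledUpTo;
    }

    public static void main(String[] args) {
        ValidLengthModel sink = new ValidLengthModel();
        sink.write("a");
        sink.write("b");
        sink.roll();                   // "ab" is pending
        sink.write("c");
        Snapshot s1 = sink.snapshot(); // covers "ab" and 1 in-progress byte
        sink.notifyCheckpointComplete(s1);
        sink.write("d");               // not covered by s1
        sink.restore(s1);              // failure, restart from s1
        sink.write("d");               // the source replays "d"
        System.out.println(sink.finished + " " + sink.inProgress); // prints "[ab] cd"
    }
}
```

The point of the model is that nothing becomes visible before a checkpoint completes, and the recorded length lets a restart discard exactly the bytes the source will replay.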
>>>>>>>>>
>>>>>>>>> Tim
>>>>>>>>>
>>>>>>>>> On Sun, Feb 10, 2019, 9:13 AM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Can StreamingFileSink be used instead of the sink described at
>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/filesystem_sink.html?
>>>>>>>>>> It looks like it could.
>>>>>>>>>>
>>>>>>>>>> This code, for example:
>>>>>>>>>>
>>>>>>>>>> StreamingFileSink
>>>>>>>>>>     .forRowFormat(new Path(PATH), new SimpleStringEncoder<KafkaRecord>())
>>>>>>>>>>     .withBucketAssigner(new KafkaRecordBucketAssigner(DEFAULT_FORMAT_STRING, ZONE_ID))
>>>>>>>>>>     .withRollingPolicy(new RollingPolicy<KafkaRecord, String>() {
>>>>>>>>>>         // Never roll just because a checkpoint is taken.
>>>>>>>>>>         @Override
>>>>>>>>>>         public boolean shouldRollOnCheckpoint(PartFileInfo<String> partFileState) throws IOException {
>>>>>>>>>>             return false;
>>>>>>>>>>         }
>>>>>>>>>>
>>>>>>>>>>         // Roll once the part file exceeds 1 GiB.
>>>>>>>>>>         @Override
>>>>>>>>>>         public boolean shouldRollOnEvent(PartFileInfo<String> partFileState,
>>>>>>>>>>                                          KafkaRecord element) throws IOException {
>>>>>>>>>>             return partFileState.getSize() > 1024L * 1024L * 1024L;
>>>>>>>>>>         }
>>>>>>>>>>
>>>>>>>>>>         // Roll after 10 minutes without writes, or once the file is older than 120 minutes.
>>>>>>>>>>         @Override
>>>>>>>>>>         public boolean shouldRollOnProcessingTime(PartFileInfo<String> partFileState,
>>>>>>>>>>                                                   long currentTime) throws IOException {
>>>>>>>>>>             return currentTime - partFileState.getLastUpdateTime() > 10 * 60 * 1000L
>>>>>>>>>>                 || currentTime - partFileState.getCreationTime() > 120 * 60 * 1000L;
>>>>>>>>>>         }
>>>>>>>>>>     })
>>>>>>>>>>     .build();
>>>>>>>>>>
>>>>>>>>>> A few things I see about the new RollingFileSink compared with
>>>>>>>>>> BucketingSink that I am not sure I follow:
>>>>>>>>>>
>>>>>>>>>> 1. I never see the in-progress file go to the pending state, as in
>>>>>>>>>> renamed as pending, as was the case in BucketingSink. I would
>>>>>>>>>> assume that it would be pending and then finalized on checkpoint
>>>>>>>>>> for exactly-once semantics?
>>>>>>>>>>
>>>>>>>>>> 2. I see dangling in-progress files at the end of the day. I would
>>>>>>>>>> assume that, with withBucketCheckInterval set to 1 minute by
>>>>>>>>>> default, shouldRollOnProcessingTime should kick in?
>>>>>>>>>>
>>>>>>>>>> 3. The in-progress files look like
>>>>>>>>>> .part-4-45.inprogress.3ed08b67-2b56-4d31-a233-b1c146cfcf14. What is
>>>>>>>>>> that additional suffix?
>>>>>>>>>>
>>>>>>>>>> I have the following setup on the env:
>>>>>>>>>>
>>>>>>>>>> env.enableCheckpointing(10 * 60000);
>>>>>>>>>> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>>>>>>>>>> env.setRestartStrategy(fixedDelayRestart(4,
>>>>>>>>>>     org.apache.flink.api.common.time.Time.minutes(1)));
>>>>>>>>>> StateBackend stateBackEnd = new MemoryStateBackend();
>>>>>>>>>> env.setStateBackend(stateBackEnd);
>>>>>>>>>>
>>>>>>>>>> Regards.
>
> --
>
> Kostas Kloudas | Software Engineer
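The custom rolling policy in Vishal's first message combines three thresholds: never roll on checkpoint, roll above 1 GiB, and roll after 10 minutes of inactivity or 120 minutes of age. The stdlib-only sketch below restates those decisions so they can be checked in isolation. `PartInfo` is a hypothetical stand-in for Flink's `PartFileInfo`, and `firstRollTick` is a hypothetical helper that replays a periodic processing-time check at the 1-minute interval mentioned in the thread; neither is a Flink API.

```java
import java.util.concurrent.TimeUnit;

// Stdlib-only restatement of the custom RollingPolicy from the thread.
// PartInfo is a hypothetical stand-in for Flink's PartFileInfo; only the
// three values the policy reads are modelled. Not a Flink API.
public class RollingDecisions {

    static final long MAX_PART_SIZE = 1024L * 1024L * 1024L; // 1 GiB
    static final long INACTIVITY_MS = TimeUnit.MINUTES.toMillis(10);
    static final long MAX_AGE_MS = TimeUnit.MINUTES.toMillis(120);

    static final class PartInfo {
        final long size;
        final long creationTime;
        final long lastUpdateTime;

        PartInfo(long size, long creationTime, long lastUpdateTime) {
            this.size = size;
            this.creationTime = creationTime;
            this.lastUpdateTime = lastUpdateTime;
        }
    }

    // Same decision as the policy's shouldRollOnCheckpoint: never roll because of a checkpoint.
    static boolean shouldRollOnCheckpoint(PartInfo p) {
        return false;
    }

    // Same decision as shouldRollOnEvent: roll once the file is strictly larger than 1 GiB.
    static boolean shouldRollOnEvent(PartInfo p) {
        return p.size > MAX_PART_SIZE;
    }

    // Same decision as shouldRollOnProcessingTime: idle for more than 10 minutes,
    // or older than 120 minutes.
    static boolean shouldRollOnProcessingTime(PartInfo p, long now) {
        return now - p.lastUpdateTime > INACTIVITY_MS || now - p.creationTime > MAX_AGE_MS;
    }

    // Hypothetical helper: runs the processing-time check every checkIntervalMs
    // after startMs and returns the first tick at which the file would roll,
    // or -1 if it would not roll by horizonMs.
    static long firstRollTick(PartInfo p, long startMs, long checkIntervalMs, long horizonMs) {
        for (long t = startMs + checkIntervalMs; t <= horizonMs; t += checkIntervalMs) {
            if (shouldRollOnProcessingTime(p, t)) {
                return t;
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        // A small file written once at t = 0 and then left idle.
        PartInfo idle = new PartInfo(10, 0, 0);
        long tick = firstRollTick(idle, 0, TimeUnit.MINUTES.toMillis(1), TimeUnit.HOURS.toMillis(3));
        System.out.println(tick); // prints 660000 (11 minutes)
    }
}
```

Because the comparisons are strict, an idle file is not rolled at exactly 10 minutes but at the next check after that. Per Kostas's reply, a file rolled this way still keeps its in-progress name until the next checkpoint completes, so with a 10-minute checkpoint interval it can look like a dangling in-progress file for a while after it has been rolled.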