You don't have to. Thank you for the input.

On Sun, Feb 10, 2019 at 1:56 PM Timothy Victor <vict...@gmail.com> wrote:
> My apologies for not seeing your use case properly. The constraint on
> the rolling policy only applies to bulk formats such as Parquet, as
> highlighted in the docs.
>
> As for your questions, I'll have to defer to others more familiar with
> it. I mostly just use bulk formats such as Avro and Parquet.
>
> Tim
>
> On Sun, Feb 10, 2019, 12:40 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>
>> That said, in the DefaultRollingPolicy it seems the check is on the
>> file size (it mimics the check in shouldRollOnEvent()).
>>
>> I guess the questions are:
>>
>> Is the call to shouldRollOnCheckpoint done by the checkpointing thread?
>>
>> Are the calls to the other 2 methods, shouldRollOnEvent and
>> shouldRollOnProcessingTime, done on the execution thread, as in inlined?
>>
>> On Sun, Feb 10, 2019 at 1:17 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>
>>> Thanks for the quick reply.
>>>
>>> I am confused. If this were a more full-featured BucketingSink, I would
>>> imagine that, based on shouldRollOnEvent and shouldRollOnProcessingTime,
>>> an in-progress file could go into the pending phase, and on checkpoint the
>>> pending part file would be finalized. For exactly-once, any in-progress
>>> file would have its length snapshotted to the checkpoint. If a resume from
>>> that checkpoint were to happen, that length would be used to truncate the
>>> file (if truncate is supported) or written out as a valid-length file (if
>>> truncate is not supported) to indicate which part of the finalized file
>>> (finalized when resumed) was valid. I had always assumed (and there is no
>>> doc saying otherwise) that shouldRollOnCheckpoint would be similar to the
>>> other 2, apart from the fact that it does the roll and the finalize in a
>>> single step on a checkpoint.
>>>
>>> Am I better off using BucketingSink?
>>> When to use BucketingSink and when to use RollingSink is not clear at
>>> all, even though on the surface it sure looks like RollingSink is a
>>> better version of BucketingSink (or not).
>>>
>>> Regards.
>>>
>>> On Sun, Feb 10, 2019 at 12:09 PM Timothy Victor <vict...@gmail.com> wrote:
>>>
>>>> I think the only rolling policy that can be used is
>>>> CheckpointRollingPolicy to ensure exactly once.
>>>>
>>>> Tim
>>>>
>>>> On Sun, Feb 10, 2019, 9:13 AM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>
>>>>> Can StreamingFileSink be used instead of the BucketingSink described in
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/filesystem_sink.html?
>>>>> It looks like it could.
>>>>>
>>>>> This code, for example:
>>>>>
>>>>> StreamingFileSink
>>>>>     .forRowFormat(new Path(PATH), new SimpleStringEncoder<KafkaRecord>())
>>>>>     .withBucketAssigner(new KafkaRecordBucketAssigner(DEFAULT_FORMAT_STRING, ZONE_ID))
>>>>>     .withRollingPolicy(new RollingPolicy<KafkaRecord, String>() {
>>>>>         @Override
>>>>>         public boolean shouldRollOnCheckpoint(PartFileInfo<String> partFileState)
>>>>>                 throws IOException {
>>>>>             // never roll just because a checkpoint happens
>>>>>             return false;
>>>>>         }
>>>>>
>>>>>         @Override
>>>>>         public boolean shouldRollOnEvent(PartFileInfo<String> partFileState,
>>>>>                                          KafkaRecord element) throws IOException {
>>>>>             // roll once the part file exceeds 1 GiB
>>>>>             return partFileState.getSize() > 1024L * 1024 * 1024;
>>>>>         }
>>>>>
>>>>>         @Override
>>>>>         public boolean shouldRollOnProcessingTime(PartFileInfo<String> partFileState,
>>>>>                                                   long currentTime) throws IOException {
>>>>>             // roll after 10 min without writes or 120 min since the file was created
>>>>>             return currentTime - partFileState.getLastUpdateTime() > 10 * 60 * 1000L
>>>>>                 || currentTime - partFileState.getCreationTime() > 120 * 60 * 1000L;
>>>>>         }
>>>>>     })
>>>>>     .build();
>>>>>
>>>>> A few things I see and am not sure I follow about the new RollingFileSink
>>>>> vis-à-vis BucketingSink:
>>>>>
>>>>> 1. I never see the in-progress file go to the pending state (i.e. renamed
>>>>>    as pending), as was the case in BucketingSink. I would assume that it
>>>>>    would be pending and then finalized on checkpoint for exactly-once
>>>>>    semantics?
>>>>>
>>>>> 2. I see dangling in-progress files at the end of the day. I would assume
>>>>>    that, with withBucketCheckInterval set to 1 minute by default,
>>>>>    shouldRollOnProcessingTime should kick in?
>>>>>
>>>>> 3. The in-progress files are named like
>>>>>    .part-4-45.inprogress.3ed08b67-2b56-4d31-a233-b1c146cfcf14. What is
>>>>>    that additional suffix?
>>>>>
>>>>> I have the following setup on the env:
>>>>>
>>>>> env.enableCheckpointing(10 * 60000); // checkpoint every 10 minutes
>>>>> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>>>>> env.setRestartStrategy(fixedDelayRestart(4,
>>>>>     org.apache.flink.api.common.time.Time.minutes(1)));
>>>>> StateBackend stateBackEnd = new MemoryStateBackend();
>>>>> env.setStateBackend(stateBackEnd);
>>>>>
>>>>> Regards.
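Questions 1 and 2 in the original mail depend on when Vishal's custom policy actually decides to roll. The following is a plain-Java model of the three predicates from his snippet. The Flink types are stripped out so the thresholds can be checked in isolation. The class and method names are hypothetical, and the model does not reproduce StreamingFileSink's own timer or checkpoint handling. It only shows that, because shouldRollOnCheckpoint returns false, a part file is closed only by the size rule or by one of the two time rules.

```java
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical plain-Java model (no Flink dependency) of the thresholds in the
 * custom RollingPolicy from the thread. In Flink the inputs would come from
 * PartFileInfo#getSize(), #getLastUpdateTime() and #getCreationTime().
 */
final class RollThresholds {
    // 1 GiB, as in shouldRollOnEvent
    static final long MAX_PART_SIZE_BYTES = 1024L * 1024 * 1024;
    // 10 minutes without writes (600,000 ms), as in shouldRollOnProcessingTime
    static final long INACTIVITY_MS = TimeUnit.MINUTES.toMillis(10);
    // 120 minutes since the part file was created
    static final long MAX_AGE_MS = TimeUnit.MINUTES.toMillis(120);

    private RollThresholds() {}

    /** Mirrors shouldRollOnEvent: roll once the part file exceeds the size cap. */
    static boolean rollOnEvent(long sizeBytes) {
        return sizeBytes > MAX_PART_SIZE_BYTES;
    }

    /** Mirrors shouldRollOnProcessingTime: roll if idle too long or open too long. */
    static boolean rollOnProcessingTime(long nowMs, long lastUpdateMs, long creationMs) {
        return nowMs - lastUpdateMs > INACTIVITY_MS || nowMs - creationMs > MAX_AGE_MS;
    }

    /** Mirrors the snippet's shouldRollOnCheckpoint, which never rolls. */
    static boolean rollOnCheckpoint(long sizeBytes) {
        return false;
    }

    public static void main(String[] args) {
        long created = 0L;
        long lastWrite = TimeUnit.MINUTES.toMillis(5);
        // Assume the processing-time check runs once per minute.
        for (long minute = 0; minute <= 20; minute++) {
            long now = TimeUnit.MINUTES.toMillis(minute);
            if (rollOnProcessingTime(now, lastWrite, created)) {
                System.out.println("first roll on processing time at minute " + minute);
                break;
            }
        }
    }
}
```

In `main`, the part file was last written at minute 5. It first qualifies under the strict `>` inactivity rule at minute 16, not 15. This is worth keeping in mind when checking on-disk timestamps against the configured intervals.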
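Vishal describes BucketingSink's exactly-once recovery like this: snapshot the in-progress file's length at checkpoint time, then on resume either truncate the file back to that length or record the valid length in a side file. The sketch below models that step on the local filesystem with plain `java.nio`. It is a simplified illustration under stated assumptions. The class name, the method names and the `_<name>.valid-length` side-file naming are illustrative, and it uses neither Hadoop's FileSystem nor any Flink API.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/**
 * Hypothetical local-filesystem model of "snapshot the length, then truncate
 * or write a valid-length file on restore". Not Flink's implementation.
 */
final class ValidLengthRecovery {
    private ValidLengthRecovery() {}

    /** What a checkpoint would remember about an in-progress part file. */
    static long snapshotLength(Path partFile) throws IOException {
        return Files.size(partFile);
    }

    /**
     * Restores a part file to its checkpointed state. Returns the side file
     * written when truncation is unavailable, or null if the file was truncated.
     */
    static Path restore(Path partFile, long checkpointedLength, boolean truncateSupported)
            throws IOException {
        if (truncateSupported) {
            // Drop every byte written after the checkpoint.
            try (FileChannel ch = FileChannel.open(partFile, StandardOpenOption.WRITE)) {
                ch.truncate(checkpointedLength);
            }
            return null;
        }
        // Leave the data untouched and record how many leading bytes are valid.
        Path sideFile = partFile.resolveSibling("_" + partFile.getFileName() + ".valid-length");
        Files.write(sideFile, Long.toString(checkpointedLength).getBytes(StandardCharsets.UTF_8));
        return sideFile;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("valid-length-demo");
        Path part = dir.resolve("part-0-0");
        Files.write(part, "committed\n".getBytes(StandardCharsets.UTF_8));
        long checkpointed = snapshotLength(part);            // checkpoint taken here
        Files.write(part, "uncommitted\n".getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.APPEND);                   // written after it, then a failure
        restore(part, checkpointed, true);
        System.out.println(new String(Files.readAllBytes(part), StandardCharsets.UTF_8).trim());
    }
}
```

With truncation enabled, the demo prints `committed`. With `truncateSupported` set to false, the part file keeps both lines, and the side file holds `10`, the byte length of `committed\n`. In that case a reader must honor the side file to see only checkpointed data.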