Can StreamingFileSink be used instead of the BucketingSink described at
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/filesystem_sink.html?
It looks like it could.


This code, for example:


        StreamingFileSink
                .forRowFormat(new Path(PATH), new SimpleStringEncoder<KafkaRecord>())
                .withBucketAssigner(new KafkaRecordBucketAssigner(DEFAULT_FORMAT_STRING, ZONE_ID))
                .withRollingPolicy(new RollingPolicy<KafkaRecord, String>() {
                    // Never roll just because a checkpoint is taken.
                    @Override
                    public boolean shouldRollOnCheckpoint(PartFileInfo<String> partFileState) throws IOException {
                        return false;
                    }

                    // Roll once the part file grows beyond 1 GiB.
                    @Override
                    public boolean shouldRollOnEvent(PartFileInfo<String> partFileState, KafkaRecord element) throws IOException {
                        return partFileState.getSize() > 1024 * 1024 * 1024L;
                    }

                    // Roll after 10 minutes without writes, or 120 minutes after creation.
                    @Override
                    public boolean shouldRollOnProcessingTime(PartFileInfo<String> partFileState, long currentTime) throws IOException {
                        return currentTime - partFileState.getLastUpdateTime() > 10 * 60 * 1000L
                                || currentTime - partFileState.getCreationTime() > 120 * 60 * 1000L;
                    }
                })
                .build();
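
To make the two time thresholds concrete, here is a minimal stand-alone sketch of the same predicate in plain Java, with no Flink dependency. The class name RollCheck and its method signature are hypothetical and are not part of Flink; the sketch only evaluates the condition and says nothing about when Flink actually calls shouldRollOnProcessingTime.

```java
public class RollCheck {
    // Thresholds copied from the rolling policy above.
    static final long INACTIVITY_MS = 10 * 60 * 1000L;   // 10 minutes without writes
    static final long ROLLOVER_MS = 120 * 60 * 1000L;    // 120 minutes since creation

    // Hypothetical helper: same condition as shouldRollOnProcessingTime,
    // but taking the timestamps directly instead of a PartFileInfo.
    static boolean shouldRollOnProcessingTime(long creationTime, long lastUpdateTime, long currentTime) {
        return currentTime - lastUpdateTime > INACTIVITY_MS
                || currentTime - creationTime > ROLLOVER_MS;
    }

    public static void main(String[] args) {
        long minute = 60 * 1000L;
        long now = 200 * minute;

        // Created 30 minutes ago, last written 11 minutes ago: idle too long.
        System.out.println(shouldRollOnProcessingTime(now - 30 * minute, now - 11 * minute, now)); // true

        // Created 30 minutes ago, last written 5 minutes ago: neither threshold reached.
        System.out.println(shouldRollOnProcessingTime(now - 30 * minute, now - 5 * minute, now)); // false

        // Created 121 minutes ago, last written 1 minute ago: too old.
        System.out.println(shouldRollOnProcessingTime(now - 121 * minute, now - 1 * minute, now)); // true
    }
}
```

By this condition alone, a part file that has received no writes for more than 10 minutes would qualify for rolling at the next check.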


A few things I see, and am not sure I follow, about the new
StreamingFileSink vis-a-vis BucketingSink:


1. I never see the in-progress file move to the pending state (that is,
get renamed as pending), as was the case with BucketingSink. I would
assume that it would become pending and then be finalized on checkpoint
for exactly-once semantics?


2. I see dangling in-progress files at the end of the day. With
withBucketCheckInterval set to 1 minute by default, I would assume that
shouldRollOnProcessingTime should kick in?

3. The in-progress files are named like
.part-4-45.inprogress.3ed08b67-2b56-4d31-a233-b1c146cfcf14 . What is
that additional suffix?




I have the following set up on the env:

env.enableCheckpointing(10 * 60000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.setRestartStrategy(fixedDelayRestart(4, org.apache.flink.api.common.time.Time.minutes(1)));
StateBackend stateBackEnd = new MemoryStateBackend();
env.setStateBackend(stateBackEnd);


Regards.
