Thanks David. For the manual-restart case, to get the checkpoint path
programmatically I'm using the following code to retrieve the JobId and
CheckpointID so I can pass them along when restarting with "-s", but I seem
to be missing something: I'm getting an empty TimestampedFileInputSplit array.
// Intended to match <32-hex JobID>/chk-<CheckpointID> directories
GlobFilePathFilter filePathFilter = new GlobFilePathFilter(
        Collections.singletonList("[0-9a-fA-F]{32}/chk-[\\d]+"),
        Collections.emptyList());

TextInputFormat inputFormat = new TextInputFormat(
        new org.apache.flink.core.fs.Path(inputFolderPath));
inputFormat.setNestedFileEnumeration(true);
inputFormat.setFilesFilter(filePathFilter);

ContinuousFileMonitoringFunction<String> monitoringFunction =
        new ContinuousFileMonitoringFunction<>(
                inputFormat,
                FileProcessingMode.PROCESS_CONTINUOUSLY,
                inputFolderParallelism,
                pollInterval);

DataStream<TimestampedFileInputSplit> splits = env.addSource(monitoringFunction);
splits.addSink(new PrintSinkFunction<>());
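
Separately, rather than scanning HDFS for the checkpoint directories, I'm
wondering whether I could simply ask the JobManager REST API for the latest
completed checkpoint. Below is a rough, untested sketch of what I have in
mind; the host/port and JobID are placeholders, and the
latest.completed.external_path field is my assumption from the REST API docs,
so please correct me if that isn't the right field.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class LatestCheckpointPath {

    public static void main(String[] args) throws Exception {
        String restAddress = "http://jobmanager-host:8081"; // placeholder
        String jobId = "<job-id>"; // placeholder: 32-hex JobID of the running job

        // GET /jobs/<jobId>/checkpoints returns the checkpoint statistics for the job
        URL url = new URL(restAddress + "/jobs/" + jobId + "/checkpoints");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");

        // Read the JSON response body
        StringBuilder body = new StringBuilder();
        try (BufferedReader reader =
                new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
            String line;
            while ((line = reader.readLine()) != null) {
                body.append(line);
            }
        }

        // Naive extraction of the "external_path" field of the latest completed
        // checkpoint; a JSON library would be more robust, this just keeps the
        // example dependency-free.
        String json = body.toString();
        String key = "\"external_path\":\"";
        int idx = json.indexOf(key);
        if (idx >= 0) {
            int start = idx + key.length();
            int end = json.indexOf('"', start);
            System.out.println("Latest checkpoint path: " + json.substring(start, end));
        } else {
            System.out.println("No externalized checkpoint path found in response.");
        }
    }
}

If that gives me the path, the restart itself would then just be the usual
"flink run -s <checkpointPath> ..." invocation.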
Arpith
On Fri, Sep 18, 2020 at 2:09 PM David Anderson wrote:
> If your job crashes, Flink will automatically restart from the latest
> checkpoint, without any manual intervention. JobManager HA is only needed
> for automatic recovery after the failure of the Job Manager.
>
> You only need externalized checkpoints and "-s :checkpointPath" if you
> want to use checkpoints to restart a job manually after cancelling or
> stopping it. Also, you don't need read access to the checkpoints yourself,
> but the job manager and task managers must be able to read (and write)
> them.
>
> Regards,
> David
>
> On Fri, Sep 18, 2020 at 6:23 AM Arpith P wrote:
>
>> Hi,
>>
>> I'm running a Flink job in distributed mode, deployed on YARN. I've enabled
>> externalized checkpoints saved to HDFS, but I don't have read access to the
>> checkpoints folder. Is it possible to restart the Flink job from the last
>> saved checkpoint without passing "-s :checkpointPath"? If not, how can I
>> restore state after the job crashes? And would enabling JobManager HA help
>> me in any way?
>>
>> Thanks,
>> Arpith
>>
>