Checkpointing & File stream with

2019-06-17 Thread Sung Gon Yi
Hello,

I am working on joining two streams: one comes from Kafka and the other from
a small file.
Stream processing works well, but checkpointing fails.
The file has fewer than 100 lines, and the file-reading part of the pipeline
switches to "FINISHED" as soon as it is deployed.

After that, checkpointing fails with the following message:
——
2019-06-17 20:25:13,575 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 
triggering task Source: Custom File Source (1/1) of job 
d26afe055f249c172c1dcb3311508e83 is not in state RUNNING but FINISHED instead. 
Aborting checkpoint.
——

The Custom File Source in the log corresponds to the following code:
——
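// readTextFile reads the file once (FileProcessingMode.PROCESS_ONCE), after which the source task finishes
DataStream<String> specificationFileStream = env.readTextFile(specFile);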
——

To make checkpointing succeed, I wrote a custom source function that keeps
running (it mostly sleeps after reading the file). I wonder whether this is the
correct approach.

Sincerely,
Sung Gon
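The keep-alive workaround described above can be modelled without Flink as a minimal sketch, assuming the file has already been read into memory. `KeepAliveLinesSource` is a hypothetical name, and `Consumer<String>` stands in for Flink's `SourceContext<String>`. The model emits each line once and then sleeps until `cancel()` is called, so it never reaches FINISHED, the state that makes the checkpoint coordinator abort in the log above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Flink-free model of the "read once, then keep running" workaround.
// Hypothetical: KeepAliveLinesSource, and Consumer<String> standing in for
// Flink's SourceContext<String>. In a real Flink 1.8 SourceFunction this body
// would live in run(SourceContext<String> ctx), emitting with ctx.collect(line)
// inside synchronized (ctx.getCheckpointLock()).
final class KeepAliveLinesSource {
    private final List<String> lines;        // file content, already read into memory
    private volatile boolean running = true; // cleared by cancel() from another thread

    KeepAliveLinesSource(List<String> lines) {
        this.lines = new ArrayList<>(lines);
    }

    // Emits every line once, then idles until cancel(), so the task never
    // reaches FINISHED while the rest of the job is running.
    void run(Consumer<String> emit) {
        for (String line : lines) {
            if (!running) {
                return;
            }
            emit.accept(line);
        }
        while (running) {
            try {
                Thread.sleep(50L); // mostly sleeping, as in the workaround described
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    void cancel() {
        running = false;
    }
}
```

Because the model keeps no read position in state, a restarted instance would simply emit the whole file again; that is harmless only if downstream operators tolerate seeing the same lines twice.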



Re: Checkpointing & File stream with

2019-06-17 Thread Yun Tang
Hi Sung

How about using FileProcessingMode.PROCESS_CONTINUOUSLY [1] as the watch type
when reading data from HDFS? With FileProcessingMode.PROCESS_CONTINUOUSLY the
source periodically monitors the path, whereas the default,
FileProcessingMode.PROCESS_ONCE, processes the data once and then exits.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/datastream_api.html#data-sources
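The difference between the two watch types can be illustrated with a minimal, Flink-free model of a path-monitoring source, assuming a scanner that returns each path's modification time. `WatchMode`, `PathMonitorModel`, and the scanner are hypothetical names, not Flink classes. In PROCESS_ONCE mode the model scans once and returns, which corresponds to the FINISHED source task in the log above; in PROCESS_CONTINUOUSLY mode it re-scans every `intervalMillis` until cancelled, so the task stays running and checkpoints can still be triggered.

```java
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical mirror of Flink's two watch types (not the Flink enum itself).
enum WatchMode { PROCESS_ONCE, PROCESS_CONTINUOUSLY }

// Flink-free model of a path-monitoring source. The scanner (path -> modification
// time) and all names here are illustrative assumptions.
final class PathMonitorModel {
    private final Supplier<Map<String, Long>> listFiles;
    private final WatchMode mode;
    private final long intervalMillis;
    private volatile boolean running = true;
    private long maxSeenModTime = Long.MIN_VALUE; // newest modification time already forwarded

    PathMonitorModel(Supplier<Map<String, Long>> listFiles, WatchMode mode, long intervalMillis) {
        this.listFiles = listFiles;
        this.mode = mode;
        this.intervalMillis = intervalMillis;
    }

    void run(Consumer<String> emit) {
        while (running) {
            long newMax = maxSeenModTime;
            // Forward every file that is new or was modified since the last scan.
            for (Map.Entry<String, Long> e : listFiles.get().entrySet()) {
                long modTime = e.getValue();
                if (modTime > maxSeenModTime) {
                    emit.accept(e.getKey());
                    newMax = Math.max(newMax, modTime);
                }
            }
            maxSeenModTime = newMax;
            if (mode == WatchMode.PROCESS_ONCE) {
                return; // one pass only: the source ends and its task becomes FINISHED
            }
            try {
                Thread.sleep(intervalMillis); // wait before re-scanning the path
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    void cancel() {
        running = false;
    }
}
```

The Flink documentation linked as [1] notes that in PROCESS_CONTINUOUSLY mode a modified file is re-processed entirely; the model mimics this by forwarding a path again once its modification time increases, so editing the file while the job runs would re-emit all of its lines.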

Best
Yun Tang

From: Sung Gon Yi 
Sent: Tuesday, June 18, 2019 14:13
To: user@flink.apache.org
Subject: Checkpointing & File stream with




Re: Checkpointing & File stream with

2019-06-18 Thread Sung Gon Yi
It works well now with the following code:
——
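import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
TextInputFormat specFileFormat = new TextInputFormat(new Path(specFile));
specFileFormat.setFilesFilter(FilePathFilter.createDefaultFilter());
// Re-scan the path every 100 ms; the source task stays RUNNING, so checkpoints are not aborted
DataStream<String> specificationFileStream = env
    .readFile(specFileFormat, specFile,
        FileProcessingMode.PROCESS_CONTINUOUSLY, 100L, BasicTypeInfo.STRING_TYPE_INFO);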
——

Thanks.

> On 18 Jun 2019, at 3:38 PM, Yun Tang wrote: