GitHub user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6130#discussion_r193638176
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---
    @@ -1161,7 +1301,53 @@ public TimeCharacteristic getStreamTimeCharacteristic() {
                Preconditions.checkNotNull(filePath.isEmpty(), "The file path must not be empty.");
     
                inputFormat.setFilePath(filePath);
    -           return createFileInput(inputFormat, typeInformation, "Custom File Source", watchType, interval);
    +           return createFileInput(inputFormat, typeInformation, "Custom File Source", watchType, interval, 1);
    +   }
    +
    +   /**
    +    * Reads the contents of the user-specified {@code filePath} based on the given {@link FileInputFormat}.
    +    * Depending on the provided {@link FileProcessingMode}, the source may periodically monitor (every {@code interval} ms)
    +    * the path for new data ({@link FileProcessingMode#PROCESS_CONTINUOUSLY}), process once or {@code numTimes} times the data currently in
    +    * the path and exit ({@link FileProcessingMode#PROCESS_ONCE} or {@link FileProcessingMode#PROCESS_N_TIMES}).
    +    * In addition, if the path contains files not to be processed, the user can specify a custom {@link FilePathFilter}.
    +    * As a default implementation you can use {@link FilePathFilter#createDefaultFilter()}.
    +    *
    +    * <p><b>NOTES ON CHECKPOINTING: </b> If the {@code watchType} is set to {@link FileProcessingMode#PROCESS_ONCE} or {@link FileProcessingMode#PROCESS_N_TIMES},
    +    * the source monitors the path <b>once</b>, creates the {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits}
    +    * to be processed, forwards them to the downstream {@link ContinuousFileReaderOperator readers} to read the actual data,
    +    * and exits, without waiting for the readers to finish reading. This implies that no more checkpoint barriers
    +    * are going to be forwarded after the source exits, thus having no checkpoints after that point.
    +    *
    +    * @param inputFormat
    +    *              The input format used to create the data stream
    +    * @param filePath
    +    *              The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
    +    * @param watchType
    +    *              The mode in which the source should operate, i.e. monitor path and react to new data, or process once and exit
    +    * @param typeInformation
    +    *              Information on the type of the elements in the output stream
    +    * @param interval
    +    *              In the case of periodic path monitoring, this specifies the interval (in millis) between consecutive path scans
    +    * @param <OUT>
    +    *              The type of the returned data stream
    +    * @param numTimes
    +    *              The number of times to read the file
    +    * @return The data stream that represents the data read from the given file
    +    */
    +   @PublicEvolving
    +   public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat,
    +                                               String filePath,
    +                                               FileProcessingMode watchType,
    +                                               long interval,
    +                                               TypeInformation<OUT> typeInformation,
    +                                               int numTimes) {
    --- End diff --
    
    Can `numTimes` be negative? If not, we could annotate the parameter with `@Nonnegative`, or validate it explicitly in this method.
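
    A minimal sketch of the explicit check, assuming a negative `numTimes` should be rejected outright. `NumTimesCheck` and `requireNonNegative` are hypothetical names used only for illustration and are not part of Flink; inside `readFile` the same condition could probably be expressed with the existing `Preconditions.checkArgument`. Note that `@Nonnegative` (JSR-305) only documents the contract for static analysis and does not enforce it at runtime, so a check like this may still be useful alongside the annotation.

```java
// Hypothetical helper, not part of Flink: rejects a negative numTimes up front.
final class NumTimesCheck {

    private NumTimesCheck() {
    }

    // Returns numTimes unchanged when it is >= 0, otherwise throws.
    static int requireNonNegative(int numTimes) {
        if (numTimes < 0) {
            throw new IllegalArgumentException(
                "numTimes must not be negative, but was " + numTimes);
        }
        return numTimes;
    }

    public static void main(String[] args) {
        // A valid value passes through unchanged.
        System.out.println(requireNonNegative(3));
        // A negative value is rejected with a descriptive message.
        try {
            requireNonNegative(-1);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

    Whether zero should be accepted (read the file zero times) or rejected as well is a design choice the PR would need to settle; the sketch above accepts it.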

