Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6130#discussion_r193637932

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---
@@ -1161,7 +1301,53 @@ public TimeCharacteristic getStreamTimeCharacteristic() {
         Preconditions.checkNotNull(filePath.isEmpty(), "The file path must not be empty.");
         inputFormat.setFilePath(filePath);
-        return createFileInput(inputFormat, typeInformation, "Custom File Source", watchType, interval);
+        return createFileInput(inputFormat, typeInformation, "Custom File Source", watchType, interval, 1);
+    }
+
+    /**
+     * Reads the contents of the user-specified {@code filePath} based on the given {@link FileInputFormat}.
+     * Depending on the provided {@link FileProcessingMode}, the source may periodically monitor (every {@code interval} ms)
+     * the path for new data ({@link FileProcessingMode#PROCESS_CONTINUOUSLY}), process once or {@code numTimes} times the data currently in
+     * the path and exit ({@link FileProcessingMode#PROCESS_ONCE} or {@link FileProcessingMode#PROCESS_N_TIMES}).
+     * In addition, if the path contains files not to be processed, the user can specify a custom {@link FilePathFilter}.
+     * As a default implementation you can use {@link FilePathFilter#createDefaultFilter()}.
+     *
+     * <p><b>NOTES ON CHECKPOINTING: </b> If the {@code watchType} is set to {@link FileProcessingMode#PROCESS_ONCE} or {@link FileProcessingMode#PROCESS_N_TIMES},
+     * the source monitors the path <b>once</b>, creates the {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits}
+     * to be processed, forwards them to the downstream {@link ContinuousFileReaderOperator readers} to read the actual data,
+     * and exits, without waiting for the readers to finish reading. This implies that no more checkpoint barriers
+     * are going to be forwarded after the source exits, thus having no checkpoints after that point.
+     *
+     * @param inputFormat
+     *         The input format used to create the data stream
+     * @param filePath
+     *         The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
+     * @param watchType
+     *         The mode in which the source should operate, i.e. monitor path and react to new data, or process once and exit
+     * @param typeInformation
+     *         Information on the type of the elements in the output stream
+     * @param interval
+     *         In the case of periodic path monitoring, this specifies the interval (in millis) between consecutive path scans
+     * @param <OUT>
+     *         The type of the returned data stream
+     * @param numTimes
+     *         The number of times to read the file
+     * @return The data stream that represents the data read from the given file
+     */
+    @PublicEvolving
+    public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat,
+                                                String filePath,
+                                                FileProcessingMode watchType,
+                                                long interval,
+                                                TypeInformation<OUT> typeInformation,
+                                                int numTimes) {
+
+        Preconditions.checkNotNull(inputFormat, "InputFormat must not be null.");
+        Preconditions.checkNotNull(filePath, "The file path must not be null.");
+        Preconditions.checkNotNull(filePath.isEmpty(), "The file path must not be empty.");
--- End diff --

Again, it looks like this should be `Preconditions.checkArgument()`. `checkNotNull(filePath.isEmpty(), ...)` autoboxes the `boolean` into a `Boolean` that is never null, so the check cannot fail; the condition would also need to be negated, i.e. `!filePath.isEmpty()`.
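
To illustrate the point, here is a minimal, self-contained sketch of the two variants. It assumes that `checkNotNull` throws only on a null reference and that `checkArgument` throws `IllegalArgumentException` on a false condition. The class `FilePathCheckSketch` and its helpers are hypothetical stand-ins written for this example so that it runs without Flink on the classpath; they are not Flink's `Preconditions` class and not the code of this PR.

```java
import java.util.function.Consumer;

/**
 * Sketch only: checkNotNull/checkArgument are local stand-ins (assumptions)
 * that mimic the usual Preconditions contract; they are not the Flink class.
 */
final class FilePathCheckSketch {

    // Stand-in: throws NullPointerException only when the reference is null.
    static <T> T checkNotNull(T reference, String errorMessage) {
        if (reference == null) {
            throw new NullPointerException(errorMessage);
        }
        return reference;
    }

    // Stand-in: throws IllegalArgumentException when the condition is false.
    static void checkArgument(boolean condition, String errorMessage) {
        if (!condition) {
            throw new IllegalArgumentException(errorMessage);
        }
    }

    // Mirrors the checks in the diff: isEmpty() is autoboxed to a non-null
    // Boolean, so the second call never throws for a non-null path.
    static void validateAsInDiff(String filePath) {
        checkNotNull(filePath, "The file path must not be null.");
        checkNotNull(filePath.isEmpty(), "The file path must not be empty.");
    }

    // Suggested form: checkArgument with the negated condition.
    static void validateAsSuggested(String filePath) {
        checkNotNull(filePath, "The file path must not be null.");
        checkArgument(!filePath.isEmpty(), "The file path must not be empty.");
    }

    // Returns true if the validator rejects the given path with an exception.
    static boolean rejects(Consumer<String> validator, String filePath) {
        try {
            validator.accept(filePath);
            return false;
        } catch (IllegalArgumentException | NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // prints "as in diff rejects empty path: false"
        System.out.println("as in diff rejects empty path: "
                + rejects(FilePathCheckSketch::validateAsInDiff, ""));
        // prints "suggested rejects empty path: true"
        System.out.println("suggested rejects empty path: "
                + rejects(FilePathCheckSketch::validateAsSuggested, ""));
    }
}
```

With the form from the diff, an empty path passes validation silently and would only fail later, when the input format tries to use it.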
---