Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6130#discussion_r193637758

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---
    @@ -1093,6 +1178,59 @@ public TimeCharacteristic getStreamTimeCharacteristic() {
     		return readFile(inputFormat, filePath, watchType, interval, typeInformation);
     	}

    +	/**
    +	 * Reads the contents of the user-specified {@code filePath} based on the given {@link FileInputFormat}. Depending
    +	 * on the provided {@link FileProcessingMode}, the source may periodically monitor (every {@code interval} ms) the path
    +	 * for new data ({@link FileProcessingMode#PROCESS_CONTINUOUSLY}), process once or {@code numTimes} times the data currently in
    +	 * the path and exit ({@link FileProcessingMode#PROCESS_ONCE} or {@link FileProcessingMode#PROCESS_N_TIMES}).
    +	 * In addition, if the path contains files not to be processed, the user can specify a custom {@link FilePathFilter}.
    +	 * As a default implementation you can use {@link FilePathFilter#createDefaultFilter()}.
    +	 *
    +	 * <p>Since all data streams need specific information about their types, this method needs to determine the
    +	 * type of the data produced by the input format. It will attempt to determine the data type by reflection,
    +	 * unless the input format implements the {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable} interface.
    +	 * In the latter case, this method will invoke the
    +	 * {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable#getProducedType()} method to determine the data
    +	 * type produced by the input format.
    +	 *
    +	 * <p><b>NOTES ON CHECKPOINTING: </b> If the {@code watchType} is set to {@link FileProcessingMode#PROCESS_ONCE} or {@link FileProcessingMode#PROCESS_N_TIMES},
    +	 * the source monitors the path <b>once</b>, creates the {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits}
    +	 * to be processed, forwards them to the downstream {@link ContinuousFileReaderOperator readers} to read the actual data,
    +	 * and exits, without waiting for the readers to finish reading. This implies that no more checkpoint barriers
    +	 * are going to be forwarded after the source exits, thus having no checkpoints after that point.
    +	 *
    +	 * @param inputFormat
    +	 *          The input format used to create the data stream
    +	 * @param filePath
    +	 *          The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
    +	 * @param watchType
    +	 *          The mode in which the source should operate, i.e. monitor path and react to new data, or process once and exit
    +	 * @param interval
    +	 *          In the case of periodic path monitoring, this specifies the interval (in millis) between consecutive path scans
    +	 * @param <OUT>
    +	 *          The type of the returned data stream
    +	 * @param numTimes
    +	 *          The number of times to read the file
    +	 * @return The data stream that represents the data read from the given file
    +	 */
    +	@PublicEvolving
    +	public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat,
    +												String filePath,
    +												FileProcessingMode watchType,
    +												long interval,
    +												int numTimes) {
    +
    +		TypeInformation<OUT> typeInformation;
    +		try {
    +			typeInformation = TypeExtractor.getInputFormatTypes(inputFormat);
    +		} catch (Exception e) {
    +			throw new InvalidProgramException("The type returned by the input format could not be " +
    +				"automatically determined. Please specify the TypeInformation of the produced type " +
    +				"explicitly by using the 'createInput(InputFormat, TypeInformation)' method instead.");
    --- End diff --

    Maybe it's better not to swallow the Exception here.
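    For illustration, a minimal sketch of that suggestion (not the actual patch), assuming
    InvalidProgramException exposes a (String, Throwable) constructor, so the caught exception
    is kept as the cause rather than dropped:

        // Hypothetical rework of the catch block above: chain `e` as the cause
        // when rethrowing, so the original stack trace is preserved.
        TypeInformation<OUT> typeInformation;
        try {
            typeInformation = TypeExtractor.getInputFormatTypes(inputFormat);
        } catch (Exception e) {
            throw new InvalidProgramException("The type returned by the input format could not be " +
                "automatically determined. Please specify the TypeInformation of the produced type " +
                "explicitly by using the 'createInput(InputFormat, TypeInformation)' method instead.", e);
        }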
---