Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6130#discussion_r193637758
  
    --- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 ---
    @@ -1093,6 +1178,59 @@ public TimeCharacteristic 
getStreamTimeCharacteristic() {
                return readFile(inputFormat, filePath, watchType, interval, 
typeInformation);
        }
     
    +   /**
    +    * Reads the contents of the user-specified {@code filePath} based on 
the given {@link FileInputFormat}. Depending
    +    * on the provided {@link FileProcessingMode}, the source may 
periodically monitor (every {@code interval} ms) the path
    +    * for new data ({@link FileProcessingMode#PROCESS_CONTINUOUSLY}), 
process once or {@code numTimes} times the data currently in
    +    * the path and exit ({@link FileProcessingMode#PROCESS_ONCE} or {@link 
FileProcessingMode#PROCESS_N_TIMES}).
    +    * In addition, if the path contains files not to be processed, the 
user can specify a custom {@link FilePathFilter}.
    +    * As a default implementation you can use {@link 
FilePathFilter#createDefaultFilter()}.
    +    *
    +    * <p>Since all data streams need specific information about their 
types, this method needs to determine the
    +    * type of the data produced by the input format. It will attempt to 
determine the data type by reflection,
    +    * unless the input format implements the {@link 
org.apache.flink.api.java.typeutils.ResultTypeQueryable} interface.
    +    * In the latter case, this method will invoke the
    +    * {@link 
org.apache.flink.api.java.typeutils.ResultTypeQueryable#getProducedType()} 
method to determine data
    +    * type produced by the input format.
    +    *
    +    * <p><b>NOTES ON CHECKPOINTING: </b> If the {@code watchType} is set 
to {@link FileProcessingMode#PROCESS_ONCE} or {@link 
FileProcessingMode#PROCESS_N_TIMES} ,
    +    * the source monitors the path <b>once</b>, creates the {@link 
org.apache.flink.core.fs.FileInputSplit FileInputSplits}
    +    * to be processed, forwards them to the downstream {@link 
ContinuousFileReaderOperator readers} to read the actual data,
    +    * and exits, without waiting for the readers to finish reading. This 
implies that no more checkpoint barriers
    +    * are going to be forwarded after the source exits, thus having no 
checkpoints after that point.
    +    *
    +    * @param inputFormat
    +    *              The input format used to create the data stream
    +    * @param filePath
    +    *              The path of the file, as a URI (e.g., 
"file:///some/local/file" or "hdfs://host:port/file/path")
    +    * @param watchType
    +    *              The mode in which the source should operate, i.e. 
monitor path and react to new data, or process once and exit
    +    * @param interval
    +    *              In the case of periodic path monitoring, this specifies 
the interval (in millis) between consecutive path scans
    +    * @param <OUT>
    +    *              The type of the returned data stream
    +    * @param numTimes
    +    *              The number of times to read the file
    +    * @return The data stream that represents the data read from the given 
file
    +    */
    +   @PublicEvolving
    +   public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> 
inputFormat,
    +                                                                           
                String filePath,
    +                                                                           
                FileProcessingMode watchType,
    +                                                                           
                long interval,
    +                                                                           
                int numTimes) {
    +
    +           TypeInformation<OUT> typeInformation;
    +           try {
    +                   typeInformation = 
TypeExtractor.getInputFormatTypes(inputFormat);
    +           } catch (Exception e) {
    +                   throw new InvalidProgramException("The type returned by 
the input format could not be " +
    +                           "automatically determined. Please specify the 
TypeInformation of the produced type " +
    +                           "explicitly by using the 
'createInput(InputFormat, TypeInformation)' method instead.");
    --- End diff --
    
    Maybe it better to not swallow the Exception here.


---

Reply via email to