infoverload commented on a change in pull request #18288:
URL: https://github.com/apache/flink/pull/18288#discussion_r786116646



##########
File path: docs/content/docs/connectors/datastream/filesystem.md
##########
@@ -25,12 +27,227 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# File Sink
+# FileSystem
 
-This connector provides a unified Sink for `BATCH` and `STREAMING` that writes 
partitioned files to filesystems
+This connector provides a unified Source and Sink for `BATCH` and `STREAMING` 
that reads or writes (partitioned) files to filesystems
 supported by the [Flink `FileSystem` abstraction]({{< ref 
"docs/deployment/filesystems/overview" >}}). This filesystem
-connector provides the same guarantees for both `BATCH` and `STREAMING` and it 
is an evolution of the 
-existing [Streaming File Sink]({{< ref 
"docs/connectors/datastream/streamfile_sink" >}}) which was designed for 
providing exactly-once semantics for `STREAMING` execution.
+connector provides the same guarantees for both `BATCH` and `STREAMING` and is 
designed for providing exactly-once semantics for `STREAMING` execution.

Review comment:
       ```suggestion
   connector provides the same guarantees for both `BATCH` and `STREAMING` and 
is designed to provide exactly-once semantics for `STREAMING` execution.
   ```

##########
File path: docs/content/docs/connectors/datastream/filesystem.md
##########
@@ -25,12 +27,227 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# File Sink
+# FileSystem
 
-This connector provides a unified Sink for `BATCH` and `STREAMING` that writes 
partitioned files to filesystems
+This connector provides a unified Source and Sink for `BATCH` and `STREAMING` 
that reads or writes (partitioned) files to filesystems

Review comment:
       ```suggestion
   This connector provides a unified Source and Sink for `BATCH` and 
`STREAMING` that reads or writes (partitioned) files to file systems
   ```

##########
File path: docs/content/docs/connectors/datastream/filesystem.md
##########
@@ -25,12 +27,227 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# File Sink
+# FileSystem
 
-This connector provides a unified Sink for `BATCH` and `STREAMING` that writes 
partitioned files to filesystems
+This connector provides a unified Source and Sink for `BATCH` and `STREAMING` 
that reads or writes (partitioned) files to filesystems
 supported by the [Flink `FileSystem` abstraction]({{< ref 
"docs/deployment/filesystems/overview" >}}). This filesystem
-connector provides the same guarantees for both `BATCH` and `STREAMING` and it 
is an evolution of the 
-existing [Streaming File Sink]({{< ref 
"docs/connectors/datastream/streamfile_sink" >}}) which was designed for 
providing exactly-once semantics for `STREAMING` execution.
+connector provides the same guarantees for both `BATCH` and `STREAMING` and is 
designed for providing exactly-once semantics for `STREAMING` execution.
+
+The connector supports reading and writing a set of files from any 
(distributed) file system (e.g. POSIX, S3, HDFS)
+with a [format]({{< ref "docs/connectors/datastream/formats/overview" >}}) 
(e.g., Avro, CSV, Parquet),
+producing a stream or records.

Review comment:
       ```suggestion
   with a [format]({{< ref "docs/connectors/datastream/formats/overview" >}}) 
(e.g. Avro, CSV, Parquet) and
   produces a stream of records.
   ```

##########
File path: docs/content/docs/connectors/datastream/filesystem.md
##########
@@ -25,12 +27,227 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# File Sink
+# FileSystem
 
-This connector provides a unified Sink for `BATCH` and `STREAMING` that writes 
partitioned files to filesystems
+This connector provides a unified Source and Sink for `BATCH` and `STREAMING` 
that reads or writes (partitioned) files to filesystems
 supported by the [Flink `FileSystem` abstraction]({{< ref 
"docs/deployment/filesystems/overview" >}}). This filesystem
-connector provides the same guarantees for both `BATCH` and `STREAMING` and it 
is an evolution of the 
-existing [Streaming File Sink]({{< ref 
"docs/connectors/datastream/streamfile_sink" >}}) which was designed for 
providing exactly-once semantics for `STREAMING` execution.
+connector provides the same guarantees for both `BATCH` and `STREAMING` and is 
designed for providing exactly-once semantics for `STREAMING` execution.
+
+The connector supports reading and writing a set of files from any 
(distributed) file system (e.g. POSIX, S3, HDFS)
+with a [format]({{< ref "docs/connectors/datastream/formats/overview" >}}) 
(e.g., Avro, CSV, Parquet),
+producing a stream or records.
+
+## File Source
+
+The `File Source` is based on the [Source API]({{< ref 
"docs/dev/datastream/sources" >}}#the-data-source-api), 
+a unified data source that reads files - both in batch and in streaming mode. 
+It is divided into the following two parts: File SplitEnumerator and File 
SourceReader. 
+
+* File `SplitEnumerator` is responsible for discovering and identifying the 
files to read and assigns them to the File SourceReader.
+* File `SourceReader` requests the files it needs to process and reads the 
file from the filesystem. 
+
+You'll need to combine the File Source with a [format]({{< ref 
"docs/connectors/datastream/formats/overview" >}}). This allows you to
+parse CSV, decode AVRO or read Parquet columnar files.
+
+#### Bounded and Unbounded Streams
+
+A bounded `File Source` lists all files (via SplitEnumerator, for example a 
recursive directory list with filtered-out hidden files) and reads them all.
+
+An unbounded `File Source` is created when configuring the enumerator for 
periodic file discovery.
+In that case, the SplitEnumerator will enumerate like the bounded case but 
after a certain interval repeats the enumeration.
+For any repeated enumeration, the `SplitEnumerator` filters out previously 
detect files and only sends new ones to the `SourceReader`.

Review comment:
       ```suggestion
   A bounded `File Source` lists all files (via `SplitEnumerator` - a recursive directory list with filtered-out hidden files) and reads them all.
   
   An unbounded `File Source` is created when configuring the enumerator for 
periodic file discovery.
   In this case, the `SplitEnumerator` will enumerate like the bounded case 
but, after a certain interval, repeats the enumeration.
   For any repeated enumeration, the `SplitEnumerator` filters out previously 
detected files and only sends new ones to the `SourceReader`.
   ```
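
Since this paragraph introduces the two modes, it might also help to show how they are selected when building the source. A rough sketch (`TextLineInputFormat` and the path are placeholders; the exact reader class name may differ between Flink versions, and any `StreamFormat` would work here):

```java
// Bounded/batch mode (the default): enumerate the files under the path once and read them all.
FileSource<String> boundedSource =
        FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("/data/in"))
                .build();

// Unbounded/streaming mode: re-enumerate the path every 10 seconds and
// read only the files that were not seen in a previous enumeration.
FileSource<String> unboundedSource =
        FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("/data/in"))
                .monitorContinuously(Duration.ofSeconds(10))
                .build();
```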

##########
File path: docs/content/docs/connectors/datastream/filesystem.md
##########
@@ -25,12 +27,227 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# File Sink
+# FileSystem
 
-This connector provides a unified Sink for `BATCH` and `STREAMING` that writes 
partitioned files to filesystems
+This connector provides a unified Source and Sink for `BATCH` and `STREAMING` 
that reads or writes (partitioned) files to filesystems
 supported by the [Flink `FileSystem` abstraction]({{< ref 
"docs/deployment/filesystems/overview" >}}). This filesystem
-connector provides the same guarantees for both `BATCH` and `STREAMING` and it 
is an evolution of the 
-existing [Streaming File Sink]({{< ref 
"docs/connectors/datastream/streamfile_sink" >}}) which was designed for 
providing exactly-once semantics for `STREAMING` execution.
+connector provides the same guarantees for both `BATCH` and `STREAMING` and is 
designed for providing exactly-once semantics for `STREAMING` execution.
+
+The connector supports reading and writing a set of files from any 
(distributed) file system (e.g. POSIX, S3, HDFS)
+with a [format]({{< ref "docs/connectors/datastream/formats/overview" >}}) 
(e.g., Avro, CSV, Parquet),
+producing a stream or records.
+
+## File Source
+
+The `File Source` is based on the [Source API]({{< ref 
"docs/dev/datastream/sources" >}}#the-data-source-api), 
+a unified data source that reads files - both in batch and in streaming mode. 
+It is divided into the following two parts: File SplitEnumerator and File 
SourceReader. 
+
+* File `SplitEnumerator` is responsible for discovering and identifying the 
files to read and assigns them to the File SourceReader.
+* File `SourceReader` requests the files it needs to process and reads the 
file from the filesystem. 
+
+You'll need to combine the File Source with a [format]({{< ref 
"docs/connectors/datastream/formats/overview" >}}). This allows you to
+parse CSV, decode AVRO or read Parquet columnar files.
+
+#### Bounded and Unbounded Streams
+
+A bounded `File Source` lists all files (via SplitEnumerator, for example a 
recursive directory list with filtered-out hidden files) and reads them all.
+
+An unbounded `File Source` is created when configuring the enumerator for 
periodic file discovery.
+In that case, the SplitEnumerator will enumerate like the bounded case but 
after a certain interval repeats the enumeration.
+For any repeated enumeration, the `SplitEnumerator` filters out previously 
detect files and only sends new ones to the `SourceReader`.
+
+### Usage
+
+You start building a file source via one of the following calls:

Review comment:
       ```suggestion
   You can start building a File Source via one of the following API calls:
   ```

##########
File path: docs/content/docs/connectors/datastream/filesystem.md
##########
@@ -25,12 +27,227 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# File Sink
+# FileSystem
 
-This connector provides a unified Sink for `BATCH` and `STREAMING` that writes 
partitioned files to filesystems
+This connector provides a unified Source and Sink for `BATCH` and `STREAMING` 
that reads or writes (partitioned) files to filesystems
 supported by the [Flink `FileSystem` abstraction]({{< ref 
"docs/deployment/filesystems/overview" >}}). This filesystem
-connector provides the same guarantees for both `BATCH` and `STREAMING` and it 
is an evolution of the 
-existing [Streaming File Sink]({{< ref 
"docs/connectors/datastream/streamfile_sink" >}}) which was designed for 
providing exactly-once semantics for `STREAMING` execution.
+connector provides the same guarantees for both `BATCH` and `STREAMING` and is 
designed for providing exactly-once semantics for `STREAMING` execution.
+
+The connector supports reading and writing a set of files from any 
(distributed) file system (e.g. POSIX, S3, HDFS)
+with a [format]({{< ref "docs/connectors/datastream/formats/overview" >}}) 
(e.g., Avro, CSV, Parquet),
+producing a stream or records.
+
+## File Source
+
+The `File Source` is based on the [Source API]({{< ref 
"docs/dev/datastream/sources" >}}#the-data-source-api), 
+a unified data source that reads files - both in batch and in streaming mode. 
+It is divided into the following two parts: File SplitEnumerator and File 
SourceReader. 
+
+* File `SplitEnumerator` is responsible for discovering and identifying the 
files to read and assigns them to the File SourceReader.
+* File `SourceReader` requests the files it needs to process and reads the 
file from the filesystem. 
+
+You'll need to combine the File Source with a [format]({{< ref 
"docs/connectors/datastream/formats/overview" >}}). This allows you to
+parse CSV, decode AVRO or read Parquet columnar files.
+
+#### Bounded and Unbounded Streams
+
+A bounded `File Source` lists all files (via SplitEnumerator, for example a 
recursive directory list with filtered-out hidden files) and reads them all.
+
+An unbounded `File Source` is created when configuring the enumerator for 
periodic file discovery.
+In that case, the SplitEnumerator will enumerate like the bounded case but 
after a certain interval repeats the enumeration.
+For any repeated enumeration, the `SplitEnumerator` filters out previously 
detect files and only sends new ones to the `SourceReader`.
+
+### Usage
+
+You start building a file source via one of the following calls:
+
+{{< tabs "FileSourceUsage" >}}
+{{< tab "Java" >}}
+```java
+// reads the contents of a file from a file stream. 
+FileSource.forRecordStreamFormat(StreamFormat,Path...)
+        
+// reads batches of records from a file at a time
+FileSource.forBulkFileFormat(BulkFormat,Path...)
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+This creates a `FileSource.FileSourceBuilder` on which you can configure all 
the properties of the file source.
+
+For the bounded/batch case, the file source processes all files under the 
given path(s). 
+In the continuous/streaming case, the source periodically checks the paths for 
new files and will start reading those.
+
+When you start creating a file source (via the `FileSource.FileSourceBuilder` 
created through one of the above-mentioned methods) 
+the source is by default in bounded/batch mode. Call 
`AbstractFileSource.AbstractFileSourceBuilder.monitorContinuously(Duration)` 
+to put the source into continuous streaming mode.
+
+{{< tabs "FileSourceBuilder" >}}
+{{< tab "Java" >}}
+```java
+final FileSource<String> source =
+        FileSource.forRecordStreamFormat(...)
+        .monitorContinuously(Duration.ofMillis(5))  
+        .build();
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+### Format Types
+
+The reading of each file happens through file readers defined by file formats. 
+These define the parsing logic for the contents of the file. There are 
multiple classes that the source supports. 
+Their interfaces trade of simplicity of implementation and 
flexibility/efficiency.

Review comment:
       ```suggestion
   The interfaces are a tradeoff between simplicity of implementation and 
flexibility/efficiency.
   ```

##########
File path: docs/content/docs/connectors/datastream/filesystem.md
##########
@@ -25,12 +27,227 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# File Sink
+# FileSystem
 
-This connector provides a unified Sink for `BATCH` and `STREAMING` that writes 
partitioned files to filesystems
+This connector provides a unified Source and Sink for `BATCH` and `STREAMING` 
that reads or writes (partitioned) files to filesystems
 supported by the [Flink `FileSystem` abstraction]({{< ref 
"docs/deployment/filesystems/overview" >}}). This filesystem
-connector provides the same guarantees for both `BATCH` and `STREAMING` and it 
is an evolution of the 
-existing [Streaming File Sink]({{< ref 
"docs/connectors/datastream/streamfile_sink" >}}) which was designed for 
providing exactly-once semantics for `STREAMING` execution.
+connector provides the same guarantees for both `BATCH` and `STREAMING` and is 
designed for providing exactly-once semantics for `STREAMING` execution.
+
+The connector supports reading and writing a set of files from any 
(distributed) file system (e.g. POSIX, S3, HDFS)
+with a [format]({{< ref "docs/connectors/datastream/formats/overview" >}}) 
(e.g., Avro, CSV, Parquet),
+producing a stream or records.
+
+## File Source
+
+The `File Source` is based on the [Source API]({{< ref 
"docs/dev/datastream/sources" >}}#the-data-source-api), 
+a unified data source that reads files - both in batch and in streaming mode. 
+It is divided into the following two parts: File SplitEnumerator and File 
SourceReader. 
+
+* File `SplitEnumerator` is responsible for discovering and identifying the 
files to read and assigns them to the File SourceReader.
+* File `SourceReader` requests the files it needs to process and reads the 
file from the filesystem. 
+
+You'll need to combine the File Source with a [format]({{< ref 
"docs/connectors/datastream/formats/overview" >}}). This allows you to
+parse CSV, decode AVRO or read Parquet columnar files.
+
+#### Bounded and Unbounded Streams
+
+A bounded `File Source` lists all files (via SplitEnumerator, for example a 
recursive directory list with filtered-out hidden files) and reads them all.
+
+An unbounded `File Source` is created when configuring the enumerator for 
periodic file discovery.
+In that case, the SplitEnumerator will enumerate like the bounded case but 
after a certain interval repeats the enumeration.
+For any repeated enumeration, the `SplitEnumerator` filters out previously 
detect files and only sends new ones to the `SourceReader`.
+
+### Usage
+
+You start building a file source via one of the following calls:
+
+{{< tabs "FileSourceUsage" >}}
+{{< tab "Java" >}}
+```java
+// reads the contents of a file from a file stream. 
+FileSource.forRecordStreamFormat(StreamFormat,Path...)
+        
+// reads batches of records from a file at a time
+FileSource.forBulkFileFormat(BulkFormat,Path...)
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+This creates a `FileSource.FileSourceBuilder` on which you can configure all 
the properties of the file source.
+
+For the bounded/batch case, the file source processes all files under the 
given path(s). 
+In the continuous/streaming case, the source periodically checks the paths for 
new files and will start reading those.
+
+When you start creating a file source (via the `FileSource.FileSourceBuilder` 
created through one of the above-mentioned methods) 
+the source is by default in bounded/batch mode. Call 
`AbstractFileSource.AbstractFileSourceBuilder.monitorContinuously(Duration)` 
+to put the source into continuous streaming mode.
+
+{{< tabs "FileSourceBuilder" >}}
+{{< tab "Java" >}}
+```java
+final FileSource<String> source =
+        FileSource.forRecordStreamFormat(...)
+        .monitorContinuously(Duration.ofMillis(5))  
+        .build();
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+### Format Types
+
+The reading of each file happens through file readers defined by file formats. 
+These define the parsing logic for the contents of the file. There are 
multiple classes that the source supports. 
+Their interfaces trade of simplicity of implementation and 
flexibility/efficiency.
+
+* A `StreamFormat` reads the contents of a file from a file stream. It is the 
simplest format to implement, 
+and provides many features out-of-the-box (like checkpointing logic) but is 
limited in the optimizations it can apply 
+(such as object reuse, batching, etc.).
+
+* A `BulkFormat` reads batches of records from a file at a time. 
+It is the most "low level" format to implement, but offers the greatest 
flexibility to optimize the implementation.
+
+#### TextLine format
+
+A `StreamFormat` reader format that text lines from a file.
+The reader uses Java's built-in `InputStreamReader` to decode the byte stream 
using
+various supported charset encodings.
+This format does not support optimized recovery from checkpoints. On recovery, 
it will re-read
+and discard the number of lined that were processed before the last 
checkpoint. That is due to
+the fact that the offsets of lines in the file cannot be tracked through the 
charset decoders
+with their internal buffering of stream input and charset decoder state.
+
+#### SimpleStreamFormat Abstract Class
+
+A simple version of `StreamFormat` for formats that are not splittable.
+Custom reads of Array or File can be done by implementing `SimpleStreamFormat`:
+
+{{< tabs "SimpleStreamFormat" >}}
+{{< tab "Java" >}}
+```java
+private static final class ArrayReaderFormat extends 
SimpleStreamFormat<byte[]> {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public Reader<byte[]> createReader(Configuration config, FSDataInputStream 
stream)
+            throws IOException {
+        return new ArrayReader(stream);
+    }
+
+    @Override
+    public TypeInformation<byte[]> getProducedType() {
+        return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
+    }
+}
+
+final FileSource<byte[]> source =
+                FileSource.forRecordStreamFormat(new ArrayReaderFormat(), 
path).build();
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+An example of a `SimpleStreamFormat` is `CsvReaderFormat`. It can be 
initialized as follows:
+```java
+CsvReaderFormat<SomePojo> csvFormat = CsvReaderFormat.forPojo(SomePojo.class);
+FileSource<SomePojo> source = 
+        FileSource.forRecordStreamFormat(csvFormat, 
Path.fromLocalFile(...)).build();
+```
+
+The schema for CSV parsing, in this case, is automatically derived based on 
the fields of the `SomePojo` class using the `Jackson` library. (Note: you 
might need to add `@JsonPropertyOrder({field1, field2, ...})` annotation to 
your class definition with the fields order exactly matching those of the CSV 
file columns).
+
+If you need more fine-grained control over the CSV schema or the parsing 
options, use the more low-level `forSchema` static factory method of 
`CsvReaderFormat`:
+
+```java
+CsvReaderFormat<T> forSchema(CsvMapper mapper, 
+                             CsvSchema schema, 
+                             TypeInformation<T> typeInformation) 
+```
+
+#### Bulk Format
+
+The BulkFormat reads and decodes batches of records at a time. Examples of 
bulk formats
+are formats like ORC or Parquet.
+The outer `BulkFormat` class acts mainly as a configuration holder and factory 
for the
+reader. The actual reading is done by the `BulkFormat.Reader`, which is 
created in the
+`BulkFormat#createReader(Configuration, FileSourceSplit)` method. If a bulk 
reader is
+created based on a checkpoint during checkpointed streaming execution, then 
the reader is
+re-created in the `BulkFormat#restoreReader(Configuration, FileSourceSplit)` 
method.
+
+A `SimpleStreamFormat` can be turned into a `BulkFormat` by wrapping it in a 
`StreamFormatAdapter`:
+```java
+BulkFormat<SomePojo, FileSourceSplit> bulkFormat = 
+        new StreamFormatAdapter<>(CsvReaderFormat.forPojo(SomePojo.class));
+```
+
+### Customizing File Enumeration
+
+{{< tabs "CustomizingFileEnumeration" >}}
+{{< tab "Java" >}}
+```java
+/**
+ * A FileEnumerator implementation for hive source, which generates splits 
based on 
+ * HiveTablePartition.
+ */
+public class HiveSourceFileEnumerator implements FileEnumerator {
+    
+    // reference constructor
+    public HiveSourceFileEnumerator(...) {
+        ...
+    }
+
+    /***
+     * Generates all file splits for the relevant files under the given paths. 
The {@code
+     * minDesiredSplits} is an optional hint indicating how many splits would 
be necessary to
+     * exploit parallelism properly.
+     */
+    @Override
+    public Collection<FileSourceSplit> enumerateSplits(Path[] paths, int 
minDesiredSplits)
+            throws IOException {
+        // createInputSplits:splitting files into fragmented collections
+        return new ArrayList<>(createInputSplits(...));
+    }
+
+    ...
+
+    /***
+     * A factory to create HiveSourceFileEnumerator.
+     */
+    public static class Provider implements FileEnumerator.Provider {
+
+        ...
+        @Override
+        public FileEnumerator create() {
+            return new HiveSourceFileEnumerator(...);
+        }
+    }
+}
+// use the customizing file enumeration
+new HiveSource<>(
+        ...,
+        new HiveSourceFileEnumerator.Provider(
+        partitions != null ? partitions : Collections.emptyList(),
+        new JobConfWrapper(jobConf)),
+       ...);
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+### Current Limitations
+
+Watermarking doesn't work particularly well for large backlogs of files, 
because watermarks eagerly advance within a file, and the next file might 
contain data later than the watermark again.
+
+For Unbounded File Sources, the enumerator currently remembers paths of all 
already processed files, which is a state that can in some cases grow rather 
large.
+The future will be planned to add a compressed form of tracking already 
processed files in the future (for example by keeping modification timestamps 
lower boundaries).

Review comment:
       ```suggestion
   For Unbounded File Sources, the enumerator currently remembers paths of all 
already processed files, which is a state that can, in some cases, grow rather 
large.
   There are plans to add a compressed form of tracking already processed files 
in the future (for example, by keeping modification timestamps below 
boundaries).
   ```

##########
File path: docs/content/docs/connectors/datastream/filesystem.md
##########
@@ -25,12 +27,227 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# File Sink
+# FileSystem
 
-This connector provides a unified Sink for `BATCH` and `STREAMING` that writes 
partitioned files to filesystems
+This connector provides a unified Source and Sink for `BATCH` and `STREAMING` 
that reads or writes (partitioned) files to filesystems
 supported by the [Flink `FileSystem` abstraction]({{< ref 
"docs/deployment/filesystems/overview" >}}). This filesystem
-connector provides the same guarantees for both `BATCH` and `STREAMING` and it 
is an evolution of the 
-existing [Streaming File Sink]({{< ref 
"docs/connectors/datastream/streamfile_sink" >}}) which was designed for 
providing exactly-once semantics for `STREAMING` execution.
+connector provides the same guarantees for both `BATCH` and `STREAMING` and is 
designed for providing exactly-once semantics for `STREAMING` execution.
+
+The connector supports reading and writing a set of files from any 
(distributed) file system (e.g. POSIX, S3, HDFS)
+with a [format]({{< ref "docs/connectors/datastream/formats/overview" >}}) 
(e.g., Avro, CSV, Parquet),
+producing a stream or records.
+
+## File Source
+
+The `File Source` is based on the [Source API]({{< ref 
"docs/dev/datastream/sources" >}}#the-data-source-api), 
+a unified data source that reads files - both in batch and in streaming mode. 
+It is divided into the following two parts: File SplitEnumerator and File 
SourceReader. 
+
+* File `SplitEnumerator` is responsible for discovering and identifying the 
files to read and assigns them to the File SourceReader.

Review comment:
       ```suggestion
   * File `SplitEnumerator` is responsible for discovering and identifying the 
files to read and assigns them to the File `SourceReader`.
   ```

##########
File path: docs/content/docs/connectors/datastream/filesystem.md
##########
@@ -25,12 +27,227 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# File Sink
+# FileSystem
 
-This connector provides a unified Sink for `BATCH` and `STREAMING` that writes 
partitioned files to filesystems
+This connector provides a unified Source and Sink for `BATCH` and `STREAMING` 
that reads or writes (partitioned) files to filesystems
 supported by the [Flink `FileSystem` abstraction]({{< ref 
"docs/deployment/filesystems/overview" >}}). This filesystem
-connector provides the same guarantees for both `BATCH` and `STREAMING` and it 
is an evolution of the 
-existing [Streaming File Sink]({{< ref 
"docs/connectors/datastream/streamfile_sink" >}}) which was designed for 
providing exactly-once semantics for `STREAMING` execution.
+connector provides the same guarantees for both `BATCH` and `STREAMING` and is 
designed for providing exactly-once semantics for `STREAMING` execution.
+
+The connector supports reading and writing a set of files from any 
(distributed) file system (e.g. POSIX, S3, HDFS)
+with a [format]({{< ref "docs/connectors/datastream/formats/overview" >}}) 
(e.g., Avro, CSV, Parquet),
+producing a stream or records.
+
+## File Source
+
+The `File Source` is based on the [Source API]({{< ref 
"docs/dev/datastream/sources" >}}#the-data-source-api), 
+a unified data source that reads files - both in batch and in streaming mode. 
+It is divided into the following two parts: File SplitEnumerator and File 
SourceReader. 
+
+* File `SplitEnumerator` is responsible for discovering and identifying the 
files to read and assigns them to the File SourceReader.
+* File `SourceReader` requests the files it needs to process and reads the 
file from the filesystem. 
+
+You'll need to combine the File Source with a [format]({{< ref 
"docs/connectors/datastream/formats/overview" >}}). This allows you to
+parse CSV, decode AVRO or read Parquet columnar files.

Review comment:
       ```suggestion
   You will need to combine the `File Source` with a [format]({{< ref 
"docs/connectors/datastream/formats/overview" >}}), which allows you to parse 
CSV, decode AVRO, or read Parquet columnar files.
   ```

##########
File path: docs/content/docs/deployment/filesystems/s3.md
##########
@@ -70,16 +70,15 @@ Both implementations are self-contained with no dependency 
footprint, so there i
   
      For example, Hadoop has a `fs.s3a.connection.maximum` configuration key. 
If you want to change it, you need to put `s3.connection.maximum: xyz` to the 
`flink-conf.yaml`. Flink will internally translate this back to 
`fs.s3a.connection.maximum`. There is no need to pass configuration parameters 
using Hadoop's XML configuration files.
   
-    It is the only S3 file system with support for the [StreamingFileSink]({{< 
ref "docs/connectors/datastream/streamfile_sink" >}}) and the [FileSink]({{< 
ref "docs/connectors/datastream/file_sink" >}}).
+    It is the only S3 file system with support for the [FileSystem]({{< ref 
"docs/connectors/datastream/filesystem" >}}).
   
 
 Both `flink-s3-fs-hadoop` and `flink-s3-fs-presto` register default FileSystem
 wrappers for URIs with the *s3://* scheme, `flink-s3-fs-hadoop` also registers
 for *s3a://* and `flink-s3-fs-presto` also registers for *s3p://*, so you can
 use this to use both at the same time.
-For example, the job uses the [StreamingFileSink]({{< ref 
"docs/connectors/datastream/streamfile_sink" >}}) which only supports Hadoop, 
but uses Presto for checkpointing.
-In this case, it is advised to explicitly use *s3a://* as a scheme for the 
sink (Hadoop) and *s3p://* for checkpointing (Presto). The same holds for the 
-[FileSink]({{< ref "docs/connectors/datastream/file_sink" >}}).
+For example, the job uses the [FileSystem]({{< ref 
"docs/connectors/datastream/filesystem" >}}) which only supports Hadoop, but 
uses Presto for checkpointing.
+In this case, it is advised to explicitly use *s3a://* as a scheme for the 
sink (Hadoop) and *s3p://* for checkpointing (Presto).

Review comment:
       ```suggestion
   In this case, you should explicitly use *s3a://* as a scheme for the sink 
(Hadoop) and *s3p://* for checkpointing (Presto).
   ```
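
A short snippet might make the scheme split more concrete, for example (a rough sketch; `env` is assumed to be a `StreamExecutionEnvironment`, `stream` a `DataStream<String>`, and the bucket name is a placeholder):

```java
// Checkpoints go through the Presto S3 file system (s3p://) ...
env.getCheckpointConfig().setCheckpointStorage("s3p://my-bucket/checkpoints");

// ... while the sink writes its output files through the Hadoop S3 file system (s3a://).
FileSink<String> sink =
        FileSink.forRowFormat(
                        new Path("s3a://my-bucket/output"),
                        new SimpleStringEncoder<String>("UTF-8"))
                .build();

stream.sinkTo(sink);
```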

##########
File path: docs/content/docs/connectors/table/filesystem.md
##########
@@ -161,8 +161,8 @@ CREATE TABLE MyUserTableWithFilepath (
 
 ## Streaming Sink
 
-The file system connector supports streaming writes, based on Flink's 
[Streaming File Sink]({{< ref "docs/connectors/datastream/streamfile_sink" >}})
-to write records to file. Row-encoded Formats are csv and json. Bulk-encoded 
Formats are parquet, orc and avro.
+The file system connector supports streaming writes, based on Flink's 
[FileSystem]({{< ref "docs/connectors/datastream/filesystem" >}})
+to write records to file. Row-encoded Formats are csv and json. Bulk-encoded 
Formats are Parquet, ORC and Avro.

Review comment:
       ```suggestion
   The file system connector supports streaming writes, based on Flink's 
[FileSystem]({{< ref "docs/connectors/datastream/filesystem" >}}),
   to write records to file. Row-encoded Formats are CSV and JSON. Bulk-encoded 
Formats are Parquet, ORC and Avro.
   ```
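
Since this paragraph distinguishes row-encoded and bulk-encoded formats, a minimal pair of table definitions could make that concrete (sketch only; `tableEnv` is an assumed `TableEnvironment`, and the table names and columns are made up):

```java
// Row-encoded format: every record is written as one line of the output file.
tableEnv.executeSql(
        "CREATE TABLE fs_table_csv (user_id STRING, order_amount DOUBLE) WITH ("
                + " 'connector' = 'filesystem',"
                + " 'path' = 'file:///tmp/output-csv',"
                + " 'format' = 'csv')");

// Bulk-encoded format: records are buffered and written in columnar batches.
tableEnv.executeSql(
        "CREATE TABLE fs_table_parquet (user_id STRING, order_amount DOUBLE) WITH ("
                + " 'connector' = 'filesystem',"
                + " 'path' = 'file:///tmp/output-parquet',"
                + " 'format' = 'parquet')");
```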

##########
File path: docs/content/docs/connectors/datastream/filesystem.md
##########
@@ -25,12 +27,227 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# File Sink
+# FileSystem
 
-This connector provides a unified Sink for `BATCH` and `STREAMING` that writes 
partitioned files to filesystems
+This connector provides a unified Source and Sink for `BATCH` and `STREAMING` 
that reads or writes (partitioned) files to filesystems
 supported by the [Flink `FileSystem` abstraction]({{< ref 
"docs/deployment/filesystems/overview" >}}). This filesystem
-connector provides the same guarantees for both `BATCH` and `STREAMING` and it 
is an evolution of the 
-existing [Streaming File Sink]({{< ref 
"docs/connectors/datastream/streamfile_sink" >}}) which was designed for 
providing exactly-once semantics for `STREAMING` execution.
+connector provides the same guarantees for both `BATCH` and `STREAMING` and is 
designed for providing exactly-once semantics for `STREAMING` execution.
+
+The connector supports reading and writing a set of files from any 
(distributed) file system (e.g. POSIX, S3, HDFS)
+with a [format]({{< ref "docs/connectors/datastream/formats/overview" >}}) 
(e.g., Avro, CSV, Parquet),
+producing a stream or records.
+
+## File Source
+
+The `File Source` is based on the [Source API]({{< ref 
"docs/dev/datastream/sources" >}}#the-data-source-api), 
+a unified data source that reads files - both in batch and in streaming mode. 
+It is divided into the following two parts: File SplitEnumerator and File 
SourceReader. 
+
+* File `SplitEnumerator` is responsible for discovering and identifying the 
files to read and assigns them to the File SourceReader.
+* File `SourceReader` requests the files it needs to process and reads the 
file from the filesystem. 
+
+You'll need to combine the File Source with a [format]({{< ref 
"docs/connectors/datastream/formats/overview" >}}). This allows you to
+parse CSV, decode AVRO or read Parquet columnar files.
+
+#### Bounded and Unbounded Streams
+
+A bounded `File Source` lists all files (via SplitEnumerator, for example a 
recursive directory list with filtered-out hidden files) and reads them all.
+
+An unbounded `File Source` is created when configuring the enumerator for 
periodic file discovery.
+In that case, the SplitEnumerator will enumerate like the bounded case but 
after a certain interval repeats the enumeration.
+For any repeated enumeration, the `SplitEnumerator` filters out previously 
detect files and only sends new ones to the `SourceReader`.
+
+### Usage
+
+You start building a file source via one of the following calls:
+
+{{< tabs "FileSourceUsage" >}}
+{{< tab "Java" >}}
+```java
+// reads the contents of a file from a file stream. 
+FileSource.forRecordStreamFormat(StreamFormat,Path...)
+        
+// reads batches of records from a file at a time
+FileSource.forBulkFileFormat(BulkFormat,Path...)
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+This creates a `FileSource.FileSourceBuilder` on which you can configure all 
the properties of the file source.
+
+For the bounded/batch case, the file source processes all files under the 
given path(s). 
+In the continuous/streaming case, the source periodically checks the paths for 
new files and will start reading those.
+
+When you start creating a file source (via the `FileSource.FileSourceBuilder` 
created through one of the above-mentioned methods) 
+the source is by default in bounded/batch mode. Call 
`AbstractFileSource.AbstractFileSourceBuilder.monitorContinuously(Duration)` 
+to put the source into continuous streaming mode.

Review comment:
       ```suggestion
   This creates a `FileSource.FileSourceBuilder` on which you can configure all 
the properties of the File Source.
   
   For the bounded/batch case, the File Source processes all files under the 
given path(s). 
   For the continuous/streaming case, the source periodically checks the paths 
for new files and will start reading those.
   
   When you start creating a File Source (via the 
`FileSource.FileSourceBuilder` created through one of the above-mentioned 
methods), 
   the source is in bounded/batch mode by default. You can call 
`AbstractFileSource.AbstractFileSourceBuilder.monitorContinuously(Duration)` 
   to put the source into continuous streaming mode.
   ```
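
It might also be worth showing how the built source is handed to the execution environment. A rough end-to-end sketch (assuming `env` is a `StreamExecutionEnvironment`; `TextLineInputFormat` and the path are placeholders):

```java
FileSource<String> source =
        FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("/data/in"))
                // drop this line to stay in the default bounded/batch mode
                .monitorContinuously(Duration.ofSeconds(30))
                .build();

DataStream<String> stream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
```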

##########
File path: docs/content/docs/connectors/datastream/filesystem.md
##########
@@ -25,12 +27,227 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# File Sink
+# FileSystem
 
-This connector provides a unified Sink for `BATCH` and `STREAMING` that writes 
partitioned files to filesystems
+This connector provides a unified Source and Sink for `BATCH` and `STREAMING` 
that reads or writes (partitioned) files to filesystems
 supported by the [Flink `FileSystem` abstraction]({{< ref 
"docs/deployment/filesystems/overview" >}}). This filesystem
-connector provides the same guarantees for both `BATCH` and `STREAMING` and it 
is an evolution of the 
-existing [Streaming File Sink]({{< ref 
"docs/connectors/datastream/streamfile_sink" >}}) which was designed for 
providing exactly-once semantics for `STREAMING` execution.
+connector provides the same guarantees for both `BATCH` and `STREAMING` and is 
designed for providing exactly-once semantics for `STREAMING` execution.
+
+The connector supports reading and writing a set of files from any 
(distributed) file system (e.g. POSIX, S3, HDFS)
+with a [format]({{< ref "docs/connectors/datastream/formats/overview" >}}) 
(e.g., Avro, CSV, Parquet),
+producing a stream or records.
+
+## File Source
+
+The `File Source` is based on the [Source API]({{< ref 
"docs/dev/datastream/sources" >}}#the-data-source-api), 
+a unified data source that reads files - both in batch and in streaming mode. 
+It is divided into the following two parts: File SplitEnumerator and File 
SourceReader. 
+
+* File `SplitEnumerator` is responsible for discovering and identifying the 
files to read and assigns them to the File SourceReader.
+* File `SourceReader` requests the files it needs to process and reads the 
file from the filesystem. 
+
+You'll need to combine the File Source with a [format]({{< ref 
"docs/connectors/datastream/formats/overview" >}}). This allows you to
+parse CSV, decode AVRO or read Parquet columnar files.
+
+#### Bounded and Unbounded Streams
+
+A bounded `File Source` lists all files (via SplitEnumerator, for example a 
recursive directory list with filtered-out hidden files) and reads them all.
+
+An unbounded `File Source` is created when configuring the enumerator for 
periodic file discovery.
+In that case, the SplitEnumerator will enumerate like the bounded case but 
after a certain interval repeats the enumeration.
+For any repeated enumeration, the `SplitEnumerator` filters out previously 
detect files and only sends new ones to the `SourceReader`.
+
+### Usage
+
+You start building a file source via one of the following calls:
+
+{{< tabs "FileSourceUsage" >}}
+{{< tab "Java" >}}
+```java
+// reads the contents of a file from a file stream. 
+FileSource.forRecordStreamFormat(StreamFormat,Path...)
+        
+// reads batches of records from a file at a time
+FileSource.forBulkFileFormat(BulkFormat,Path...)
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+This creates a `FileSource.FileSourceBuilder` on which you can configure all 
the properties of the file source.
+
+For the bounded/batch case, the file source processes all files under the 
given path(s). 
+In the continuous/streaming case, the source periodically checks the paths for 
new files and will start reading those.
+
+When you start creating a file source (via the `FileSource.FileSourceBuilder` 
created through one of the above-mentioned methods) 
+the source is by default in bounded/batch mode. Call 
`AbstractFileSource.AbstractFileSourceBuilder.monitorContinuously(Duration)` 
+to put the source into continuous streaming mode.
+
+{{< tabs "FileSourceBuilder" >}}
+{{< tab "Java" >}}
+```java
+final FileSource<String> source =
+        FileSource.forRecordStreamFormat(...)
+        .monitorContinuously(Duration.ofMillis(5))  
+        .build();
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+### Format Types
+
+The reading of each file happens through file readers defined by file formats. 
+These define the parsing logic for the contents of the file. There are 
multiple classes that the source supports. 
+Their interfaces trade of simplicity of implementation and 
flexibility/efficiency.
+
+* A `StreamFormat` reads the contents of a file from a file stream. It is the 
simplest format to implement, 
+and provides many features out-of-the-box (like checkpointing logic) but is 
limited in the optimizations it can apply 
+(such as object reuse, batching, etc.).
+
+* A `BulkFormat` reads batches of records from a file at a time. 
+It is the most "low level" format to implement, but offers the greatest 
flexibility to optimize the implementation.
+
+#### TextLine format
+
+A `StreamFormat` reader format that text lines from a file.
+The reader uses Java's built-in `InputStreamReader` to decode the byte stream 
using
+various supported charset encodings.
+This format does not support optimized recovery from checkpoints. On recovery, 
it will re-read
+and discard the number of lined that were processed before the last 
checkpoint. That is due to
+the fact that the offsets of lines in the file cannot be tracked through the 
charset decoders
+with their internal buffering of stream input and charset decoder state.
+
+#### SimpleStreamFormat Abstract Class
+
+A simple version of `StreamFormat` for formats that are not splittable.

Review comment:
       ```suggestion
   This is a simple version of `StreamFormat` for formats that are not 
splittable.
   ```

##########
File path: docs/content/docs/connectors/datastream/filesystem.md
##########
@@ -25,12 +27,227 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# File Sink
+# FileSystem
 
-This connector provides a unified Sink for `BATCH` and `STREAMING` that writes 
partitioned files to filesystems
+This connector provides a unified Source and Sink for `BATCH` and `STREAMING` 
that reads or writes (partitioned) files to filesystems
 supported by the [Flink `FileSystem` abstraction]({{< ref 
"docs/deployment/filesystems/overview" >}}). This filesystem
-connector provides the same guarantees for both `BATCH` and `STREAMING` and it 
is an evolution of the 
-existing [Streaming File Sink]({{< ref 
"docs/connectors/datastream/streamfile_sink" >}}) which was designed for 
providing exactly-once semantics for `STREAMING` execution.
+connector provides the same guarantees for both `BATCH` and `STREAMING` and is 
designed for providing exactly-once semantics for `STREAMING` execution.
+
+The connector supports reading and writing a set of files from any 
(distributed) file system (e.g. POSIX, S3, HDFS)
+with a [format]({{< ref "docs/connectors/datastream/formats/overview" >}}) 
(e.g., Avro, CSV, Parquet),
+producing a stream or records.
+
+## File Source
+
+The `File Source` is based on the [Source API]({{< ref 
"docs/dev/datastream/sources" >}}#the-data-source-api), 
+a unified data source that reads files - both in batch and in streaming mode. 
+It is divided into the following two parts: File SplitEnumerator and File 
SourceReader. 

Review comment:
       ```suggestion
   It is divided into the following two parts: File `SplitEnumerator` and File 
`SourceReader`. 
   ```

##########
File path: docs/content/docs/connectors/datastream/filesystem.md
##########
@@ -25,12 +27,227 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# File Sink
+# FileSystem
 
-This connector provides a unified Sink for `BATCH` and `STREAMING` that writes 
partitioned files to filesystems
+This connector provides a unified Source and Sink for `BATCH` and `STREAMING` 
that reads or writes (partitioned) files to filesystems
 supported by the [Flink `FileSystem` abstraction]({{< ref 
"docs/deployment/filesystems/overview" >}}). This filesystem
-connector provides the same guarantees for both `BATCH` and `STREAMING` and it 
is an evolution of the 
-existing [Streaming File Sink]({{< ref 
"docs/connectors/datastream/streamfile_sink" >}}) which was designed for 
providing exactly-once semantics for `STREAMING` execution.
+connector provides the same guarantees for both `BATCH` and `STREAMING` and is 
designed for providing exactly-once semantics for `STREAMING` execution.
+
+The connector supports reading and writing a set of files from any 
(distributed) file system (e.g. POSIX, S3, HDFS)
+with a [format]({{< ref "docs/connectors/datastream/formats/overview" >}}) 
(e.g., Avro, CSV, Parquet),
+producing a stream or records.
+
+## File Source
+
+The `File Source` is based on the [Source API]({{< ref 
"docs/dev/datastream/sources" >}}#the-data-source-api), 
+a unified data source that reads files - both in batch and in streaming mode. 
+It is divided into the following two parts: File SplitEnumerator and File 
SourceReader. 
+
+* File `SplitEnumerator` is responsible for discovering and identifying the 
files to read and assigns them to the File SourceReader.
+* File `SourceReader` requests the files it needs to process and reads the 
file from the filesystem. 
+
+You'll need to combine the File Source with a [format]({{< ref 
"docs/connectors/datastream/formats/overview" >}}). This allows you to
+parse CSV, decode AVRO or read Parquet columnar files.
+
+#### Bounded and Unbounded Streams
+
+A bounded `File Source` lists all files (via SplitEnumerator, for example a 
recursive directory list with filtered-out hidden files) and reads them all.
+
+An unbounded `File Source` is created when configuring the enumerator for 
periodic file discovery.
+In that case, the SplitEnumerator will enumerate like the bounded case but 
after a certain interval repeats the enumeration.
+For any repeated enumeration, the `SplitEnumerator` filters out previously 
detect files and only sends new ones to the `SourceReader`.
+
+### Usage
+
+You start building a file source via one of the following calls:
+
+{{< tabs "FileSourceUsage" >}}
+{{< tab "Java" >}}
+```java
+// reads the contents of a file from a file stream. 
+FileSource.forRecordStreamFormat(StreamFormat,Path...)
+        
+// reads batches of records from a file at a time
+FileSource.forBulkFileFormat(BulkFormat,Path...)
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+This creates a `FileSource.FileSourceBuilder` on which you can configure all 
the properties of the file source.
+
+For the bounded/batch case, the file source processes all files under the 
given path(s). 
+In the continuous/streaming case, the source periodically checks the paths for 
new files and will start reading those.
+
+When you start creating a file source (via the `FileSource.FileSourceBuilder` 
created through one of the above-mentioned methods) 
+the source is by default in bounded/batch mode. Call 
`AbstractFileSource.AbstractFileSourceBuilder.monitorContinuously(Duration)` 
+to put the source into continuous streaming mode.
+
+{{< tabs "FileSourceBuilder" >}}
+{{< tab "Java" >}}
+```java
+final FileSource<String> source =
+        FileSource.forRecordStreamFormat(...)
+        .monitorContinuously(Duration.ofMillis(5))  
+        .build();
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+### Format Types
+
+The reading of each file happens through file readers defined by file formats. 
+These define the parsing logic for the contents of the file. There are 
multiple classes that the source supports. 
+Their interfaces trade of simplicity of implementation and 
flexibility/efficiency.
+
+* A `StreamFormat` reads the contents of a file from a file stream. It is the 
simplest format to implement, 
+and provides many features out-of-the-box (like checkpointing logic) but is 
limited in the optimizations it can apply 
+(such as object reuse, batching, etc.).
+
+* A `BulkFormat` reads batches of records from a file at a time. 
+It is the most "low level" format to implement, but offers the greatest 
flexibility to optimize the implementation.
+
+#### TextLine format
+
+A `StreamFormat` reader format that text lines from a file.
+The reader uses Java's built-in `InputStreamReader` to decode the byte stream 
using
+various supported charset encodings.
+This format does not support optimized recovery from checkpoints. On recovery, 
it will re-read
+and discard the number of lined that were processed before the last 
checkpoint. That is due to
+the fact that the offsets of lines in the file cannot be tracked through the 
charset decoders
+with their internal buffering of stream input and charset decoder state.
+
+#### SimpleStreamFormat Abstract Class
+
+A simple version of `StreamFormat` for formats that are not splittable.
+Custom reads of Array or File can be done by implementing `SimpleStreamFormat`:
+
+{{< tabs "SimpleStreamFormat" >}}
+{{< tab "Java" >}}
+```java
+private static final class ArrayReaderFormat extends 
SimpleStreamFormat<byte[]> {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public Reader<byte[]> createReader(Configuration config, FSDataInputStream 
stream)
+            throws IOException {
+        return new ArrayReader(stream);
+    }
+
+    @Override
+    public TypeInformation<byte[]> getProducedType() {
+        return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
+    }
+}
+
+final FileSource<byte[]> source =
+                FileSource.forRecordStreamFormat(new ArrayReaderFormat(), 
path).build();
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+An example of a `SimpleStreamFormat` is `CsvReaderFormat`. It can be 
initialized as follows:

Review comment:
       ```suggestion
   An example of a `SimpleStreamFormat` is `CsvReaderFormat`. It can be 
initialized like this:
   ```
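
For the fine-grained `forSchema` variant mentioned further down, it could also help to show the call in context. A sketch (the separator and quoting options are made up for illustration; `SomePojo` is the same example class as above):

```java
CsvMapper mapper = new CsvMapper();
CsvSchema schema =
        mapper.schemaFor(SomePojo.class)
                .withoutQuoteChar()
                .withColumnSeparator('|');

CsvReaderFormat<SomePojo> csvFormat =
        CsvReaderFormat.forSchema(mapper, schema, TypeInformation.of(SomePojo.class));

FileSource<SomePojo> source =
        FileSource.forRecordStreamFormat(csvFormat, Path.fromLocalFile(new File("/tmp/data.csv")))
                .build();
```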

##########
File path: docs/content/docs/connectors/datastream/filesystem.md
##########
@@ -25,12 +27,227 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# File Sink
+# FileSystem
 
-This connector provides a unified Sink for `BATCH` and `STREAMING` that writes 
partitioned files to filesystems
+This connector provides a unified Source and Sink for `BATCH` and `STREAMING` 
that reads or writes (partitioned) files to filesystems
 supported by the [Flink `FileSystem` abstraction]({{< ref 
"docs/deployment/filesystems/overview" >}}). This filesystem
-connector provides the same guarantees for both `BATCH` and `STREAMING` and it 
is an evolution of the 
-existing [Streaming File Sink]({{< ref 
"docs/connectors/datastream/streamfile_sink" >}}) which was designed for 
providing exactly-once semantics for `STREAMING` execution.
+connector provides the same guarantees for both `BATCH` and `STREAMING` and is 
designed for providing exactly-once semantics for `STREAMING` execution.
+
+The connector supports reading and writing a set of files from any 
(distributed) file system (e.g. POSIX, S3, HDFS)
+with a [format]({{< ref "docs/connectors/datastream/formats/overview" >}}) 
(e.g., Avro, CSV, Parquet),
+producing a stream or records.
+
+## File Source
+
+The `File Source` is based on the [Source API]({{< ref 
"docs/dev/datastream/sources" >}}#the-data-source-api), 
+a unified data source that reads files - both in batch and in streaming mode. 
+It is divided into the following two parts: File SplitEnumerator and File 
SourceReader. 
+
+* File `SplitEnumerator` is responsible for discovering and identifying the 
files to read and assigns them to the File SourceReader.
+* File `SourceReader` requests the files it needs to process and reads the 
file from the filesystem. 
+
+You'll need to combine the File Source with a [format]({{< ref 
"docs/connectors/datastream/formats/overview" >}}). This allows you to
+parse CSV, decode AVRO or read Parquet columnar files.
+
+#### Bounded and Unbounded Streams
+
+A bounded `File Source` lists all files (via SplitEnumerator, for example a 
recursive directory list with filtered-out hidden files) and reads them all.
+
+An unbounded `File Source` is created when configuring the enumerator for 
periodic file discovery.
+In that case, the SplitEnumerator will enumerate like the bounded case but 
after a certain interval repeats the enumeration.
+For any repeated enumeration, the `SplitEnumerator` filters out previously 
detect files and only sends new ones to the `SourceReader`.
+
+### Usage
+
+You start building a file source via one of the following calls:
+
+{{< tabs "FileSourceUsage" >}}
+{{< tab "Java" >}}
+```java
+// reads the contents of a file from a file stream. 
+FileSource.forRecordStreamFormat(StreamFormat,Path...)
+        
+// reads batches of records from a file at a time
+FileSource.forBulkFileFormat(BulkFormat,Path...)
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+This creates a `FileSource.FileSourceBuilder` on which you can configure all 
the properties of the file source.
+
+For the bounded/batch case, the file source processes all files under the 
given path(s). 
+In the continuous/streaming case, the source periodically checks the paths for 
new files and will start reading those.
+
+When you start creating a file source (via the `FileSource.FileSourceBuilder` 
created through one of the above-mentioned methods) 
+the source is by default in bounded/batch mode. Call 
`AbstractFileSource.AbstractFileSourceBuilder.monitorContinuously(Duration)` 
+to put the source into continuous streaming mode.
+
+{{< tabs "FileSourceBuilder" >}}
+{{< tab "Java" >}}
+```java
+final FileSource<String> source =
+        FileSource.forRecordStreamFormat(...)
+        .monitorContinuously(Duration.ofMillis(5))  
+        .build();
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+### Format Types
+
+The reading of each file happens through file readers defined by file formats. 
+These define the parsing logic for the contents of the file. There are 
multiple classes that the source supports. 
+Their interfaces trade of simplicity of implementation and 
flexibility/efficiency.
+
+* A `StreamFormat` reads the contents of a file from a file stream. It is the 
simplest format to implement, 
+and provides many features out-of-the-box (like checkpointing logic) but is 
limited in the optimizations it can apply 
+(such as object reuse, batching, etc.).
+
+* A `BulkFormat` reads batches of records from a file at a time. 
+It is the most "low level" format to implement, but offers the greatest 
flexibility to optimize the implementation.
+
+#### TextLine format
+
+A `StreamFormat` reader format that text lines from a file.
+The reader uses Java's built-in `InputStreamReader` to decode the byte stream 
using
+various supported charset encodings.
+This format does not support optimized recovery from checkpoints. On recovery, 
it will re-read
+and discard the number of lined that were processed before the last 
checkpoint. That is due to
+the fact that the offsets of lines in the file cannot be tracked through the 
charset decoders
+with their internal buffering of stream input and charset decoder state.
+
+#### SimpleStreamFormat Abstract Class
+
+A simple version of `StreamFormat` for formats that are not splittable.
+Custom reads of Array or File can be done by implementing `SimpleStreamFormat`:
+
+{{< tabs "SimpleStreamFormat" >}}
+{{< tab "Java" >}}
+```java
+private static final class ArrayReaderFormat extends 
SimpleStreamFormat<byte[]> {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public Reader<byte[]> createReader(Configuration config, FSDataInputStream 
stream)
+            throws IOException {
+        return new ArrayReader(stream);
+    }
+
+    @Override
+    public TypeInformation<byte[]> getProducedType() {
+        return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
+    }
+}
+
+final FileSource<byte[]> source =
+                FileSource.forRecordStreamFormat(new ArrayReaderFormat(), 
path).build();
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+An example of a `SimpleStreamFormat` is `CsvReaderFormat`. It can be 
initialized as follows:
+```java
+CsvReaderFormat<SomePojo> csvFormat = CsvReaderFormat.forPojo(SomePojo.class);
+FileSource<SomePojo> source = 
+        FileSource.forRecordStreamFormat(csvFormat, 
Path.fromLocalFile(...)).build();
+```
+
+The schema for CSV parsing, in this case, is automatically derived based on 
the fields of the `SomePojo` class using the `Jackson` library. (Note: you 
might need to add `@JsonPropertyOrder({field1, field2, ...})` annotation to 
your class definition with the fields order exactly matching those of the CSV 
file columns).
+
+If you need more fine-grained control over the CSV schema or the parsing 
options, use the more low-level `forSchema` static factory method of 
`CsvReaderFormat`:
+
+```java
+CsvReaderFormat<T> forSchema(CsvMapper mapper, 
+                             CsvSchema schema, 
+                             TypeInformation<T> typeInformation) 
+```
+
+#### Bulk Format
+
+The BulkFormat reads and decodes batches of records at a time. Examples of 
bulk formats
+are formats like ORC or Parquet.
+The outer `BulkFormat` class acts mainly as a configuration holder and factory 
for the
+reader. The actual reading is done by the `BulkFormat.Reader`, which is 
created in the
+`BulkFormat#createReader(Configuration, FileSourceSplit)` method. If a bulk 
reader is
+created based on a checkpoint during checkpointed streaming execution, then 
the reader is
+re-created in the `BulkFormat#restoreReader(Configuration, FileSourceSplit)` 
method.
+
+A `SimpleStreamFormat` can be turned into a `BulkFormat` by wrapping it in a 
`StreamFormatAdapter`:
+```java
+BulkFormat<SomePojo, FileSourceSplit> bulkFormat = 
+        new StreamFormatAdapter<>(CsvReaderFormat.forPojo(SomePojo.class));
+```
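The adapted format can then be handed to the bulk entry point of the builder. A brief sketch, in which the input path is an assumption:

```java
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.impl.StreamFormatAdapter;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.csv.CsvReaderFormat;

// Sketch: wrap the CSV StreamFormat into a BulkFormat and build a FileSource from it.
final BulkFormat<SomePojo, FileSourceSplit> bulkFormat =
        new StreamFormatAdapter<>(CsvReaderFormat.forPojo(SomePojo.class));

final FileSource<SomePojo> source =
        FileSource.forBulkFileFormat(bulkFormat, new Path("/path/to/csv-data"))
                .build();
```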
+
+### Customizing File Enumeration
+
+{{< tabs "CustomizingFileEnumeration" >}}
+{{< tab "Java" >}}
+```java
+/**
+ * A FileEnumerator implementation for the Hive source, which generates splits based on
+ * HiveTablePartition.
+ */
+public class HiveSourceFileEnumerator implements FileEnumerator {
+    
+    // reference constructor
+    public HiveSourceFileEnumerator(...) {
+        ...
+    }
+
+    /**
+     * Generates all file splits for the relevant files under the given paths. 
The {@code
+     * minDesiredSplits} is an optional hint indicating how many splits would 
be necessary to
+     * exploit parallelism properly.
+     */
+    @Override
+    public Collection<FileSourceSplit> enumerateSplits(Path[] paths, int 
minDesiredSplits)
+            throws IOException {
+        // createInputSplits: split the files under the given paths into source splits
+        return new ArrayList<>(createInputSplits(...));
+    }
+
+    ...
+
+    /**
+     * A factory to create HiveSourceFileEnumerator.
+     */
+    public static class Provider implements FileEnumerator.Provider {
+
+        ...
+        @Override
+        public FileEnumerator create() {
+            return new HiveSourceFileEnumerator(...);
+        }
+    }
+}
+// use the custom file enumerator
+new HiveSource<>(
+        ...,
+        new HiveSourceFileEnumerator.Provider(
+                partitions != null ? partitions : Collections.emptyList(),
+                new JobConfWrapper(jobConf)),
+        ...);
+```
+{{< /tab >}}
+{{< /tabs >}}
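For sources built through the regular `FileSource` builder (rather than the Hive source), a custom enumerator can also be plugged in. The sketch below assumes the builder exposes a `setFileEnumerator(FileEnumerator.Provider)` method; `CustomFileEnumerator`, `TextLineInputFormat`, and the path are hypothetical stand-ins:

```java
// Sketch (assumptions noted above): wire a custom FileEnumerator.Provider into the builder.
final FileSource<String> source =
        FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("/path/to/input"))
                .setFileEnumerator(() -> new CustomFileEnumerator())
                .build();
```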
+
+### Current Limitations
+
+Watermarking doesn't work particularly well for large backlogs of files, 
because watermarks eagerly advance within a file, and the next file might 
contain data later than the watermark again.
+

Review comment:
       ```suggestion
   Watermarking does not work very well for large backlogs of files. This is 
because watermarks eagerly advance within a file, and the next file might 
contain data later than the watermark.
   
   ```

##########
File path: docs/content/docs/connectors/datastream/filesystem.md
##########
@@ -25,12 +27,227 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# File Sink
+# FileSystem
 
-This connector provides a unified Sink for `BATCH` and `STREAMING` that writes 
partitioned files to filesystems
+This connector provides a unified Source and Sink for `BATCH` and `STREAMING` 
that reads or writes (partitioned) files to filesystems
 supported by the [Flink `FileSystem` abstraction]({{< ref 
"docs/deployment/filesystems/overview" >}}). This filesystem
-connector provides the same guarantees for both `BATCH` and `STREAMING` and it 
is an evolution of the 
-existing [Streaming File Sink]({{< ref 
"docs/connectors/datastream/streamfile_sink" >}}) which was designed for 
providing exactly-once semantics for `STREAMING` execution.
+connector provides the same guarantees for both `BATCH` and `STREAMING` and is 
designed for providing exactly-once semantics for `STREAMING` execution.
+
+The connector supports reading and writing a set of files from any 
(distributed) file system (e.g. POSIX, S3, HDFS)
+with a [format]({{< ref "docs/connectors/datastream/formats/overview" >}}) 
(e.g., Avro, CSV, Parquet),
+producing a stream or records.
+
+## File Source
+
+The `File Source` is based on the [Source API]({{< ref 
"docs/dev/datastream/sources" >}}#the-data-source-api), 
+a unified data source that reads files - both in batch and in streaming mode. 
+It is divided into the following two parts: File SplitEnumerator and File 
SourceReader. 
+
+* File `SplitEnumerator` is responsible for discovering and identifying the 
files to read and assigns them to the File SourceReader.
+* File `SourceReader` requests the files it needs to process and reads the 
file from the filesystem. 
+
+You'll need to combine the File Source with a [format]({{< ref 
"docs/connectors/datastream/formats/overview" >}}). This allows you to
+parse CSV, decode AVRO or read Parquet columnar files.
+
+#### Bounded and Unbounded Streams
+
+A bounded `File Source` lists all files (via SplitEnumerator, for example a 
recursive directory list with filtered-out hidden files) and reads them all.
+
+An unbounded `File Source` is created when configuring the enumerator for 
periodic file discovery.
+In that case, the `SplitEnumerator` enumerates like in the bounded case, but repeats the enumeration after a certain interval.
+For each repeated enumeration, the `SplitEnumerator` filters out previously detected files and only sends new ones to the `SourceReader`.
+
+### Usage
+
+You start building a file source via one of the following calls:
+
+{{< tabs "FileSourceUsage" >}}
+{{< tab "Java" >}}
+```java
+// reads the contents of a file from a file stream. 
+FileSource.forRecordStreamFormat(StreamFormat,Path...)
+        
+// reads batches of records from a file at a time
+FileSource.forBulkFileFormat(BulkFormat,Path...)
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+This creates a `FileSource.FileSourceBuilder` on which you can configure all 
the properties of the file source.
+
+For the bounded/batch case, the file source processes all files under the 
given path(s). 
+In the continuous/streaming case, the source periodically checks the paths for 
new files and will start reading those.
+
+When you start creating a file source (via the `FileSource.FileSourceBuilder` 
created through one of the above-mentioned methods) 
+the source is by default in bounded/batch mode. Call 
`AbstractFileSource.AbstractFileSourceBuilder.monitorContinuously(Duration)` 
+to put the source into continuous streaming mode.
+
+{{< tabs "FileSourceBuilder" >}}
+{{< tab "Java" >}}
+```java
+final FileSource<String> source =
+        FileSource.forRecordStreamFormat(...)
+        .monitorContinuously(Duration.ofMillis(5))  
+        .build();
+```
+{{< /tab >}}
+{{< /tabs >}}
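To make the usage concrete, the built source can then be attached to the execution environment; a brief sketch in which the watermark strategy and source name are illustrative:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Sketch: turn the FileSource built above into a DataStream of its record type.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final DataStream<String> stream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
```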
+
+### Format Types
+
+The reading of each file happens through file readers defined by file formats. 
+These define the parsing logic for the contents of the file. The source supports several format classes.
+Their interfaces trade off simplicity of implementation against flexibility/efficiency.
+
+* A `StreamFormat` reads the contents of a file from a file stream. It is the 
simplest format to implement, 
+and provides many features out-of-the-box (like checkpointing logic) but is 
limited in the optimizations it can apply 
+(such as object reuse, batching, etc.).
+
+* A `BulkFormat` reads batches of records from a file at a time. 
+It is the most "low level" format to implement, but offers the greatest 
flexibility to optimize the implementation.
+
+#### TextLine format
+
+A `StreamFormat` reader format that text lines from a file.
+The reader uses Java's built-in `InputStreamReader` to decode the byte stream 
using
+various supported charset encodings.
+This format does not support optimized recovery from checkpoints. On recovery, 
it will re-read
+and discard the number of lined that were processed before the last 
checkpoint. That is due to
+the fact that the offsets of lines in the file cannot be tracked through the 
charset decoders
+with their internal buffering of stream input and charset decoder state.

Review comment:
       ```suggestion
   #### TextLine Format
   
   A `StreamFormat` reader format that reads text lines from a file.
   The reader uses Java's built-in `InputStreamReader` to decode the byte 
stream using
   various supported charset encodings.
   This format does not support optimized recovery from checkpoints. On 
recovery, it will re-read
   and discard the number of lines that were processed before the last 
checkpoint. This is due to
   the fact that the offsets of lines in the file cannot be tracked through the 
charset decoders
   with their internal buffering of stream input and charset decoder state.
   ```

##########
File path: docs/content/docs/connectors/datastream/filesystem.md
##########
@@ -25,12 +27,227 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# File Sink
+# FileSystem
 
-This connector provides a unified Sink for `BATCH` and `STREAMING` that writes 
partitioned files to filesystems
+This connector provides a unified Source and Sink for `BATCH` and `STREAMING` 
that reads or writes (partitioned) files to filesystems
 supported by the [Flink `FileSystem` abstraction]({{< ref 
"docs/deployment/filesystems/overview" >}}). This filesystem
-connector provides the same guarantees for both `BATCH` and `STREAMING` and it 
is an evolution of the 
-existing [Streaming File Sink]({{< ref 
"docs/connectors/datastream/streamfile_sink" >}}) which was designed for 
providing exactly-once semantics for `STREAMING` execution.
+connector provides the same guarantees for both `BATCH` and `STREAMING` and is 
designed for providing exactly-once semantics for `STREAMING` execution.
+
+The connector supports reading and writing a set of files from any 
(distributed) file system (e.g. POSIX, S3, HDFS)
+with a [format]({{< ref "docs/connectors/datastream/formats/overview" >}}) 
(e.g., Avro, CSV, Parquet),
+producing a stream or records.
+
+## File Source
+
+The `File Source` is based on the [Source API]({{< ref 
"docs/dev/datastream/sources" >}}#the-data-source-api), 
+a unified data source that reads files - both in batch and in streaming mode. 
+It is divided into the following two parts: File SplitEnumerator and File 
SourceReader. 
+
+* File `SplitEnumerator` is responsible for discovering and identifying the 
files to read and assigns them to the File SourceReader.
+* File `SourceReader` requests the files it needs to process and reads the 
file from the filesystem. 
+
+You'll need to combine the File Source with a [format]({{< ref 
"docs/connectors/datastream/formats/overview" >}}). This allows you to
+parse CSV, decode AVRO or read Parquet columnar files.
+
+#### Bounded and Unbounded Streams
+
+A bounded `File Source` lists all files (via SplitEnumerator, for example a 
recursive directory list with filtered-out hidden files) and reads them all.
+
+An unbounded `File Source` is created when configuring the enumerator for 
periodic file discovery.
+In that case, the SplitEnumerator will enumerate like the bounded case but 
after a certain interval repeats the enumeration.
+For any repeated enumeration, the `SplitEnumerator` filters out previously 
detect files and only sends new ones to the `SourceReader`.
+
+### Usage
+
+You start building a file source via one of the following calls:
+
+{{< tabs "FileSourceUsage" >}}
+{{< tab "Java" >}}
+```java
+// reads the contents of a file from a file stream. 
+FileSource.forRecordStreamFormat(StreamFormat,Path...)
+        
+// reads batches of records from a file at a time
+FileSource.forBulkFileFormat(BulkFormat,Path...)
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+This creates a `FileSource.FileSourceBuilder` on which you can configure all 
the properties of the file source.
+
+For the bounded/batch case, the file source processes all files under the 
given path(s). 
+In the continuous/streaming case, the source periodically checks the paths for 
new files and will start reading those.
+
+When you start creating a file source (via the `FileSource.FileSourceBuilder` 
created through one of the above-mentioned methods) 
+the source is by default in bounded/batch mode. Call 
`AbstractFileSource.AbstractFileSourceBuilder.monitorContinuously(Duration)` 
+to put the source into continuous streaming mode.
+
+{{< tabs "FileSourceBuilder" >}}
+{{< tab "Java" >}}
+```java
+final FileSource<String> source =
+        FileSource.forRecordStreamFormat(...)
+        .monitorContinuously(Duration.ofMillis(5))  
+        .build();
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+### Format Types
+
+The reading of each file happens through file readers defined by file formats. 
+These define the parsing logic for the contents of the file. There are 
multiple classes that the source supports. 
+Their interfaces trade of simplicity of implementation and 
flexibility/efficiency.
+
+* A `StreamFormat` reads the contents of a file from a file stream. It is the 
simplest format to implement, 
+and provides many features out-of-the-box (like checkpointing logic) but is 
limited in the optimizations it can apply 
+(such as object reuse, batching, etc.).
+
+* A `BulkFormat` reads batches of records from a file at a time. 
+It is the most "low level" format to implement, but offers the greatest 
flexibility to optimize the implementation.
+
+#### TextLine format
+
+A `StreamFormat` reader format that text lines from a file.
+The reader uses Java's built-in `InputStreamReader` to decode the byte stream 
using
+various supported charset encodings.
+This format does not support optimized recovery from checkpoints. On recovery, 
it will re-read
+and discard the number of lined that were processed before the last 
checkpoint. That is due to
+the fact that the offsets of lines in the file cannot be tracked through the 
charset decoders
+with their internal buffering of stream input and charset decoder state.
+
+#### SimpleStreamFormat Abstract Class
+
+A simple version of `StreamFormat` for formats that are not splittable.
+Custom reads of Array or File can be done by implementing `SimpleStreamFormat`:
+
+{{< tabs "SimpleStreamFormat" >}}
+{{< tab "Java" >}}
+```java
+private static final class ArrayReaderFormat extends 
SimpleStreamFormat<byte[]> {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public Reader<byte[]> createReader(Configuration config, FSDataInputStream 
stream)
+            throws IOException {
+        return new ArrayReader(stream);
+    }
+
+    @Override
+    public TypeInformation<byte[]> getProducedType() {
+        return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
+    }
+}
+
+final FileSource<byte[]> source =
+                FileSource.forRecordStreamFormat(new ArrayReaderFormat(), 
path).build();
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+An example of a `SimpleStreamFormat` is `CsvReaderFormat`. It can be 
initialized as follows:
+```java
+CsvReaderFormat<SomePojo> csvFormat = CsvReaderFormat.forPojo(SomePojo.class);
+FileSource<SomePojo> source = 
+        FileSource.forRecordStreamFormat(csvFormat, 
Path.fromLocalFile(...)).build();
+```
+
+The schema for CSV parsing, in this case, is automatically derived based on 
the fields of the `SomePojo` class using the `Jackson` library. (Note: you 
might need to add `@JsonPropertyOrder({field1, field2, ...})` annotation to 
your class definition with the fields order exactly matching those of the CSV 
file columns).
+
+If you need more fine-grained control over the CSV schema or the parsing 
options, use the more low-level `forSchema` static factory method of 
`CsvReaderFormat`:
+
+```java
+CsvReaderFormat<T> forSchema(CsvMapper mapper, 
+                             CsvSchema schema, 
+                             TypeInformation<T> typeInformation) 
+```
+
+#### Bulk Format
+
+The BulkFormat reads and decodes batches of records at a time. Examples of 
bulk formats
+are formats like ORC or Parquet.
+The outer `BulkFormat` class acts mainly as a configuration holder and factory 
for the
+reader. The actual reading is done by the `BulkFormat.Reader`, which is 
created in the
+`BulkFormat#createReader(Configuration, FileSourceSplit)` method. If a bulk 
reader is
+created based on a checkpoint during checkpointed streaming execution, then 
the reader is
+re-created in the `BulkFormat#restoreReader(Configuration, FileSourceSplit)` 
method.
+
+A `SimpleStreamFormat` can be turned into a `BulkFormat` by wrapping it in a 
`StreamFormatAdapter`:
+```java
+BulkFormat<SomePojo, FileSourceSplit> bulkFormat = 
+        new StreamFormatAdapter<>(CsvReaderFormat.forPojo(SomePojo.class));
+```
+
+### Customizing File Enumeration
+
+{{< tabs "CustomizingFileEnumeration" >}}
+{{< tab "Java" >}}
+```java
+/**
+ * A FileEnumerator implementation for hive source, which generates splits 
based on 
+ * HiveTablePartition.
+ */
+public class HiveSourceFileEnumerator implements FileEnumerator {
+    
+    // reference constructor
+    public HiveSourceFileEnumerator(...) {
+        ...
+    }
+
+    /***
+     * Generates all file splits for the relevant files under the given paths. 
The {@code
+     * minDesiredSplits} is an optional hint indicating how many splits would 
be necessary to
+     * exploit parallelism properly.
+     */
+    @Override
+    public Collection<FileSourceSplit> enumerateSplits(Path[] paths, int 
minDesiredSplits)
+            throws IOException {
+        // createInputSplits:splitting files into fragmented collections
+        return new ArrayList<>(createInputSplits(...));
+    }
+
+    ...
+
+    /***
+     * A factory to create HiveSourceFileEnumerator.
+     */
+    public static class Provider implements FileEnumerator.Provider {
+
+        ...
+        @Override
+        public FileEnumerator create() {
+            return new HiveSourceFileEnumerator(...);
+        }
+    }
+}
+// use the customizing file enumeration
+new HiveSource<>(
+        ...,
+        new HiveSourceFileEnumerator.Provider(
+        partitions != null ? partitions : Collections.emptyList(),
+        new JobConfWrapper(jobConf)),
+       ...);
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+### Current Limitations
+
+Watermarking doesn't work particularly well for large backlogs of files, 
because watermarks eagerly advance within a file, and the next file might 
contain data later than the watermark again.
+
+For Unbounded File Sources, the enumerator currently remembers the paths of all already processed files, which is state that can, in some cases, grow rather large.
+There are plans to add a compressed form of tracking already processed files in the future (for example, by keeping modification timestamp lower boundaries).
+
+### Behind the Scene
+{{< hint info >}}
+If you are interested in how File source works under the design of new data 
source API, you may
+want to read this part as a reference. For details about the new data source 
API,
+[documentation of data source]({{< ref "docs/dev/datastream/sources.md" >}}) 
and
+<a 
href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface";>FLIP-27</a>
+provide more descriptive discussions.
+{{< /hint >}}

Review comment:
       ```suggestion
   ### Behind the Scenes
   {{< hint info >}}
   If you are interested in how File Source works through the new data source 
API design, you may
   want to read this part as a reference. For details about the new data source 
API, check out the
   [documentation on data sources]({{< ref "docs/dev/datastream/sources.md" 
>}}) and
   <a 
href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface";>FLIP-27</a>
   for more descriptive discussions.
   {{< /hint >}}
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

