alpinegizmo commented on a change in pull request #14061:
URL: https://github.com/apache/flink/pull/14061#discussion_r524078321



##########
File path: docs/dev/connectors/file_sink.md
##########
@@ -0,0 +1,808 @@
+---
+title: "File Sink"
+nav-title: File Sink
+nav-parent_id: connectors
+nav-pos: 5
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+This connector provides a unified Sink for `BATCH` and `STREAMING` that writes 
partitioned files to filesystems
+supported by the [Flink `FileSystem` abstraction]({{ 
site.baseurl}}/ops/filesystems/index.html). It provides the
+same guarantees for both `BATCH` and `STREAMING` execution by exposing the 
same API while having different runtime 
+implementations for each mode.
+
+The file sink writes incoming data into buckets. Given that the incoming 
streams can be unbounded,
+data in each bucket is organized into part files of finite size. The bucketing 
behaviour is fully configurable
+with a default time-based bucketing where we start writing a new bucket every 
hour. This means that each resulting
+bucket will contain files with records received during 1 hour intervals from 
the stream.
+
+Data within the bucket directories is split into part files. Each bucket will 
contain at least one part file for
+each subtask of the sink that has received data for that bucket. Additional 
part files will be created according to the configurable
+rolling policy. For `Row-encoded Formats` (see [File Formats](#file-formats)) 
the default policy rolls part files based
+on size, a timeout that specifies the maximum duration for which a file can be 
open, and a maximum inactivity
+timeout after which the file is closed. For `Bulk-encoded Formats` we roll on 
every checkpoint and the user can 
+specify additional conditions based on size or time.
+
+ <div class="alert alert-info">
+     <b>IMPORTANT:</b> Checkpointing needs to be enabled when using the 
`FileSink` in `STREAMING` mode. Part files 
+     can only be finalized on successful checkpoints. If checkpointing is 
disabled part files will forever stay 
+     in `in-progress` or `pending` state and cannot be safely read by 
downstream systems.

Review comment:
   ```suggestion
        <b>IMPORTANT:</b> Checkpointing needs to be enabled when using the `FileSink` in `STREAMING` mode. Part files 
        can only be finalized on successful checkpoints. If checkpointing is disabled, part files will forever stay 
        in the `in-progress` or the `pending` state, and cannot be safely read by downstream systems.
   ```
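
For anyone reading along, the part-file lifecycle this note refers to can be sketched in plain Java. This is only an illustration of the `in-progress` / `pending` / finished transitions described in the doc, not Flink's implementation; the class, enum, and method names are hypothetical.

```java
public class PartFileLifecycleSketch {
    // States a part file passes through, as described in the doc text.
    enum State { IN_PROGRESS, PENDING, FINISHED }

    // Rolling closes the file for writing; it then waits to be committed.
    static State onRoll(State s) {
        return s == State.IN_PROGRESS ? State.PENDING : s;
    }

    // Only a successful checkpoint finalizes a pending file.
    static State onCheckpointComplete(State s) {
        return s == State.PENDING ? State.FINISHED : s;
    }

    public static void main(String[] args) {
        State s = State.IN_PROGRESS;
        s = onRoll(s);               // rolled by the rolling policy
        s = onCheckpointComplete(s); // finalized by a successful checkpoint
        System.out.println(s);
    }
}
```

Without a call corresponding to `onCheckpointComplete`, a file never leaves `PENDING`, which is the situation the alert warns about.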

##########
File path: docs/dev/connectors/file_sink.md
##########
@@ -0,0 +1,808 @@
+ <div class="alert alert-info">
+     <b>IMPORTANT:</b> Checkpointing needs to be enabled when using the 
`FileSink` in `STREAMING` mode. Part files 
+     can only be finalized on successful checkpoints. If checkpointing is 
disabled part files will forever stay 
+     in `in-progress` or `pending` state and cannot be safely read by 
downstream systems.
+ </div>
+
+ <img src="{{ site.baseurl }}/fig/streamfilesink_bucketing.png" class="center" 
style="width: 100%;" />
+
+## File Formats
+
+The `FileSink` supports both row-wise and bulk encoding formats, such as 
[Apache Parquet](http://parquet.apache.org).
+These two variants come with their respective builders that can be created 
with the following static methods:
+
+ - Row-encoded sink: `FileSink.forRowFormat(basePath, rowEncoder)`
+ - Bulk-encoded sink: `FileSink.forBulkFormat(basePath, bulkWriterFactory)`
+
+When creating either a row or a bulk encoded sink we have to specify the base 
path where the buckets will be
+stored and the encoding logic for our data.
+
+Please check out the JavaDoc for [FileSink]({{ site.javadocs_baseurl 
}}/api/java/org/apache/flink/connector/file/sink/FileSink.html)
+for all the configuration options and more documentation about the 
implementation of the different data formats.
+
+### Row-encoded Formats
+
+Row-encoded formats need to specify an [Encoder]({{ site.javadocs_baseurl 
}}/api/java/org/apache/flink/api/common/serialization/Encoder.html)
+that is used for serializing individual rows to the `OutputStream` of the 
in-progress part files.
+
+In addition to the bucket assigner the [RowFormatBuilder]({{ 
site.javadocs_baseurl 
}}/api/java/org/apache/flink/connector/file/sink/FileSink.RowFormatBuilder.html)
 allows the user to specify:
+
+ - Custom [RollingPolicy]({{ site.javadocs_baseurl 
}}/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.html)
 : Rolling policy to override the DefaultRollingPolicy
+ - bucketCheckInterval (default = 1 min) : Millisecond interval for checking 
time based rolling policies
+
+Basic usage for writing String elements thus looks like this:
+
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.connector.file.sink.FileSink;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
+
+DataStream<String> input = ...;
+
+final FileSink<String> sink = FileSink
+    .forRowFormat(new Path(outputPath), new 
SimpleStringEncoder<String>("UTF-8"))
+    .withRollingPolicy(
+        DefaultRollingPolicy.builder()
+            .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
+            .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
+            .withMaxPartSize(1024 * 1024 * 1024)
+            .build())
+       .build();
+
+input.sinkTo(sink);
+
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.api.common.serialization.SimpleStringEncoder
+import org.apache.flink.core.fs.Path
+import org.apache.flink.connector.file.sink.FileSink
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
+
+val input: DataStream[String] = ...
+
+val sink: FileSink[String] = FileSink
+    .forRowFormat(new Path(outputPath), new 
SimpleStringEncoder[String]("UTF-8"))
+    .withRollingPolicy(
+        DefaultRollingPolicy.builder()
+            .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
+            .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
+            .withMaxPartSize(1024 * 1024 * 1024)
+            .build())
+    .build()
+
+input.sinkTo(sink)
+
+{% endhighlight %}
+</div>
+</div>
+
+This example creates a simple sink that assigns records to the default one 
hour time buckets. It also specifies
+a rolling policy that rolls the in-progress part file on either of the 
following 3 conditions:

Review comment:
   ```suggestion
   a rolling policy that rolls the in-progress part file on any of the following 3 conditions:
   ```

##########
File path: docs/dev/connectors/file_sink.md
##########
@@ -0,0 +1,808 @@
+### Row-encoded Formats
+
+Row-encoded formats need to specify an [Encoder]({{ site.javadocs_baseurl 
}}/api/java/org/apache/flink/api/common/serialization/Encoder.html)
+that is used for serializing individual rows to the `OutputStream` of the 
in-progress part files.
+
+In addition to the bucket assigner the [RowFormatBuilder]({{ 
site.javadocs_baseurl 
}}/api/java/org/apache/flink/connector/file/sink/FileSink.RowFormatBuilder.html)
 allows the user to specify:

Review comment:
   ```suggestion
   In addition to the bucket assigner, the [RowFormatBuilder]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/connector/file/sink/FileSink.RowFormatBuilder.html) allows the user to specify:
   ```

##########
File path: docs/dev/connectors/file_sink.md
##########
@@ -0,0 +1,808 @@
+This example creates a simple sink that assigns records to the default one 
hour time buckets. It also specifies
+a rolling policy that rolls the in-progress part file on either of the 
following 3 conditions:
+
+ - It contains at least 15 minutes worth of data
+ - It hasn't received new records for the last 5 minutes
+ - The file size reached 1 GB (after writing the last record)

Review comment:
       ```suggestion
    - The file size has reached 1 GB (after writing the last record)
   ```
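
As an aside, the "any of the 3 conditions" semantics of the rolling policy in the example can be sketched with the standard library alone. This is a minimal illustration that assumes the thresholds from the example (15 minutes, 5 minutes, 1 GB); `RollingSketch` and `shouldRoll` are hypothetical names, and the exact comparisons used by `DefaultRollingPolicy` may differ.

```java
import java.time.Duration;

public class RollingSketch {
    // Thresholds taken from the documentation example.
    static final long ROLLOVER_MS = Duration.ofMinutes(15).toMillis();
    static final long INACTIVITY_MS = Duration.ofMinutes(5).toMillis();
    static final long MAX_PART_SIZE_BYTES = 1024L * 1024L * 1024L; // 1 GB

    // Returns true as soon as ANY one of the three conditions holds.
    static boolean shouldRoll(long sizeBytes, long createdAtMs, long lastWriteAtMs, long nowMs) {
        boolean openTooLong = nowMs - createdAtMs >= ROLLOVER_MS;
        boolean inactive = nowMs - lastWriteAtMs >= INACTIVITY_MS;
        boolean tooLarge = sizeBytes >= MAX_PART_SIZE_BYTES;
        return openTooLong || inactive || tooLarge;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        // A small file opened 20 minutes ago and written to just now:
        // only the rollover condition holds, which is enough to roll.
        System.out.println(shouldRoll(10L, now - Duration.ofMinutes(20).toMillis(), now, now));
    }
}
```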

##########
File path: docs/dev/connectors/file_sink.md
##########
@@ -0,0 +1,808 @@
+### Bulk-encoded Formats
+
+Bulk-encoded sinks are created similarly to the row-encoded ones but here 
instead of
+specifying an `Encoder` we have to specify [BulkWriter.Factory]({{ 
site.javadocs_baseurl 
}}/api/java/org/apache/flink/api/common/serialization/BulkWriter.Factory.html).
+The `BulkWriter` logic defines how new elements added, flushed and how the 
bulk of records
+are finalized for further encoding purposes.

Review comment:
   ```suggestion
   Bulk-encoded sinks are created similarly to the row-encoded ones, but instead of
   specifying an `Encoder`, we have to specify a [BulkWriter.Factory]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/api/common/serialization/BulkWriter.Factory.html).
   The `BulkWriter` logic defines how new elements are added and flushed, and how a batch of records
   is finalized for further encoding purposes.
   ```



