This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 7bda6f6  [FLINK-13842][docs] Improve Javadocs and web documentation of the StreamingFileSink
7bda6f6 is described below

commit 7bda6f61c9ce0db80f2b1d4c8e7fd6cad9b0eaf9
Author: Gyula Fora <gyf...@apache.org>
AuthorDate: Thu Sep 5 16:04:25 2019 +0200

    [FLINK-13842][docs] Improve Javadocs and web documentation of the StreamingFileSink
    
    Closes #9530
---
 docs/dev/connectors/streamfile_sink.md             | 339 +++++++++++++++++----
 docs/fig/streamfilesink_bucketing.png              | Bin 0 -> 100661 bytes
 .../sink/filesystem/StreamingFileSink.java         |  25 ++
 .../rollingpolicies/DefaultRollingPolicy.java      |   6 +
 4 files changed, 316 insertions(+), 54 deletions(-)

diff --git a/docs/dev/connectors/streamfile_sink.md b/docs/dev/connectors/streamfile_sink.md
index 9017447..c59e646 100644
--- a/docs/dev/connectors/streamfile_sink.md
+++ b/docs/dev/connectors/streamfile_sink.md
@@ -23,30 +23,140 @@ specific language governing permissions and limitations
 under the License.
 -->
 
+* This will be replaced by the TOC
+{:toc}
+
 This connector provides a Sink that writes partitioned files to filesystems
 supported by the [Flink `FileSystem` abstraction]({{ 
site.baseurl}}/ops/filesystems/index.html).
 
-Since in streaming the input is potentially infinite, the streaming file sink 
writes data
-into buckets. The bucketing behaviour is configurable but a useful default is 
time-based
-bucketing where we start writing a new bucket every hour and thus get
-individual files that each contain a part of the infinite output stream.
+In order to handle unbounded data streams, the streaming file sink writes 
incoming data
+into buckets. The bucketing behaviour is fully configurable with a default 
time-based
+bucketing where we start writing a new bucket every hour and thus get files 
that correspond to
+records received during certain time intervals from the stream.
+
+The bucket directories themselves contain several part files with the actual 
output data, with at least
+one for each subtask of the sink that has received data for the bucket. 
Additional part files will be created according to the configurable
+rolling policy. The default policy rolls files based on size, a timeout that 
specifies the maximum duration for which a file can be open, and a maximum 
inactivity timeout after which the file is closed.
+
+ <div class="alert alert-info">
+     <b>IMPORTANT:</b> Checkpointing needs to be enabled when using the 
StreamingFileSink. Part files can only be finalized
+     on successful checkpoints. If checkpointing is disabled, part files will 
forever stay in `in-progress` or `pending` state
+     and cannot be safely read by downstream systems.
+ </div>
+
+ <img src="{{ site.baseurl }}/fig/streamfilesink_bucketing.png" class="center" 
style="width: 100%;" />
+
+### Bucket Assignment
+
+The bucketing logic defines how the data will be structured into 
subdirectories inside the base output directory.
+
+Both row and bulk formats use the [DateTimeBucketAssigner]({{ 
site.javadocs_baseurl 
}}/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/DateTimeBucketAssigner.html)
 as the default assigner.
+By default the DateTimeBucketAssigner creates hourly buckets based on the 
system default timezone
+with the following format: `yyyy-MM-dd--HH`. Both the date format (i.e. bucket 
size) and timezone can be
+configured manually.
+
+We can specify a custom [BucketAssigner]({{ site.javadocs_baseurl 
}}/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssigner.html)
 by calling `.withBucketAssigner(assigner)` on the format builders.
+
+Flink comes with two built-in BucketAssigners:
+
+ - [DateTimeBucketAssigner]({{ site.javadocs_baseurl 
}}/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/DateTimeBucketAssigner.html)
 : Default time based assigner
+ - [BasePathBucketAssigner]({{ site.javadocs_baseurl 
}}/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/BasePathBucketAssigner.html)
 : Assigner that stores all part files in the base path (single global bucket)
+
+### Rolling Policy
+
+The [RollingPolicy]({{ site.javadocs_baseurl 
}}/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.html)
 defines when a given in-progress part file will be closed and moved to the 
pending and later to finished state.
+In combination with the checkpointing interval (pending files become finished 
on the next checkpoint) this controls how quickly
+part files become available for downstream readers and also the size and 
number of these parts.
+
+Flink comes with two built-in RollingPolicies:
+
+ - [DefaultRollingPolicy]({{ site.javadocs_baseurl 
}}/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.html)
+ - [OnCheckpointRollingPolicy]({{ site.javadocs_baseurl 
}}/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/OnCheckpointRollingPolicy.html)
+
+### Part file lifecycle
+
+In order to use the output of the StreamingFileSink in downstream systems, we 
need to understand the naming and lifecycle of the output files produced.
+
+Part files can be in one of three states:
+ 1. **In-progress** : The part file that is currently being written to is 
in-progress
+ 2. **Pending** : Once a part file is closed for writing it becomes pending
+ 3. **Finished** : On successful checkpoints pending files become finished
+
+Only finished files are safe to read by downstream systems as those are 
guaranteed to not be modified later. Finished files can be distinguished by 
their naming scheme only.
+
+File naming schemes:
+ - **In-progress / Pending**: `part-subtaskIndex-partFileIndex.inprogress.uid`
+ - **Finished:** `part-subtaskIndex-partFileIndex`
+
+Part file indexes are strictly increasing for any given subtask (in the order 
they were created). However, these indexes are not always sequential. When the 
job restarts, the next part index for all subtasks will be the `max part index + 
1`.
+
+Each writer subtask will have a single in-progress part file at any given time 
for every active bucket, but there can be several pending and finished files.
+
+**Part file example**
+
+To better understand the lifecycle of these files let's look at a simple 
example with 2 sink subtasks:
+
+```
+└── 2019-08-25--12
+    ├── part-0-0.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
+    └── part-1-0.inprogress.ea65a428-a1d0-4a0b-bbc5-7a436a75e575
+```
+
+When the part file `part-1-0` is rolled (let's say it becomes too large), it 
becomes pending but it is not renamed. The sink then opens a new part file: 
`part-1-1`:
+
+```
+└── 2019-08-25--12
+    ├── part-0-0.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
+    ├── part-1-0.inprogress.ea65a428-a1d0-4a0b-bbc5-7a436a75e575
+    └── part-1-1.inprogress.bc279efe-b16f-47d8-b828-00ef6e2fbd11
+```
 
-Within a bucket, we further split the output into smaller part files based on a
-rolling policy. This is useful to prevent individual bucket files from getting
-too big. This is also configurable but the default policy rolls files based on
-file size and a timeout, *i.e* if no new data was written to a part file. 
+As `part-1-0` is now pending completion, after the next successful checkpoint, 
it is finalized:
 
-The `StreamingFileSink` supports both row-wise encoding formats and
-bulk-encoding formats, such as [Apache Parquet](http://parquet.apache.org).
+```
+└── 2019-08-25--12
+    ├── part-0-0.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
+    ├── part-1-0
+    └── part-1-1.inprogress.bc279efe-b16f-47d8-b828-00ef6e2fbd11
+```
 
-#### Using Row-encoded Output Formats
+New buckets are created as dictated by the bucketing policy, and this doesn't 
affect currently in-progress files:
 
-The only required configuration are the base path where we want to output our
-data and an
-[Encoder]({{ site.javadocs_baseurl 
}}/api/java/org/apache/flink/api/common/serialization/Encoder.html)
-that is used for serializing records to the `OutputStream` for each file.
+```
+└── 2019-08-25--12
+    ├── part-0-0.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
+    ├── part-1-0
+    └── part-1-1.inprogress.bc279efe-b16f-47d8-b828-00ef6e2fbd11
+└── 2019-08-25--13
+    └── part-0-2.inprogress.2b475fec-1482-4dea-9946-eb4353b475f1
+```
 
-Basic usage thus looks like this:
+Old buckets can still receive new records as the bucketing policy is evaluated 
on a per-record basis.
+
+## File Formats
+
+The `StreamingFileSink` supports both row-wise and bulk encoding formats, such 
as [Apache Parquet](http://parquet.apache.org).
+These two variants come with their respective builders that can be created 
with the following static methods:
+
+ - Row-encoded sink: `StreamingFileSink.forRowFormat(basePath, rowEncoder)`
+ - Bulk-encoded sink: `StreamingFileSink.forBulkFormat(basePath, 
bulkWriterFactory)`
+
+When creating either a row or a bulk encoded sink we have to specify the base 
path where the buckets will be
+stored and the encoding logic for our data.
+
+Please check out the JavaDoc for [StreamingFileSink]({{ site.javadocs_baseurl 
}}/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.html)
 for all the configuration options
+and more documentation about the implementation of the different data formats.
+
+### Row-encoded Formats
+
+Row-encoded formats need to specify an [Encoder]({{ site.javadocs_baseurl 
}}/api/java/org/apache/flink/api/common/serialization/Encoder.html) that is 
used for serializing individual rows to the `OutputStream` of the in-progress 
part files.
+
+In addition to the bucket assigner the [RowFormatBuilder]({{ 
site.javadocs_baseurl 
}}/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.RowFormatBuilder.html)
 allows the user to specify:
+
+ - Custom [RollingPolicy]({{ site.javadocs_baseurl 
}}/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.html)
 : Rolling policy to override the DefaultRollingPolicy
+ - bucketCheckInterval (default = 1 min) : Millisecond interval for checking 
time based rolling policies
+
+Basic usage for writing String elements thus looks like this:
 
 
 <div class="codetabs" markdown="1">
@@ -55,11 +165,18 @@ Basic usage thus looks like this:
 import org.apache.flink.api.common.serialization.SimpleStringEncoder;
 import org.apache.flink.core.fs.Path;
 import 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
 
 DataStream<String> input = ...;
 
 final StreamingFileSink<String> sink = StreamingFileSink
-       .forRowFormat(new Path(outputPath), new SimpleStringEncoder<>("UTF-8"))
+    .forRowFormat(new Path(outputPath), new SimpleStringEncoder<>("UTF-8"))
+    .withRollingPolicy(
+        DefaultRollingPolicy.builder()
+            .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
+            .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
+            .withMaxPartSize(1024 * 1024 * 1024)
+            .build())
        .build();
 
 input.addSink(sink);
@@ -71,55 +188,169 @@ input.addSink(sink);
 import org.apache.flink.api.common.serialization.SimpleStringEncoder
 import org.apache.flink.core.fs.Path
 import 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
 
 val input: DataStream[String] = ...
 
 val sink: StreamingFileSink[String] = StreamingFileSink
     .forRowFormat(new Path(outputPath), new 
SimpleStringEncoder[String]("UTF-8"))
+    .withRollingPolicy(
+        DefaultRollingPolicy.builder()
+            .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
+            .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
+            .withMaxPartSize(1024 * 1024 * 1024)
+            .build())
     .build()
-    
+
 input.addSink(sink)
 
 {% endhighlight %}
 </div>
 </div>
 
-This will create a streaming sink that creates hourly buckets and uses a
-default rolling policy. The default bucket assigner is
-[DateTimeBucketAssigner]({{ site.javadocs_baseurl 
}}/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/DateTimeBucketAssigner.html)
-and the default rolling policy is
-[DefaultRollingPolicy]({{ site.javadocs_baseurl 
}}/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.html).
-You can specify a custom
-[BucketAssigner]({{ site.javadocs_baseurl 
}}/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssigner.html)
-and
-[RollingPolicy]({{ site.javadocs_baseurl 
}}/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.html)
-on the sink builder. Please check out the JavaDoc for
-[StreamingFileSink]({{ site.javadocs_baseurl 
}}/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.html)
-for more configuration options and more documentation about the workings and
-interactions of bucket assigners and rolling policies.
-
-#### Using Bulk-encoded Output Formats
-
-In the above example we used an `Encoder` that can encode or serialize each
-record individually. The streaming file sink also supports bulk-encoded output
-formats such as [Apache Parquet](http://parquet.apache.org). To use these,
-instead of `StreamingFileSink.forRowFormat()` you would use
-`StreamingFileSink.forBulkFormat()` and specify a `BulkWriter.Factory`.
-
-[ParquetAvroWriters]({{ site.javadocs_baseurl 
}}/api/java/org/apache/flink/formats/parquet/avro/ParquetAvroWriters.html)
-has static methods for creating a `BulkWriter.Factory` for various types.
-
-<div class="alert alert-info">
-    <b>IMPORTANT:</b> Bulk-encoding formats can only be combined with the
-    `OnCheckpointRollingPolicy`, which rolls the in-progress part file on
-    every checkpoint.
+This example creates a simple sink that assigns records to the default one 
hour time buckets. It also specifies
+a rolling policy that rolls the in-progress part file on any of the 
following 3 conditions:
+
+ - It contains at least 15 minutes worth of data
+ - It hasn't received new records for the last 5 minutes
+ - The file size reached 1 GB (after writing the last record)
+
+### Bulk-encoded Formats
+
+Bulk-encoded sinks are created similarly to the row-encoded ones but here 
instead of
+specifying an `Encoder` we have to specify [BulkWriter.Factory]({{ 
site.javadocs_baseurl 
}}/api/java/org/apache/flink/api/common/serialization/BulkWriter.Factory.html).
+The `BulkWriter` logic defines how new elements are added and flushed, and how the 
bulk of records
+are finalized for further encoding purposes.
+
+Flink comes with two built-in BulkWriter factories:
+
+ - [ParquetWriterFactory]({{ site.javadocs_baseurl 
}}/api/java/org/apache/flink/formats/parquet/ParquetWriterFactory.html)
+ - [SequenceFileWriterFactory]({{ site.javadocs_baseurl 
}}/api/java/org/apache/flink/formats/sequencefile/SequenceFileWriterFactory.html)
+
+#### Parquet format
+
+Flink contains built-in convenience methods for creating Parquet writer 
factories for Avro data. These methods
+and their associated documentation can be found in the [ParquetAvroWriters]({{ 
site.javadocs_baseurl 
}}/api/java/org/apache/flink/formats/parquet/avro/ParquetAvroWriters.html) 
class.
+
+For writing to other Parquet compatible data formats, users need to create the 
ParquetWriterFactory with a custom implementation of the [ParquetBuilder]({{ 
site.javadocs_baseurl 
}}/api/java/org/apache/flink/formats/parquet/ParquetBuilder.html) interface.
+
+To use the Parquet bulk encoder in your application you need to add the 
following dependency:
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-parquet{{ site.scala_version_suffix }}</artifactId>
+  <version>{{ site.version }}</version>
+</dependency>
+{% endhighlight %}
+
+A StreamingFileSink that writes Avro data to Parquet format can be created 
like this:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
+import org.apache.avro.Schema;
+
+
+Schema schema = ...;
+DataStream<GenericRecord> input = ...;
+
+final StreamingFileSink<GenericRecord> sink = StreamingFileSink
+       .forBulkFormat(outputBasePath, 
ParquetAvroWriters.forGenericRecord(schema))
+       .build();
+
+input.addSink(sink);
+
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
+import org.apache.avro.Schema
+
+val schema: Schema = ...
+val input: DataStream[GenericRecord] = ...
+
+val sink: StreamingFileSink[GenericRecord] = StreamingFileSink
+    .forBulkFormat(outputBasePath, ParquetAvroWriters.forGenericRecord(schema))
+    .build()
+
+input.addSink(sink)
+
+{% endhighlight %}
+</div>
 </div>
 
-#### Important Considerations for S3
+#### Hadoop SequenceFile format
+
+To use the SequenceFile bulk encoder in your application you need to add the 
following dependency:
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-sequence-file</artifactId>
+  <version>{{ site.version }}</version>
+</dependency>
+{% endhighlight %}
+
+A simple SequenceFile writer can be created like this:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+
+
+DataStream<Tuple2<LongWritable, Text>> input = ...;
+Configuration hadoopConf = 
HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration());
+final StreamingFileSink<Tuple2<LongWritable, Text>> sink = StreamingFileSink
+  .forBulkFormat(
+    outputBasePath,
+    new SequenceFileWriterFactory<>(hadoopConf, LongWritable.class, 
Text.class))
+       .build();
+
+input.addSink(sink);
+
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+import org.apache.flink.configuration.GlobalConfiguration
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.LongWritable
+import org.apache.hadoop.io.SequenceFile
+import org.apache.hadoop.io.Text
+
+val input: DataStream[(LongWritable, Text)] = ...
+val hadoopConf: Configuration = 
HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration())
+val sink: StreamingFileSink[(LongWritable, Text)] = StreamingFileSink
+  .forBulkFormat(
+    outputBasePath,
+    new SequenceFileWriterFactory(hadoopConf, classOf[LongWritable], classOf[Text]))
+       .build()
+
+input.addSink(sink)
+
+{% endhighlight %}
+</div>
+</div>
+
+The SequenceFileWriterFactory supports additional constructor parameters to 
specify compression settings.
+
+### Important Considerations for S3
 
-<span class="label label-danger">Important Note 1</span>: For S3, the 
`StreamingFileSink` 
+<span class="label label-danger">Important Note 1</span>: For S3, the 
`StreamingFileSink`
 supports only the [Hadoop-based](https://hadoop.apache.org/) FileSystem 
implementation, not
-the implementation based on [Presto](https://prestodb.io/). In case your job 
uses the 
+the implementation based on [Presto](https://prestodb.io/). In case your job 
uses the
 `StreamingFileSink` to write to S3 but you want to use the Presto-based one 
for checkpointing,
 it is advised to use explicitly *"s3a://"* (for Hadoop) as the scheme for the 
target path of
 the sink and *"s3p://"* for checkpointing (for Presto). Using *"s3://"* for 
both the sink
@@ -128,10 +359,10 @@ and checkpointing may lead to unpredictable behavior, as 
both implementations "l
 <span class="label label-danger">Important Note 2</span>: To guarantee 
exactly-once semantics while
 being efficient, the `StreamingFileSink` uses the [Multi-part 
Upload](https://docs.aws.amazon.com/AmazonS3/latest/dev/mpuoverview.html)
 feature of S3 (MPU from now on). This feature allows to upload files in 
independent chunks (thus the "multi-part")
-which can be combined into the original file when all the parts of the MPU are 
successfully uploaded. 
-For inactive MPUs, S3 supports a bucket lifecycle rule that the user can use 
to abort multipart uploads 
-that don't complete within a specified number of days after being initiated. 
This implies that if you set this rule 
-aggressively and take a savepoint with some part-files being not fully 
uploaded, their associated MPUs may time-out 
+which can be combined into the original file when all the parts of the MPU are 
successfully uploaded.
+For inactive MPUs, S3 supports a bucket lifecycle rule that the user can use 
to abort multipart uploads
+that don't complete within a specified number of days after being initiated. 
This implies that if you set this rule
+aggressively and take a savepoint with some part-files being not fully 
uploaded, their associated MPUs may time-out
 before the job is restarted. This will result in your job not being able to 
restore from that savepoint as the
 pending part-files are no longer there and Flink will fail with an exception 
as it tries to fetch them and fails.
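
Note on the part file naming described in the documentation change above: a
downstream reader can tell finished files from in-progress or pending ones by
name alone. The following is a minimal sketch of such a check, not code from
this commit. The class `PartFileNames` and its methods are hypothetical, and
the patterns assume the naming scheme exactly as documented above
(`part-subtaskIndex-partFileIndex`, with `.inprogress.uid` appended for
unfinished files); a sink configured with a different part file prefix or
suffix would need different patterns.

```java
import java.util.regex.Pattern;

/** Hypothetical helper for downstream readers; not part of Flink. */
final class PartFileNames {

    // Finished files: part-<subtaskIndex>-<partFileIndex> with nothing after it
    // (assumes the naming scheme documented above).
    private static final Pattern FINISHED =
        Pattern.compile("part-\\d+-\\d+");

    // In-progress and pending files share one scheme:
    // part-<subtaskIndex>-<partFileIndex>.inprogress.<uid>
    private static final Pattern IN_PROGRESS_OR_PENDING =
        Pattern.compile("part-\\d+-\\d+\\.inprogress\\..+");

    private PartFileNames() {
    }

    /** Returns true if the name matches the finished naming scheme. */
    static boolean isFinished(String fileName) {
        return FINISHED.matcher(fileName).matches();
    }

    /** Returns true if the file may still be modified by the sink. */
    static boolean isInProgressOrPending(String fileName) {
        return IN_PROGRESS_OR_PENDING.matcher(fileName).matches();
    }
}
```

A reader would list a bucket directory and keep only the names for which
`isFinished` returns true. In-progress and pending files cannot be told apart
by name, which is why the sketch treats them as one group.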
 
diff --git a/docs/fig/streamfilesink_bucketing.png b/docs/fig/streamfilesink_bucketing.png
new file mode 100644
index 0000000..fe3b226
Binary files /dev/null and b/docs/fig/streamfilesink_bucketing.png differ
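
Note on the hourly bucketing that the documentation change describes: bucket
names such as `2019-08-25--12` follow the `yyyy-MM-dd--HH` format. The sketch
below shows how a record timestamp maps to such a name using only `java.time`.
It is an illustration under assumptions, not the DateTimeBucketAssigner
implementation: the class `HourlyBucketId` and its method are hypothetical, and
the time zone is passed explicitly instead of using the system default.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

/** Hypothetical illustration of time-based bucket naming; not part of Flink. */
final class HourlyBucketId {

    // The format string quoted in the documentation for the default assigner.
    private static final String FORMAT = "yyyy-MM-dd--HH";

    private HourlyBucketId() {
    }

    /** Maps an epoch-millisecond timestamp to an hourly bucket name in the given zone. */
    static String bucketIdFor(long epochMillis, ZoneId zone) {
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern(FORMAT).withZone(zone);
        return formatter.format(Instant.ofEpochMilli(epochMillis));
    }
}
```

Because the name is computed for every record, two records whose timestamps
fall in different hours land in different bucket directories, and a coarser
format string (for example one without the hour) yields larger buckets.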
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
index 716b4c9..75692dd 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
@@ -224,14 +224,30 @@ public class StreamingFileSink<IN>
                        this.partFileSuffix = 
Preconditions.checkNotNull(partFileSuffix);
                }
 
+               /**
+                * Creates a new builder instance with the specified bucket 
check interval. The interval specifies how often
+                * time based {@link RollingPolicy}s will be checked/executed 
for the open buckets.
+                * @param interval Time interval in milliseconds
+                * @return A new builder with the check interval set.
+                */
                public StreamingFileSink.RowFormatBuilder<IN, BucketID> 
withBucketCheckInterval(final long interval) {
                        return new RowFormatBuilder<>(basePath, encoder, 
bucketAssigner, rollingPolicy, interval, bucketFactory, partFilePrefix, 
partFileSuffix);
                }
 
+               /**
+                * Creates a new builder instance with the specified {@link 
BucketAssigner}.
+                * @param assigner {@link BucketAssigner} to be used.
+                * @return A new builder with the assigner set.
+                */
                public StreamingFileSink.RowFormatBuilder<IN, BucketID> 
withBucketAssigner(final BucketAssigner<IN, BucketID> assigner) {
                        return new RowFormatBuilder<>(basePath, encoder, 
Preconditions.checkNotNull(assigner), rollingPolicy, bucketCheckInterval, 
bucketFactory, partFilePrefix, partFileSuffix);
                }
 
+               /**
+                * Creates a new builder instance with the specified {@link 
RollingPolicy} set for the bucketing logic.
+                * @param policy {@link RollingPolicy} to be applied
+                * @return A new builder with the rolling policy set.
+                */
                public StreamingFileSink.RowFormatBuilder<IN, BucketID> 
withRollingPolicy(final RollingPolicy<IN, BucketID> policy) {
                        return new RowFormatBuilder<>(basePath, encoder, 
bucketAssigner, Preconditions.checkNotNull(policy), bucketCheckInterval, 
bucketFactory, partFilePrefix, partFileSuffix);
                }
@@ -314,10 +330,19 @@ public class StreamingFileSink<IN>
                        this.partFileSuffix = 
Preconditions.checkNotNull(partFileSuffix);
                }
 
+               /**
+                * Currently bulk formats always use the {@link 
OnCheckpointRollingPolicy}, therefore this setting does
+                * not have any effect.
+                */
                public StreamingFileSink.BulkFormatBuilder<IN, BucketID> 
withBucketCheckInterval(long interval) {
                        return new BulkFormatBuilder<>(basePath, writerFactory, 
bucketAssigner, interval, bucketFactory, partFilePrefix, partFileSuffix);
                }
 
+               /**
+                * Creates a new builder instance with the specified {@link 
BucketAssigner}.
+                * @param assigner {@link BucketAssigner} to be used.
+                * @return A new builder with the assigner set.
+                */
                public <ID> StreamingFileSink.BulkFormatBuilder<IN, ID> 
withBucketAssigner(BucketAssigner<IN, ID> assigner) {
                        return new BulkFormatBuilder<>(basePath, writerFactory, 
Preconditions.checkNotNull(assigner), bucketCheckInterval, new 
DefaultBucketFactoryImpl<>(), partFilePrefix, partFileSuffix);
                }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.java
index d9bfbcc..6a12c3c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.java
@@ -158,6 +158,9 @@ public final class DefaultRollingPolicy<IN, BucketID> 
implements RollingPolicy<I
 
                /**
                 * Sets the interval of allowed inactivity after which a part 
file will have to roll.
+                * The frequency at which this is checked is controlled by the
+                * {@link 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.RowFormatBuilder#withBucketCheckInterval(long)}
+                * setting.
                 * @param interval the allowed inactivity interval.
                 */
                public DefaultRollingPolicy.PolicyBuilder 
withInactivityInterval(final long interval) {
@@ -167,6 +170,9 @@ public final class DefaultRollingPolicy<IN, BucketID> 
implements RollingPolicy<I
 
                /**
                 * Sets the max time a part file can stay open before having to 
roll.
+                * The frequency at which this is checked is controlled by the
+                * {@link 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.RowFormatBuilder#withBucketCheckInterval(long)}
+                * setting.
                 * @param interval the desired rollover interval.
                 */
                public DefaultRollingPolicy.PolicyBuilder 
withRolloverInterval(final long interval) {
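
Note on the Javadoc additions above: the rollover and inactivity intervals are
time-based conditions, so they are only evaluated when the sink's periodic
bucket check fires, while the size limit can be checked as records are written.
The sketch below models that split with plain Java. It is an assumption-laden
illustration, not the DefaultRollingPolicy source: the class
`RollingConditionsSketch`, its method names, and the use of `>=` comparisons
are all hypothetical.

```java
/** Hypothetical model of the three rolling conditions; not part of Flink. */
final class RollingConditionsSketch {

    private final long maxPartSizeBytes;
    private final long rolloverIntervalMs;
    private final long inactivityIntervalMs;

    RollingConditionsSketch(long maxPartSizeBytes, long rolloverIntervalMs, long inactivityIntervalMs) {
        this.maxPartSizeBytes = maxPartSizeBytes;
        this.rolloverIntervalMs = rolloverIntervalMs;
        this.inactivityIntervalMs = inactivityIntervalMs;
    }

    /** Size condition: can be evaluated whenever a record has been written. */
    boolean shouldRollOnSize(long currentSizeBytes) {
        return currentSizeBytes >= maxPartSizeBytes;
    }

    /**
     * Time conditions: evaluated only when the periodic check runs, so a file
     * may stay open up to one check interval longer than the configured limits.
     */
    boolean shouldRollOnTime(long nowMs, long createdMs, long lastWriteMs) {
        boolean openTooLong = nowMs - createdMs >= rolloverIntervalMs;
        boolean inactiveTooLong = nowMs - lastWriteMs >= inactivityIntervalMs;
        return openTooLong || inactiveTooLong;
    }
}
```

With the values from the documentation example (1 GB, 15 minutes, 5 minutes)
and the default check interval of 1 minute, a part file that stops receiving
records would be rolled somewhere between 5 and 6 minutes after its last write
under this model.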
