aljoscha commented on a change in pull request #10859: [FLINK-15278] Update the 
StreamingFileSink docs
URL: https://github.com/apache/flink/pull/10859#discussion_r366760554
 
 

 ##########
 File path: docs/dev/connectors/streamfile_sink.md
 ##########
 @@ -361,7 +266,174 @@ input.addSink(sink)
 
 The SequenceFileWriterFactory supports additional constructor parameters to 
specify compression settings.
 
-### Important Considerations for S3
+## Bucket Assignment
+
+The bucketing logic defines how the data will be structured into 
subdirectories inside the base output directory.
+
+Both row and bulk formats (see [File Formats](#file-formats)) use the 
[DateTimeBucketAssigner]({{ site.javadocs_baseurl 
}}/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/DateTimeBucketAssigner.html)
 as the default assigner.
+By default the `DateTimeBucketAssigner` creates hourly buckets based on the 
system default timezone
+with the following format: `yyyy-MM-dd--HH`. Both the date format (*i.e.* 
bucket size) and timezone can be
+configured manually.
+
+We can specify a custom [BucketAssigner]({{ site.javadocs_baseurl 
}}/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssigner.html)
 by calling `.withBucketAssigner(assigner)` on the format builders.
+
+Flink comes with two built-in BucketAssigners:
+
+ - [DateTimeBucketAssigner]({{ site.javadocs_baseurl 
}}/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/DateTimeBucketAssigner.html)
 : Default time based assigner
+ - [BasePathBucketAssigner]({{ site.javadocs_baseurl 
}}/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/BasePathBucketAssigner.html)
 : Assigner that stores all part files in the base path (single global bucket)
+
+## Rolling Policy
+
+The [RollingPolicy]({{ site.javadocs_baseurl 
}}/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.html)
 defines when a given in-progress part file will be closed and moved to the 
pending and later to finished state.
+Part files in the "finished" state are the ones that are ready for viewing and 
are guaranteed to contain valid data that will not be reverted in case of 
failure.
+The rolling policy, in combination with the checkpointing interval (pending 
files become finished on the next checkpoint), controls how quickly
+part files become available for downstream readers, as well as the size and 
number of these parts.
+
+Flink comes with two built-in RollingPolicies:
+
+ - [DefaultRollingPolicy]({{ site.javadocs_baseurl 
}}/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/DefaultRollingPolicy.html)
+ - [OnCheckpointRollingPolicy]({{ site.javadocs_baseurl 
}}/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/OnCheckpointRollingPolicy.html)
+
+## Part file lifecycle
+
+In order to use the output of the `StreamingFileSink` in downstream systems, 
we need to understand the naming and lifecycle of the output files produced.
+
+Part files can be in one of three states:
+ 1. **In-progress** : The part file that is currently being written to is 
in-progress
+ 2. **Pending** : Closed (due to the specified rolling policy) in-progress 
files that are waiting to be committed
+ 3. **Finished** : On successful checkpoints pending files transition to 
"Finished"
+
+Only finished files are safe to read by downstream systems as those are 
guaranteed to not be modified later.
+
+<div class="alert alert-info">
+     <b>IMPORTANT:</b> Part file indexes are strictly increasing for any given 
subtask (in the order they were created). However, these indexes are not always 
sequential. When the job restarts, the next part index for all subtasks will be 
the `max part index + 1`,
+where `max` is computed across all subtasks.
+</div>
+
+Each writer subtask will have a single in-progress part file at any given time 
for every active bucket, but there can be several pending and finished files.
+
+**Part file example**
+
+To better understand the lifecycle of these files let's look at a simple 
example with 2 sink subtasks:
+
+```
+└── 2019-08-25--12
+    ├── part-0-0.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
+    └── part-1-0.inprogress.ea65a428-a1d0-4a0b-bbc5-7a436a75e575
+```
+
+When the part file `part-1-0` is rolled (let's say it becomes too large), it 
becomes pending but it is not renamed. The sink then opens a new part file, 
`part-1-1`:
+
+```
+└── 2019-08-25--12
+    ├── part-0-0.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
+    ├── part-1-0.inprogress.ea65a428-a1d0-4a0b-bbc5-7a436a75e575
+    └── part-1-1.inprogress.bc279efe-b16f-47d8-b828-00ef6e2fbd11
+```
+
+As `part-1-0` is now pending completion, after the next successful checkpoint, 
it is finalized:
+
+```
+└── 2019-08-25--12
+    ├── part-0-0.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
+    ├── part-1-0
+    └── part-1-1.inprogress.bc279efe-b16f-47d8-b828-00ef6e2fbd11
+```
+
+New buckets are created as dictated by the bucketing policy, and this doesn't 
affect currently in-progress files:
+
+```
+├── 2019-08-25--12
+│   ├── part-0-0.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
+│   ├── part-1-0
+│   └── part-1-1.inprogress.bc279efe-b16f-47d8-b828-00ef6e2fbd11
+└── 2019-08-25--13
+    └── part-0-2.inprogress.2b475fec-1482-4dea-9946-eb4353b475f1
+```
+
+Old buckets can still receive new records as the bucketing policy is evaluated 
on a per-record basis.
+
+### Part file configuration
+
+Finished files can be distinguished from the in-progress ones by their naming 
scheme only.
+
+By default, the file naming strategy is as follows:
+ - **In-progress / Pending**: 
`part-<subtaskIndex>-<partFileIndex>.inprogress.uid`
+ - **Finished:** `part-<subtaskIndex>-<partFileIndex>`
+
+Flink allows the user to specify a prefix and/or a suffix for their part 
files.
+This can be done using an `OutputFileConfig`.
+For example, for a prefix "prefix" and a suffix ".ext", the sink will create 
the following files:
+
+```
+└── 2019-08-25--12
+    ├── prefix-0-0.ext
+    ├── prefix-0-1.ext.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
+    ├── prefix-1-0.ext
+    └── prefix-1-1.ext.inprogress.bc279efe-b16f-47d8-b828-00ef6e2fbd11
+```
+
+The user can specify an `OutputFileConfig` in the following way:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+OutputFileConfig config = OutputFileConfig
+ .builder()
+ .withPartPrefix("prefix")
+ .withPartSuffix(".ext")
+ .build();
+            
+StreamingFileSink<Tuple2<Integer, Integer>> sink = StreamingFileSink
+ .forRowFormat(new Path(outputPath), new SimpleStringEncoder<>("UTF-8"))
+ .withBucketAssigner(new KeyBucketAssigner())
+ .withRollingPolicy(OnCheckpointRollingPolicy.build())
+ .withOutputFileConfig(config)
+ .build();
+                       
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+
+val config = OutputFileConfig
+ .builder()
+ .withPartPrefix("prefix")
+ .withPartSuffix(".ext")
+ .build()
+            
+val sink = StreamingFileSink
+ .forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8"))
+ .withBucketAssigner(new KeyBucketAssigner())
+ .withRollingPolicy(OnCheckpointRollingPolicy.build())
+ .withOutputFileConfig(config)
+ .build()
+                       
+{% endhighlight %}
+</div>
+</div>
+
+## Important Considerations
+
+### General
+
+<span class="label label-danger">Important Note 1</span>: When using Hadoop < 
2.7, please use
+the `OnCheckpointRollingPolicy` which rolls part files on every checkpoint. 
The reason is that if part files "traverse"
+checkpoint interval, then, upon recovery from a failure the 
`StreamingFileSink` may use the `truncate()` method of the 
 
 Review comment:
   ```suggestion
   the checkpoint interval, then, upon recovery from a failure the 
`StreamingFileSink` may use the `truncate()` method of the 
   ```
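
Not part of the PR, just an illustration of the naming scheme in the hunk above: since finished files differ from in-progress/pending ones only by name, a downstream reader could filter on it roughly as sketched below. This is a minimal sketch that assumes the `<prefix>-<subtaskIndex>-<partFileIndex><suffix>` and `.inprogress.<uid>` patterns exactly as the docs describe them. `PartFileNames` and `isFinished` are hypothetical names, not Flink API.

```java
import java.util.regex.Pattern;

// Hypothetical helper (not Flink API): classifies part file names using the
// naming scheme described in the docs hunk above.
final class PartFileNames {

    // In-progress and pending files end in ".inprogress.<uid>".
    private static final Pattern IN_PROGRESS =
            Pattern.compile(".*\\.inprogress\\.[^.]+$");

    // Returns true if fileName looks like a finished part file, i.e.
    // "<prefix>-<subtaskIndex>-<partFileIndex><suffix>" with no in-progress tail.
    // prefix and suffix are assumed to match the values given to OutputFileConfig
    // ("part" and "" by default, per the docs).
    static boolean isFinished(String fileName, String prefix, String suffix) {
        if (IN_PROGRESS.matcher(fileName).matches()) {
            return false;
        }
        Pattern finished = Pattern.compile(
                Pattern.quote(prefix) + "-\\d+-\\d+" + Pattern.quote(suffix));
        return finished.matcher(fileName).matches();
    }

    public static void main(String[] args) {
        String[] names = {
            "part-1-0",
            "part-1-1.inprogress.bc279efe-b16f-47d8-b828-00ef6e2fbd11",
        };
        for (String name : names) {
            System.out.println(name + " finished=" + isFinished(name, "part", ""));
        }
    }
}
```

With a custom `OutputFileConfig`, the same check would be called with the configured values, e.g. `isFinished(name, "prefix", ".ext")`.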
