gaoyunhaii commented on a change in pull request #18827:
URL: https://github.com/apache/flink/pull/18827#discussion_r809855147



##########
File path: docs/content/docs/connectors/datastream/filesystem.md
##########
@@ -1016,10 +1016,14 @@ Once the total size of the cached files has reached the size threshold or the nu
 the cached files will be scheduled to compact.
 
 The {{< javadoc file="org/apache/flink/connector/file/sink/compactor/FileCompactor.html" name="FileCompactor">}} specifies how to compact
-the give list of `Path` and write the result to {{< javadoc file="org/apache/flink/streaming/api/functions/sink/filesystem/CompactingFileWriter.html" name="CompactingFileWriter">}}. It could be classified into two types according to the type of the give `CompactingFileWriter`:
-
-- **{{< javadoc file="org/apache/flink/connector/file/sink/compactor/OutputStreamBasedFileCompactor.html" name="OutputStreamBasedFileCompactor">}}**: The given `CompactingFileWriter` could be converted into an output stream that users could write the compacted results into. An example is the {{< javadoc file="org/apache/flink/connector/file/sink/compactor/ConcatFileCompactor.html" name="ConcatFileCompactor">}} that concats the list of files directly.
-- **{{< javadoc file="org/apache/flink/connector/file/sink/compactor/RecordWiseFileCompactor.html" name="RecordWiseFileCompactor">}}**: The given `CompactingFileWriter` allows users to write records one-by-one into it. An example is the {{< javadoc file="org/apache/flink/connector/file/sink/compactor/RecordWiseFileCompactor.html" name="RecordWiseFileCompactor">}} that reads records from the source files and then writes them with the `CompactingFileWriter`. Users need to specify how to read records from the source files.
+the give list of `Path` and write the result file. It could be classified into two types according to how to write the file:
+
+- **{{< javadoc file="org/apache/flink/connector/file/sink/compactor/OutputStreamBasedFileCompactor.html" name="OutputStreamBasedFileCompactor">}}**:
+  The users can write the compacted results into an output stream. This is useful when the users don't want to or can't read records from the input files.
+  An example is the {{< javadoc file="org/apache/flink/connector/file/sink/compactor/ConcatFileCompactor.html" name="ConcatFileCompactor">}} that concats the list of files directly.
+- **{{< javadoc file="org/apache/flink/connector/file/sink/compactor/RecordWiseFileCompactor.html" name="RecordWiseFileCompactor">}}**:
+  The compactor can read records one-by-one from the input files and write into the result file as like the `FileWriter`. 

Review comment:
       `as like the` -> `similar to` ?
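   For context, wiring either compactor type into a `FileSink` would look roughly like the sketch below. This is a configuration sketch based on the compaction API the docs hunk above describes, not code from this PR; the path and thresholds are placeholder values.

```java
// Sketch (placeholder path and thresholds): enabling compaction on a FileSink
// with the output-stream-based ConcatFileCompactor described above.
FileSink<String> sink =
        FileSink.forRowFormat(new Path("/tmp/output"), new SimpleStringEncoder<String>("UTF-8"))
                .enableCompact(
                        FileCompactStrategy.Builder.newBuilder()
                                .setSizeThreshold(16 * 1024 * 1024) // compact once cached files reach 16 MiB
                                .enableCompactionOnCheckpoint(5)    // or after every 5 checkpoints
                                .build(),
                        new ConcatFileCompactor()) // concats the source files directly
                .build();
```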

##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactService.java
##########
@@ -95,11 +102,22 @@ public void close() {
         Path targetPath = assembleCompactedFilePath(compactingFiles.get(0));
         CompactingFileWriter compactingFileWriter =
                 bucketWriter.openNewCompactingFile(
-                        fileCompactor.getWriterType(),
+                        compactingWriterType,
                         request.getBucketId(),
                         targetPath,
                         System.currentTimeMillis());
-        fileCompactor.compact(compactingFiles, compactingFileWriter);
+        if (compactingWriterType == Type.RECORD_WISE) {
+            ((RecordWiseFileCompactor) fileCompactor)
+                    .compact(
+                            compactingFiles,
                            ((RecordWiseCompactingFileWriter) compactingFileWriter)::write);

Review comment:
       Might add `@SuppressWarnings({"rawtypes", "unchecked"})` to the method.
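   To illustrate which warnings the annotation would cover, here is a self-contained sketch. The interfaces are hypothetical minimal stand-ins for the Flink types, not the real classes; they only mirror the shape of the cast in `CompactService`.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical minimal stand-ins for the Flink types (not the real classes),
// mirroring the raw-type cast in CompactService#compact.
interface FileCompactor {}

interface RecordWiseFileCompactor<T> extends FileCompactor {
    void compact(List<String> files, Consumer<T> writer);
}

public class SuppressDemo {
    // "rawtypes" covers the cast to the raw RecordWiseFileCompactor;
    // "unchecked" covers invoking compact(...) through the raw type.
    @SuppressWarnings({"rawtypes", "unchecked"})
    static String compactToString(FileCompactor fileCompactor, List<String> files) {
        StringBuilder out = new StringBuilder();
        ((RecordWiseFileCompactor) fileCompactor).compact(files, r -> out.append(r));
        return out.toString();
    }

    public static void main(String[] args) {
        // A record-wise compactor that simply forwards each record to the writer.
        RecordWiseFileCompactor<String> concat = (files, writer) -> files.forEach(writer);
        System.out.println(compactToString(concat, List.of("a", "b"))); // prints "ab"
    }
}
```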

##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactService.java
##########
@@ -95,11 +102,22 @@ public void close() {
         Path targetPath = assembleCompactedFilePath(compactingFiles.get(0));
         CompactingFileWriter compactingFileWriter =
                 bucketWriter.openNewCompactingFile(
-                        fileCompactor.getWriterType(),
+                        compactingWriterType,
                         request.getBucketId(),
                         targetPath,
                         System.currentTimeMillis());
-        fileCompactor.compact(compactingFiles, compactingFileWriter);
+        if (compactingWriterType == Type.RECORD_WISE) {
+            ((RecordWiseFileCompactor) fileCompactor)

Review comment:
       Might change to `RecordWiseFileCompactor<?>`
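   A standalone sketch of what the wildcard buys, again using hypothetical minimal stand-ins rather than the real Flink classes: the `<?>` cast removes the rawtypes warning, though supplying a writer through the wildcard type still needs an unchecked cast.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical minimal stand-ins for the Flink types (not the real classes).
interface FileCompactor {}

interface RecordWiseFileCompactor<T> extends FileCompactor {
    void compact(List<String> files, Consumer<T> writer);
}

public class WildcardDemo {
    static String compactAll(FileCompactor fileCompactor, List<String> files) {
        // The wildcard avoids the "rawtypes" warning on the cast itself...
        RecordWiseFileCompactor<?> compactor = (RecordWiseFileCompactor<?>) fileCompactor;
        // ...but invoking compact(...) with a writer still needs "unchecked",
        // since the compiler cannot match the writer to the captured type.
        @SuppressWarnings("unchecked")
        RecordWiseFileCompactor<Object> typed = (RecordWiseFileCompactor<Object>) compactor;
        StringBuilder out = new StringBuilder();
        typed.compact(files, out::append);
        return out.toString();
    }

    public static void main(String[] args) {
        RecordWiseFileCompactor<String> concat = (files, writer) -> files.forEach(writer);
        System.out.println(compactAll(concat, List.of("a", "b"))); // prints "ab"
    }
}
```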




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
