This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c99b00  [FLINK-26235][connectors/filesystem] CompactingFileWriter and PendingFileRecoverable should not be exposed to users.
3c99b00 is described below

commit 3c99b005563debf35f43df5fb4ec92863e7775a9
Author: Gen Luo <luogen...@gmail.com>
AuthorDate: Fri Feb 18 11:54:56 2022 +0800

    [FLINK-26235][connectors/filesystem] CompactingFileWriter and PendingFileRecoverable should not be exposed to users.
    
    This closes #18827.
---
 .../docs/connectors/datastream/filesystem.md       | 10 +++---
 .../docs/connectors/datastream/filesystem.md       | 12 ++++---
 .../7602816f-5c01-4b7a-9e3e-235dfedec245           |  3 +-
 .../file/sink/compactor/FileCompactor.java         | 23 +++---------
 .../sink/compactor/IdenticalFileCompactor.java     |  7 ++--
 .../compactor/OutputStreamBasedFileCompactor.java  | 22 ++++--------
 .../sink/compactor/RecordWiseFileCompactor.java    | 26 +++++++-------
 .../sink/compactor/operator/CompactService.java    | 41 ++++++++++++++++++----
 .../sink/filesystem/CompactingFileWriter.java      |  6 ++--
 .../sink/filesystem/InProgressFileWriter.java      |  2 --
 10 files changed, 80 insertions(+), 72 deletions(-)

diff --git a/docs/content.zh/docs/connectors/datastream/filesystem.md b/docs/content.zh/docs/connectors/datastream/filesystem.md
index e0e391c..31542da 100644
--- a/docs/content.zh/docs/connectors/datastream/filesystem.md
+++ b/docs/content.zh/docs/connectors/datastream/filesystem.md
@@ -1013,11 +1013,13 @@ val fileSink: FileSink[Integer] =
 目前有两个并行的条件:目标文件大小与间隔的 Checkpoint 数量。当目前缓存的文件的总大小达到指定的阈值,或自上次合并后经过的 Checkpoint 次数已经达到指定次数时,
 `FileSink` 将创建一个异步任务来合并当前缓存的文件。
 
-{{< javadoc file="org/apache/flink/connector/file/sink/compactor/FileCompactor.html" name="FileCompactor">}} 指定如何将给定的路径列表对应的文件进行合并将结果写入
-到 {{< javadoc file="org/apache/flink/streaming/api/functions/sink/filesystem//CompactingFileWriter.html" name="CompactingFileWriter">}} 中。根据所给定的 `CompactingFileWriter` 的类型,它可以分为两类:
+{{< javadoc file="org/apache/flink/connector/file/sink/compactor/FileCompactor.html" name="FileCompactor">}} 指定如何将给定的路径列表对应的文件进行合并,并将结果写入到文件中。
+根据写文件的方式,它可以分为两类:
 
-- **{{< javadoc file="org/apache/flink/connector/file/sink/compactor/OutputStreamBasedFileCompactor.html" name="OutputStreamBasedFileCompactor">}}** : 这种类型的 `CompactingFileWriter` 可以被转换为一个输出流,用户可以将合并后的结果直接写入该流中。这种类型的 `CompactingFileWriter` 的一个例子是 {{< javadoc file="org/apache/flink/connector/file/sink/compactor/ConcatFileCompactor.html" name="ConcatFileCompactor">}},它直接将给定的文件进行合并并将结果写到输出流中。
-- **{{< javadoc file="org/apache/flink/connector/file/sink/compactor/RecordWiseFileCompactor.html" name="RecordWiseFileCompactor">}}** :这种类型的 `CompactingFileWriter` 允许用户将按条写入记录。`CompactingFileWriter` 的一个例子是 {{< javadoc file="org/apache/flink/connector/file/sink/compactor/RecordWiseFileCompactor.html" name="RecordWiseFileCompactor">}} ,它从给定的文件中读出记录并写出到 `CompactingFileWriter` 中。用户需要指定如何从原始文件中读出记录。
+- **{{< javadoc file="org/apache/flink/connector/file/sink/compactor/OutputStreamBasedFileCompactor.html" name="OutputStreamBasedFileCompactor">}}** :
+  用户将合并后的结果写入一个输出流中。通常在用户不希望或者无法从输入文件中读取记录时使用。这种类型的 `FileCompactor` 的一个例子是 {{< javadoc file="org/apache/flink/connector/file/sink/compactor/ConcatFileCompactor.html" name="ConcatFileCompactor">}},它直接将给定的文件进行合并并将结果写到输出流中。
+- **{{< javadoc file="org/apache/flink/connector/file/sink/compactor/RecordWiseFileCompactor.html" name="RecordWiseFileCompactor">}}** :
+  这种类型的 `FileCompactor` 会逐条读出输入文件中的记录,然后像 `FileWriter` 一样逐条写入输出文件中。这种 `FileCompactor` 的一个例子是 {{< javadoc file="org/apache/flink/connector/file/sink/compactor/RecordWiseFileCompactor.html" name="RecordWiseFileCompactor">}},它从给定的文件中读出记录并写入结果文件中。用户需要指定如何从原始文件中读出记录。
 
 {{< hint info >}}
 **重要** 如果启用了文件合并功能,文件可见的时间会被延长。
diff --git a/docs/content/docs/connectors/datastream/filesystem.md b/docs/content/docs/connectors/datastream/filesystem.md
index 7b0436c..85224c1 100644
--- a/docs/content/docs/connectors/datastream/filesystem.md
+++ b/docs/content/docs/connectors/datastream/filesystem.md
@@ -1016,10 +1016,14 @@ Once the total size of the cached files has reached the size threshold or the nu
 the cached files will be scheduled to compact.
 
 The {{< javadoc file="org/apache/flink/connector/file/sink/compactor/FileCompactor.html" name="FileCompactor">}} specifies how to compact
-the give list of `Path` and write the result to {{< javadoc file="org/apache/flink/streaming/api/functions/sink/filesystem/CompactingFileWriter.html" name="CompactingFileWriter">}}. It could be classified into two types according to the type of the give `CompactingFileWriter`:
-
-- **{{< javadoc file="org/apache/flink/connector/file/sink/compactor/OutputStreamBasedFileCompactor.html" name="OutputStreamBasedFileCompactor">}}**: The given `CompactingFileWriter` could be converted into an output stream that users could write the compacted results into. An example is the {{< javadoc file="org/apache/flink/connector/file/sink/compactor/ConcatFileCompactor.html" name="ConcatFileCompactor">}} that concats the list of files directly.
-- **{{< javadoc file="org/apache/flink/connector/file/sink/compactor/RecordWiseFileCompactor.html" name="RecordWiseFileCompactor">}}**: The given `CompactingFileWriter` allows users to write records one-by-one into it. An example is the {{< javadoc file="org/apache/flink/connector/file/sink/compactor/RecordWiseFileCompactor.html" name="RecordWiseFileCompactor">}} that reads records from the source files and then writes them with the `CompactingFileWriter`. Users need to specify how to re [...]
+the given list of `Path` and write the result file. It can be classified into two types according to how the result file is written:
+
+- **{{< javadoc file="org/apache/flink/connector/file/sink/compactor/OutputStreamBasedFileCompactor.html" name="OutputStreamBasedFileCompactor">}}**:
+  Users write the compacted result into an output stream. This is useful when they do not want to, or cannot, read records from the input files.
+  An example is the {{< javadoc file="org/apache/flink/connector/file/sink/compactor/ConcatFileCompactor.html" name="ConcatFileCompactor">}} that concatenates the list of files directly.
+- **{{< javadoc file="org/apache/flink/connector/file/sink/compactor/RecordWiseFileCompactor.html" name="RecordWiseFileCompactor">}}**:
+  The compactor reads records one by one from the input files and writes them into the result file, similar to the `FileWriter`.
+  An example is the {{< javadoc file="org/apache/flink/connector/file/sink/compactor/RecordWiseFileCompactor.html" name="RecordWiseFileCompactor">}} that reads records from the source files and then writes them into the result file. Users need to specify how to read records from the source files.
 
 {{< hint info >}}
 **Important** Once the compaction is enabled, the written files need to wait for longer time before they get visible.
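
For readers who want to see the two triggers and a compactor wired together, here is a
minimal sketch based on the FileSink compaction API that the surrounding docs describe;
the output path, thresholds, and the example class name are illustrative only:

    import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    import org.apache.flink.connector.file.sink.FileSink;
    import org.apache.flink.connector.file.sink.compactor.ConcatFileCompactor;
    import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy;
    import org.apache.flink.core.fs.Path;

    public class CompactionConfigExample {
        public static FileSink<String> buildSink() {
            return FileSink.forRowFormat(new Path("/tmp/output"), new SimpleStringEncoder<String>())
                    .enableCompact(
                            FileCompactStrategy.Builder.newBuilder()
                                    // Trigger 1: total size of cached files reaches the threshold.
                                    .setSizeThreshold(16 * 1024 * 1024)
                                    // Trigger 2: a number of checkpoints passed since the last compaction.
                                    .enableCompactionOnCheckpoint(5)
                                    .build(),
                            // A stream-based compactor that concatenates the input files.
                            new ConcatFileCompactor())
                    .build();
        }
    }
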
diff --git a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/7602816f-5c01-4b7a-9e3e-235dfedec245 b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/7602816f-5c01-4b7a-9e3e-235dfedec245
index f61d7d9..ae9b43c 100644
--- a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/7602816f-5c01-4b7a-9e3e-235dfedec245
+++ b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/7602816f-5c01-4b7a-9e3e-235dfedec245
@@ -225,6 +225,7 @@ org.apache.flink.streaming.api.functions.sink.filesystem.AbstractPartFileWriter
 org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter$PendingFile does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
 org.apache.flink.streaming.api.functions.sink.filesystem.HadoopPathBasedBulkFormatBuilder does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
 org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter$InProgressFileRecoverable does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
+org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter$PendingFileRecoverable does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
 org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
 org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
 org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedInProgressFileRecoverable does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
@@ -310,4 +311,4 @@ org.apache.flink.streaming.api.windowing.triggers.TriggerResult does not satisfy
 org.apache.flink.streaming.api.windowing.windows.GlobalWindow$Serializer does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
 org.apache.flink.streaming.api.windowing.windows.GlobalWindow$Serializer$GlobalWindowSerializerSnapshot does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
 org.apache.flink.streaming.api.windowing.windows.TimeWindow$Serializer does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
-org.apache.flink.streaming.api.windowing.windows.TimeWindow$Serializer$TimeWindowSerializerSnapshot does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
\ No newline at end of file
+org.apache.flink.streaming.api.windowing.windows.TimeWindow$Serializer$TimeWindowSerializerSnapshot does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/FileCompactor.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/FileCompactor.java
index b737fd3..5217fee 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/FileCompactor.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/FileCompactor.java
@@ -19,30 +19,15 @@
 package org.apache.flink.connector.file.sink.compactor;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.streaming.api.functions.sink.filesystem.CompactingFileWriter;
 
 import java.io.Serializable;
-import java.util.List;
 
 /**
  * The {@link FileCompactor} is responsible for compacting files into one file.
  *
- * <p>The {@link FileCompactor} should declare which type of {@link CompactingFileWriter} is
- * required, and invoke the writer correspondingly.
+ * <p>Users should never implement this interface directly but use either {@link
+ * OutputStreamBasedFileCompactor} or {@link RecordWiseFileCompactor}. Other implementations will
+ * cause an UnsupportedOperationException at runtime.
  */
 @PublicEvolving
-public interface FileCompactor extends Serializable {
-
-    /** @return the {@link CompactingFileWriter} type the compactor will use. */
-    CompactingFileWriter.Type getWriterType();
-
-    /**
-     * Compact the given files into one file.
-     *
-     * @param inputFiles the files to be compacted.
-     * @param writer the writer to write the compacted file.
-     * @throws Exception Thrown if an exception occurs during the compacting.
-     */
-    void compact(List<Path> inputFiles, CompactingFileWriter writer) throws Exception;
-}
+public interface FileCompactor extends Serializable {}
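
As a hedged illustration of what the slimmed-down marker interface means for user code
(the demo class and the anonymous subclass are ours, not part of the commit):

    import org.apache.flink.connector.file.sink.compactor.ConcatFileCompactor;
    import org.apache.flink.connector.file.sink.compactor.FileCompactor;

    public class MarkerInterfaceDemo {
        public static void main(String[] args) {
            // Supported: a subtype of one of the two sanctioned base classes.
            FileCompactor supported = new ConcatFileCompactor();

            // Compiles, but the sink would reject it at runtime with an
            // UnsupportedOperationException (see getWriterType in CompactService below).
            FileCompactor unsupported = new FileCompactor() {};

            System.out.println(supported.getClass() + " vs " + unsupported.getClass());
        }
    }
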
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/IdenticalFileCompactor.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/IdenticalFileCompactor.java
index 8e6e4a1..923b56c 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/IdenticalFileCompactor.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/IdenticalFileCompactor.java
@@ -20,8 +20,8 @@ package org.apache.flink.connector.file.sink.compactor;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.streaming.api.functions.sink.filesystem.CompactingFileWriter;
 
+import java.io.OutputStream;
 import java.util.List;
 
 import static org.apache.flink.util.Preconditions.checkState;
@@ -37,9 +37,8 @@ public class IdenticalFileCompactor extends ConcatFileCompactor {
         super();
     }
 
-    @Override
-    public void compact(List<Path> inputFiles, CompactingFileWriter writer) throws Exception {
+    public void compact(List<Path> inputFiles, OutputStream outputStream) throws Exception {
         checkState(inputFiles.size() == 1, "IdenticalFileCompactor can only copy one input file");
-        super.compact(inputFiles, writer);
+        super.compact(inputFiles, outputStream);
     }
 }
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/OutputStreamBasedFileCompactor.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/OutputStreamBasedFileCompactor.java
index 2ac3805..96bb902 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/OutputStreamBasedFileCompactor.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/OutputStreamBasedFileCompactor.java
@@ -20,32 +20,22 @@ package org.apache.flink.connector.file.sink.compactor;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.streaming.api.functions.sink.filesystem.CompactingFileWriter;
-import org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedCompactingFileWriter;
 import org.apache.flink.util.CloseShieldOutputStream;
 
 import java.io.OutputStream;
 import java.util.List;
 
 /**
- * Base class for {@link FileCompactor} implementations that use the {@link
- * OutputStreamBasedCompactingFileWriter}.
+ * Base class for {@link FileCompactor} implementations that write the compacting file via an
+ * output stream.
  */
 @PublicEvolving
 public abstract class OutputStreamBasedFileCompactor implements FileCompactor {
-    @Override
-    public final CompactingFileWriter.Type getWriterType() {
-        return CompactingFileWriter.Type.OUTPUT_STREAM;
-    }
 
-    @Override
-    public void compact(List<Path> inputFiles, CompactingFileWriter writer) throws Exception {
-        // The outputStream returned by OutputStreamBasedCompactingFileWriter#asOutputStream should
-        // not be closed here.
-        CloseShieldOutputStream outputStream =
-                new CloseShieldOutputStream(
-                        ((OutputStreamBasedCompactingFileWriter) writer).asOutputStream());
-        doCompact(inputFiles, outputStream);
+    public void compact(List<Path> inputFiles, OutputStream outputStream) throws Exception {
+        // The outputStream should not be closed here.
+        CloseShieldOutputStream shieldOutputStream = new CloseShieldOutputStream(outputStream);
+        doCompact(inputFiles, shieldOutputStream);
     }
 
     protected abstract void doCompact(List<Path> inputFiles, OutputStream outputStream)
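
Under the new contract a custom stream-based compactor only implements doCompact; the
framework supplies the close-shielded stream. A minimal sketch (the class name and the
record separator are hypothetical; the Flink FileSystem API is used for reading):

    import org.apache.flink.connector.file.sink.compactor.OutputStreamBasedFileCompactor;
    import org.apache.flink.core.fs.FSDataInputStream;
    import org.apache.flink.core.fs.Path;

    import java.io.OutputStream;
    import java.util.List;

    public class NewlineSeparatedConcatCompactor extends OutputStreamBasedFileCompactor {

        @Override
        protected void doCompact(List<Path> inputFiles, OutputStream outputStream) throws Exception {
            byte[] buffer = new byte[8192];
            for (Path input : inputFiles) {
                // Copy each input file verbatim into the compacted file.
                try (FSDataInputStream in = input.getFileSystem().open(input)) {
                    int read;
                    while ((read = in.read(buffer)) >= 0) {
                        outputStream.write(buffer, 0, read);
                    }
                }
                // Hypothetical separator so concatenated line-based files stay parseable.
                outputStream.write('\n');
            }
        }
    }
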
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/RecordWiseFileCompactor.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/RecordWiseFileCompactor.java
index 3244961..17794cd 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/RecordWiseFileCompactor.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/RecordWiseFileCompactor.java
@@ -20,8 +20,6 @@ package org.apache.flink.connector.file.sink.compactor;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.streaming.api.functions.sink.filesystem.CompactingFileWriter;
-import org.apache.flink.streaming.api.functions.sink.filesystem.RecordWiseCompactingFileWriter;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -29,7 +27,7 @@ import java.util.List;
 
 /**
  * A {@link FileCompactor} implementation that reads input files with a {@link Reader} and writes
- * with the {@link RecordWiseCompactingFileWriter}.
+ * with a {@link Writer}.
  */
 @PublicEvolving
 public class RecordWiseFileCompactor<IN> implements FileCompactor {
@@ -39,26 +37,28 @@ public class RecordWiseFileCompactor<IN> implements FileCompactor {
         this.readerFactory = readerFactory;
     }
 
-    @Override
-    public final CompactingFileWriter.Type getWriterType() {
-        return CompactingFileWriter.Type.RECORD_WISE;
-    }
-
-    @Override
-    public void compact(List<Path> inputFiles, CompactingFileWriter writer) throws Exception {
-        RecordWiseCompactingFileWriter<IN> recordWriter =
-                (RecordWiseCompactingFileWriter<IN>) writer;
+    public void compact(List<Path> inputFiles, Writer<IN> writer) throws Exception {
         for (Path input : inputFiles) {
             try (Reader<IN> reader = readerFactory.createFor(input)) {
                 IN elem;
                 while ((elem = reader.read()) != null) {
-                    recordWriter.write(elem);
+                    writer.write(elem);
                 }
             }
         }
     }
 
     /**
+     * The writer that writes records into the compacting file.
+     *
+     * @param <T> The type of the records that are written.
+     */
+    @PublicEvolving
+    public interface Writer<T> {
+        void write(T record) throws IOException;
+    }
+
+    /**
      * The reader that reads record from the compacting files.
      *
      * @param <T> Thy type of the records that is read.
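
The compact() loop above drives a user-provided Reader until read() returns null. A
hedged sketch of a line-oriented Reader, assuming the Reader shape implied by the
try-with-resources call to readerFactory.createFor(input) (the class itself is illustrative):

    import org.apache.flink.connector.file.sink.compactor.RecordWiseFileCompactor;
    import org.apache.flink.core.fs.Path;

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.nio.charset.StandardCharsets;

    public class TextLineReader implements RecordWiseFileCompactor.Reader<String> {

        private final BufferedReader reader;

        public TextLineReader(Path path) throws IOException {
            this.reader =
                    new BufferedReader(
                            new InputStreamReader(
                                    path.getFileSystem().open(path), StandardCharsets.UTF_8));
        }

        @Override
        public String read() throws IOException {
            // Returning null signals end of input, matching the loop in compact().
            return reader.readLine();
        }

        @Override
        public void close() throws IOException {
            reader.close();
        }
    }
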
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactService.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactService.java
index 7e5c7f6..7ab5e93 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactService.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactService.java
@@ -21,11 +21,16 @@ package org.apache.flink.connector.file.sink.compactor.operator;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.connector.file.sink.FileSinkCommittable;
 import org.apache.flink.connector.file.sink.compactor.FileCompactor;
+import org.apache.flink.connector.file.sink.compactor.OutputStreamBasedFileCompactor;
+import org.apache.flink.connector.file.sink.compactor.RecordWiseFileCompactor;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.util.Hardware;
 import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
 import org.apache.flink.streaming.api.functions.sink.filesystem.CompactingFileWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.CompactingFileWriter.Type;
 import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.PendingFileRecoverable;
+import org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedCompactingFileWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.RecordWiseCompactingFileWriter;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 
 import java.io.IOException;
@@ -44,6 +49,7 @@ public class CompactService {
 
     private final int numCompactThreads;
     private final FileCompactor fileCompactor;
+    private final CompactingFileWriter.Type compactingWriterType;
     private final BucketWriter<?, String> bucketWriter;
 
     private transient ExecutorService compactService;
@@ -55,6 +61,7 @@ public class CompactService {
         this.numCompactThreads = numCompactThreads;
         this.fileCompactor = fileCompactor;
         this.bucketWriter = bucketWriter;
+        this.compactingWriterType = getWriterType(fileCompactor);
     }
 
     public void open() {
@@ -84,10 +91,11 @@ public class CompactService {
         }
     }
 
+    @SuppressWarnings({"rawtypes", "unchecked"})
     private Iterable<FileSinkCommittable> compact(CompactorRequest request) throws Exception {
         List<FileSinkCommittable> results = new ArrayList<>(request.getCommittableToPassthrough());
 
-        List<Path> compactingFiles = getCompactingPath(request, results);
+        List<Path> compactingFiles = getCompactingPath(request);
         if (compactingFiles.isEmpty()) {
             return results;
         }
@@ -95,11 +103,22 @@ public class CompactService {
         Path targetPath = assembleCompactedFilePath(compactingFiles.get(0));
         CompactingFileWriter compactingFileWriter =
                 bucketWriter.openNewCompactingFile(
-                        fileCompactor.getWriterType(),
+                        compactingWriterType,
                         request.getBucketId(),
                         targetPath,
                         System.currentTimeMillis());
-        fileCompactor.compact(compactingFiles, compactingFileWriter);
+        if (compactingWriterType == Type.RECORD_WISE) {
+            ((RecordWiseFileCompactor) fileCompactor)
+                    .compact(
+                            compactingFiles,
+                            ((RecordWiseCompactingFileWriter) compactingFileWriter)::write);
+        } else if (compactingWriterType == CompactingFileWriter.Type.OUTPUT_STREAM) {
+            ((OutputStreamBasedFileCompactor) fileCompactor)
+                    .compact(
+                            compactingFiles,
+                            ((OutputStreamBasedCompactingFileWriter) compactingFileWriter)
+                                    .asOutputStream());
+        }
         PendingFileRecoverable compactedPendingFile = compactingFileWriter.closeForCommit();
 
         FileSinkCommittable compacted =
@@ -113,9 +132,7 @@ public class CompactService {
         return results;
     }
 
-    // results: side output pass through committable
-    private List<Path> getCompactingPath(
-            CompactorRequest request, List<FileSinkCommittable> results) throws IOException {
+    private List<Path> getCompactingPath(CompactorRequest request) throws IOException {
         List<FileSinkCommittable> compactingCommittable = request.getCommittableToCompact();
         List<Path> compactingFiles = new ArrayList<>();
 
@@ -144,4 +161,16 @@ public class CompactService {
         }
         return new Path(uncompactedPath.getParent(), COMPACTED_PREFIX + uncompactedName);
     }
+
+    private static CompactingFileWriter.Type getWriterType(FileCompactor fileCompactor) {
+        if (fileCompactor instanceof OutputStreamBasedFileCompactor) {
+            return CompactingFileWriter.Type.OUTPUT_STREAM;
+        } else if (fileCompactor instanceof RecordWiseFileCompactor) {
+            return CompactingFileWriter.Type.RECORD_WISE;
+        } else {
+            throw new UnsupportedOperationException(
+                    "Unable to create compacting file writer for compactor: "
+        }
+    }
 }
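
The RECORD_WISE branch above passes a method reference where a Writer is expected; this
works because the new Writer interface has a single abstract method. A self-contained
illustration (the demo class and the List-backed sink are ours, not part of the commit):

    import org.apache.flink.connector.file.sink.compactor.RecordWiseFileCompactor;

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    public class WriterAdapterDemo {
        public static void main(String[] args) throws IOException {
            List<String> sink = new ArrayList<>();
            // Writer<T> declares only write(T), so a method reference suffices --
            // the same adaptation CompactService applies with
            // ((RecordWiseCompactingFileWriter) compactingFileWriter)::write.
            RecordWiseFileCompactor.Writer<String> writer = sink::add;
            writer.write("record-1");
            writer.write("record-2");
            System.out.println(sink); // [record-1, record-2]
        }
    }
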
diff --git a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/CompactingFileWriter.java b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/CompactingFileWriter.java
index 23033aa..4c7716d 100644
--- a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/CompactingFileWriter.java
+++ b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/CompactingFileWriter.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.streaming.api.functions.sink.filesystem;
 
-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.PendingFileRecoverable;
 
 import java.io.IOException;
@@ -31,7 +31,7 @@ import java.io.IOException;
  * both. If an class implements both interfaces, once the write method of either interface is
  * called, the write method in the other one should be disabled.
  */
-@PublicEvolving
+@Internal
 public interface CompactingFileWriter {
 
     /**
@@ -44,7 +44,7 @@ public interface CompactingFileWriter {
     PendingFileRecoverable closeForCommit() throws IOException;
 
     /** Enum defining the types of {@link CompactingFileWriter}. */
-    @PublicEvolving
+    @Internal
     enum Type {
         RECORD_WISE,
         OUTPUT_STREAM
diff --git a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java
index dbc8159..8e81f31 100644
--- a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java
+++ b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java
@@ -19,7 +19,6 @@
 package org.apache.flink.streaming.api.functions.sink.filesystem;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.core.fs.Path;
 
 import javax.annotation.Nullable;
@@ -67,7 +66,6 @@ public interface InProgressFileWriter<IN, BucketID>
     interface InProgressFileRecoverable extends PendingFileRecoverable {}
 
     /** The handle can be used to recover pending file. */
-    @PublicEvolving
     interface PendingFileRecoverable {
 
         /** @return The target path of the pending file, null if unavailable. */
