This is an automated email from the ASF dual-hosted git repository. gaoyunhaii pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 3c99b00 [FLINK-26235][connectors/filesystem] CompactingFileWriter and PendingFileRecoverable should not be exposed to users. 3c99b00 is described below commit 3c99b005563debf35f43df5fb4ec92863e7775a9 Author: Gen Luo <luogen...@gmail.com> AuthorDate: Fri Feb 18 11:54:56 2022 +0800 [FLINK-26235][connectors/filesystem] CompactingFileWriter and PendingFileRecoverable should not be exposed to users. This closes #18827. --- .../docs/connectors/datastream/filesystem.md | 10 +++--- .../docs/connectors/datastream/filesystem.md | 12 ++++--- .../7602816f-5c01-4b7a-9e3e-235dfedec245 | 3 +- .../file/sink/compactor/FileCompactor.java | 23 +++--------- .../sink/compactor/IdenticalFileCompactor.java | 7 ++-- .../compactor/OutputStreamBasedFileCompactor.java | 22 ++++-------- .../sink/compactor/RecordWiseFileCompactor.java | 26 +++++++------- .../sink/compactor/operator/CompactService.java | 41 ++++++++++++++++++---- .../sink/filesystem/CompactingFileWriter.java | 6 ++-- .../sink/filesystem/InProgressFileWriter.java | 2 -- 10 files changed, 80 insertions(+), 72 deletions(-) diff --git a/docs/content.zh/docs/connectors/datastream/filesystem.md b/docs/content.zh/docs/connectors/datastream/filesystem.md index e0e391c..31542da 100644 --- a/docs/content.zh/docs/connectors/datastream/filesystem.md +++ b/docs/content.zh/docs/connectors/datastream/filesystem.md @@ -1013,11 +1013,13 @@ val fileSink: FileSink[Integer] = 目前有两个并行的条件:目标文件大小与间隔的 Checkpoint 数量。当目前缓存的文件的总大小达到指定的阈值,或自上次合并后经过的 Checkpoint 次数已经达到指定次数时, `FileSink` 将创建一个异步任务来合并当前缓存的文件。 -{{< javadoc file="org/apache/flink/connector/file/sink/compactor/FileCompactor.html" name="FileCompactor">}} 指定如何将给定的路径列表对应的文件进行合并将结果写入 -到 {{< javadoc file="org/apache/flink/streaming/api/functions/sink/filesystem//CompactingFileWriter.html" name="CompactingFileWriter">}} 中。根据所给定的 `CompactingFileWriter` 的类型,它可以分为两类: +{{< javadoc file="org/apache/flink/connector/file/sink/compactor/FileCompactor.html" name="FileCompactor">}} 指定如何将给定的路径列表对应的文件进行合并将结果写入到文件中。 +根据如何写文件,它可以分为两类: -- **{{< javadoc file="org/apache/flink/connector/file/sink/compactor/OutputStreamBasedFileCompactor.html" name="OutputStreamBasedFileCompactor">}}** : 这种类型的 `CompactingFileWriter` 可以被转换为一个输出流,用户可以将合并后的结果直接写入该流中。这种类型的 `CompactingFileWriter` 的一个例子是 {{< javadoc file="org/apache/flink/connector/file/sink/compactor/ConcatFileCompactor.html" name="ConcatFileCompactor">}},它直接将给定的文件进行合并并将结果写到输出流中。 -- **{{< javadoc file="org/apache/flink/connector/file/sink/compactor/RecordWiseFileCompactor.html" name="RecordWiseFileCompactor">}}** :这种类型的 `CompactingFileWriter` 允许用户将按条写入记录。`CompactingFileWriter` 的一个例子是 {{< javadoc file="org/apache/flink/connector/file/sink/compactor/RecordWiseFileCompactor.html" name="RecordWiseFileCompactor">}} ,它从给定的文件中读出记录并写出到 `CompactingFileWriter` 中。用户需要指定如何从原始文件中读出记录。 +- **{{< javadoc file="org/apache/flink/connector/file/sink/compactor/OutputStreamBasedFileCompactor.html" name="OutputStreamBasedFileCompactor">}}** : + 用户将合并后的结果写入一个输出流中。通常在用户不希望或者无法从输入文件中读取记录时使用。这种类型的 `CompactingFileWriter` 的一个例子是 {{< javadoc file="org/apache/flink/connector/file/sink/compactor/ConcatFileCompactor.html" name="ConcatFileCompactor">}},它直接将给定的文件进行合并并将结果写到输出流中。 +- **{{< javadoc file="org/apache/flink/connector/file/sink/compactor/RecordWiseFileCompactor.html" name="RecordWiseFileCompactor">}}** : + 这种类型的 `CompactingFileWriter` 会逐条读出输入文件的记录用户,然后和`FileWriter`一样写入输出文件中。`CompactingFileWriter` 的一个例子是 {{< javadoc file="org/apache/flink/connector/file/sink/compactor/RecordWiseFileCompactor.html" name="RecordWiseFileCompactor">}} ,它从给定的文件中读出记录并写出到 `CompactingFileWriter` 中。用户需要指定如何从原始文件中读出记录。 {{< hint info >}} **重要** 如果启用了文件合并功能,文件可见的时间会被延长。 diff --git a/docs/content/docs/connectors/datastream/filesystem.md b/docs/content/docs/connectors/datastream/filesystem.md index 7b0436c..85224c1 100644 --- a/docs/content/docs/connectors/datastream/filesystem.md +++ b/docs/content/docs/connectors/datastream/filesystem.md @@ -1016,10 +1016,14 @@ Once the total size of the cached files has reached the size threshold or the nu the cached files will be scheduled to compact. The {{< javadoc file="org/apache/flink/connector/file/sink/compactor/FileCompactor.html" name="FileCompactor">}} specifies how to compact -the give list of `Path` and write the result to {{< javadoc file="org/apache/flink/streaming/api/functions/sink/filesystem/CompactingFileWriter.html" name="CompactingFileWriter">}}. It could be classified into two types according to the type of the give `CompactingFileWriter`: - -- **{{< javadoc file="org/apache/flink/connector/file/sink/compactor/OutputStreamBasedFileCompactor.html" name="OutputStreamBasedFileCompactor">}}**: The given `CompactingFileWriter` could be converted into an output stream that users could write the compacted results into. An example is the {{< javadoc file="org/apache/flink/connector/file/sink/compactor/ConcatFileCompactor.html" name="ConcatFileCompactor">}} that concats the list of files directly. -- **{{< javadoc file="org/apache/flink/connector/file/sink/compactor/RecordWiseFileCompactor.html" name="RecordWiseFileCompactor">}}**: The given `CompactingFileWriter` allows users to write records one-by-one into it. An example is the {{< javadoc file="org/apache/flink/connector/file/sink/compactor/RecordWiseFileCompactor.html" name="RecordWiseFileCompactor">}} that reads records from the source files and then writes them with the `CompactingFileWriter`. Users need to specify how to re [...] +the give list of `Path` and write the result file. It could be classified into two types according to how to write the file: + +- **{{< javadoc file="org/apache/flink/connector/file/sink/compactor/OutputStreamBasedFileCompactor.html" name="OutputStreamBasedFileCompactor">}}**: + The users can write the compacted results into an output stream. This is useful when the users don't want to or can't read records from the input files. + An example is the {{< javadoc file="org/apache/flink/connector/file/sink/compactor/ConcatFileCompactor.html" name="ConcatFileCompactor">}} that concats the list of files directly. +- **{{< javadoc file="org/apache/flink/connector/file/sink/compactor/RecordWiseFileCompactor.html" name="RecordWiseFileCompactor">}}**: + The compactor can read records one-by-one from the input files and write into the result file similar to the `FileWriter`. + An example is the {{< javadoc file="org/apache/flink/connector/file/sink/compactor/RecordWiseFileCompactor.html" name="RecordWiseFileCompactor">}} that reads records from the source files and then writes them with the `CompactingFileWriter`. Users need to specify how to read records from the source files. {{< hint info >}} **Important** Once the compaction is enabled, the written files need to wait for longer time before they get visible. diff --git a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/7602816f-5c01-4b7a-9e3e-235dfedec245 b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/7602816f-5c01-4b7a-9e3e-235dfedec245 index f61d7d9..ae9b43c 100644 --- a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/7602816f-5c01-4b7a-9e3e-235dfedec245 +++ b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/7602816f-5c01-4b7a-9e3e-235dfedec245 @@ -225,6 +225,7 @@ org.apache.flink.streaming.api.functions.sink.filesystem.AbstractPartFileWriter org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter$PendingFile does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated org.apache.flink.streaming.api.functions.sink.filesystem.HadoopPathBasedBulkFormatBuilder does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter$InProgressFileRecoverable does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated +org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter$PendingFileRecoverable does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedInProgressFileRecoverable does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated @@ -310,4 +311,4 @@ org.apache.flink.streaming.api.windowing.triggers.TriggerResult does not satisfy org.apache.flink.streaming.api.windowing.windows.GlobalWindow$Serializer does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated org.apache.flink.streaming.api.windowing.windows.GlobalWindow$Serializer$GlobalWindowSerializerSnapshot does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated org.apache.flink.streaming.api.windowing.windows.TimeWindow$Serializer does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated -org.apache.flink.streaming.api.windowing.windows.TimeWindow$Serializer$TimeWindowSerializerSnapshot does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated \ No newline at end of file +org.apache.flink.streaming.api.windowing.windows.TimeWindow$Serializer$TimeWindowSerializerSnapshot does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/FileCompactor.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/FileCompactor.java index b737fd3..5217fee 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/FileCompactor.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/FileCompactor.java @@ -19,30 +19,15 @@ package org.apache.flink.connector.file.sink.compactor; import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.core.fs.Path; -import org.apache.flink.streaming.api.functions.sink.filesystem.CompactingFileWriter; import java.io.Serializable; -import java.util.List; /** * The {@link FileCompactor} is responsible for compacting files into one file. * - * <p>The {@link FileCompactor} should declare which type of {@link CompactingFileWriter} is - * required, and invoke the writer correspondingly. + * <p>Users should never implement the interface directly but use either {@link + * OutputStreamBasedFileCompactor} or {@link RecordWiseFileCompactor}. Other implementations will + * cause UnsupportedOperationException at runtime. */ @PublicEvolving -public interface FileCompactor extends Serializable { - - /** @return the {@link CompactingFileWriter} type the compactor will use. */ - CompactingFileWriter.Type getWriterType(); - - /** - * Compact the given files into one file. - * - * @param inputFiles the files to be compacted. - * @param writer the writer to write the compacted file. - * @throws Exception Thrown if an exception occurs during the compacting. - */ - void compact(List<Path> inputFiles, CompactingFileWriter writer) throws Exception; -} +public interface FileCompactor extends Serializable {} diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/IdenticalFileCompactor.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/IdenticalFileCompactor.java index 8e6e4a1..923b56c 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/IdenticalFileCompactor.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/IdenticalFileCompactor.java @@ -20,8 +20,8 @@ package org.apache.flink.connector.file.sink.compactor; import org.apache.flink.annotation.Internal; import org.apache.flink.core.fs.Path; -import org.apache.flink.streaming.api.functions.sink.filesystem.CompactingFileWriter; +import java.io.OutputStream; import java.util.List; import static org.apache.flink.util.Preconditions.checkState; @@ -37,9 +37,8 @@ public class IdenticalFileCompactor extends ConcatFileCompactor { super(); } - @Override - public void compact(List<Path> inputFiles, CompactingFileWriter writer) throws Exception { + public void compact(List<Path> inputFiles, OutputStream outputStream) throws Exception { checkState(inputFiles.size() == 1, "IdenticalFileCompactor can only copy one input file"); - super.compact(inputFiles, writer); + super.compact(inputFiles, outputStream); } } diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/OutputStreamBasedFileCompactor.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/OutputStreamBasedFileCompactor.java index 2ac3805..96bb902 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/OutputStreamBasedFileCompactor.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/OutputStreamBasedFileCompactor.java @@ -20,32 +20,22 @@ package org.apache.flink.connector.file.sink.compactor; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.core.fs.Path; -import org.apache.flink.streaming.api.functions.sink.filesystem.CompactingFileWriter; -import org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedCompactingFileWriter; import org.apache.flink.util.CloseShieldOutputStream; import java.io.OutputStream; import java.util.List; /** - * Base class for {@link FileCompactor} implementations that use the {@link - * OutputStreamBasedCompactingFileWriter}. + * Base class for {@link FileCompactor} implementations that write the compacting file by a output + * stream. */ @PublicEvolving public abstract class OutputStreamBasedFileCompactor implements FileCompactor { - @Override - public final CompactingFileWriter.Type getWriterType() { - return CompactingFileWriter.Type.OUTPUT_STREAM; - } - @Override - public void compact(List<Path> inputFiles, CompactingFileWriter writer) throws Exception { - // The outputStream returned by OutputStreamBasedCompactingFileWriter#asOutputStream should - // not be closed here. - CloseShieldOutputStream outputStream = - new CloseShieldOutputStream( - ((OutputStreamBasedCompactingFileWriter) writer).asOutputStream()); - doCompact(inputFiles, outputStream); + public void compact(List<Path> inputFiles, OutputStream outputStream) throws Exception { + // The outputStream should not be closed here. + CloseShieldOutputStream shieldOutputStream = new CloseShieldOutputStream(outputStream); + doCompact(inputFiles, shieldOutputStream); } protected abstract void doCompact(List<Path> inputFiles, OutputStream outputStream) diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/RecordWiseFileCompactor.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/RecordWiseFileCompactor.java index 3244961..17794cd 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/RecordWiseFileCompactor.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/RecordWiseFileCompactor.java @@ -20,8 +20,6 @@ package org.apache.flink.connector.file.sink.compactor; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.core.fs.Path; -import org.apache.flink.streaming.api.functions.sink.filesystem.CompactingFileWriter; -import org.apache.flink.streaming.api.functions.sink.filesystem.RecordWiseCompactingFileWriter; import java.io.IOException; import java.io.Serializable; @@ -29,7 +27,7 @@ import java.util.List; /** * A {@link FileCompactor} implementation that reads input files with a {@link Reader} and writes - * with the {@link RecordWiseCompactingFileWriter}. + * with a {@link Writer}. */ @PublicEvolving public class RecordWiseFileCompactor<IN> implements FileCompactor { @@ -39,26 +37,28 @@ public class RecordWiseFileCompactor<IN> implements FileCompactor { this.readerFactory = readerFactory; } - @Override - public final CompactingFileWriter.Type getWriterType() { - return CompactingFileWriter.Type.RECORD_WISE; - } - - @Override - public void compact(List<Path> inputFiles, CompactingFileWriter writer) throws Exception { - RecordWiseCompactingFileWriter<IN> recordWriter = - (RecordWiseCompactingFileWriter<IN>) writer; + public void compact(List<Path> inputFiles, Writer<IN> writer) throws Exception { for (Path input : inputFiles) { try (Reader<IN> reader = readerFactory.createFor(input)) { IN elem; while ((elem = reader.read()) != null) { - recordWriter.write(elem); + writer.write(elem); } } } } /** + * The writer that writers record into the compacting files. + * + * @param <T> Thy type of the records that is read. + */ + @PublicEvolving + public interface Writer<T> { + void write(T record) throws IOException; + } + + /** * The reader that reads record from the compacting files. * * @param <T> Thy type of the records that is read. diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactService.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactService.java index 7e5c7f6..7ab5e93 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactService.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactService.java @@ -21,11 +21,16 @@ package org.apache.flink.connector.file.sink.compactor.operator; import org.apache.flink.annotation.Internal; import org.apache.flink.connector.file.sink.FileSinkCommittable; import org.apache.flink.connector.file.sink.compactor.FileCompactor; +import org.apache.flink.connector.file.sink.compactor.OutputStreamBasedFileCompactor; +import org.apache.flink.connector.file.sink.compactor.RecordWiseFileCompactor; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.util.Hardware; import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter; import org.apache.flink.streaming.api.functions.sink.filesystem.CompactingFileWriter; +import org.apache.flink.streaming.api.functions.sink.filesystem.CompactingFileWriter.Type; import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.PendingFileRecoverable; +import org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedCompactingFileWriter; +import org.apache.flink.streaming.api.functions.sink.filesystem.RecordWiseCompactingFileWriter; import org.apache.flink.util.concurrent.ExecutorThreadFactory; import java.io.IOException; @@ -44,6 +49,7 @@ public class CompactService { private final int numCompactThreads; private final FileCompactor fileCompactor; + private final CompactingFileWriter.Type compactingWriterType; private final BucketWriter<?, String> bucketWriter; private transient ExecutorService compactService; @@ -55,6 +61,7 @@ public class CompactService { this.numCompactThreads = numCompactThreads; this.fileCompactor = fileCompactor; this.bucketWriter = bucketWriter; + this.compactingWriterType = getWriterType(fileCompactor); } public void open() { @@ -84,10 +91,11 @@ public class CompactService { } } + @SuppressWarnings({"rawtypes", "unchecked"}) private Iterable<FileSinkCommittable> compact(CompactorRequest request) throws Exception { List<FileSinkCommittable> results = new ArrayList<>(request.getCommittableToPassthrough()); - List<Path> compactingFiles = getCompactingPath(request, results); + List<Path> compactingFiles = getCompactingPath(request); if (compactingFiles.isEmpty()) { return results; } @@ -95,11 +103,22 @@ public class CompactService { Path targetPath = assembleCompactedFilePath(compactingFiles.get(0)); CompactingFileWriter compactingFileWriter = bucketWriter.openNewCompactingFile( - fileCompactor.getWriterType(), + compactingWriterType, request.getBucketId(), targetPath, System.currentTimeMillis()); - fileCompactor.compact(compactingFiles, compactingFileWriter); + if (compactingWriterType == Type.RECORD_WISE) { + ((RecordWiseFileCompactor) fileCompactor) + .compact( + compactingFiles, + ((RecordWiseCompactingFileWriter) compactingFileWriter)::write); + } else if (compactingWriterType == CompactingFileWriter.Type.OUTPUT_STREAM) { + ((OutputStreamBasedFileCompactor) fileCompactor) + .compact( + compactingFiles, + ((OutputStreamBasedCompactingFileWriter) compactingFileWriter) + .asOutputStream()); + } PendingFileRecoverable compactedPendingFile = compactingFileWriter.closeForCommit(); FileSinkCommittable compacted = @@ -113,9 +132,7 @@ public class CompactService { return results; } - // results: side output pass through committable - private List<Path> getCompactingPath( - CompactorRequest request, List<FileSinkCommittable> results) throws IOException { + private List<Path> getCompactingPath(CompactorRequest request) throws IOException { List<FileSinkCommittable> compactingCommittable = request.getCommittableToCompact(); List<Path> compactingFiles = new ArrayList<>(); @@ -144,4 +161,16 @@ public class CompactService { } return new Path(uncompactedPath.getParent(), COMPACTED_PREFIX + uncompactedName); } + + private static CompactingFileWriter.Type getWriterType(FileCompactor fileCompactor) { + if (fileCompactor instanceof OutputStreamBasedFileCompactor) { + return CompactingFileWriter.Type.OUTPUT_STREAM; + } else if (fileCompactor instanceof RecordWiseFileCompactor) { + return CompactingFileWriter.Type.RECORD_WISE; + } else { + throw new UnsupportedOperationException( + "Unable to crate compacting file writer for compactor:" + + fileCompactor.getClass()); + } + } } diff --git a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/CompactingFileWriter.java b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/CompactingFileWriter.java index 23033aa..4c7716d 100644 --- a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/CompactingFileWriter.java +++ b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/CompactingFileWriter.java @@ -18,7 +18,7 @@ package org.apache.flink.streaming.api.functions.sink.filesystem; -import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.Internal; import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.PendingFileRecoverable; import java.io.IOException; @@ -31,7 +31,7 @@ import java.io.IOException; * both. If an class implements both interfaces, once the write method of either interface is * called, the write method in the other one should be disabled. */ -@PublicEvolving +@Internal public interface CompactingFileWriter { /** @@ -44,7 +44,7 @@ public interface CompactingFileWriter { PendingFileRecoverable closeForCommit() throws IOException; /** Enum defining the types of {@link CompactingFileWriter}. */ - @PublicEvolving + @Internal enum Type { RECORD_WISE, OUTPUT_STREAM diff --git a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java index dbc8159..8e81f31 100644 --- a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java +++ b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java @@ -19,7 +19,6 @@ package org.apache.flink.streaming.api.functions.sink.filesystem; import org.apache.flink.annotation.Internal; -import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.core.fs.Path; import javax.annotation.Nullable; @@ -67,7 +66,6 @@ public interface InProgressFileWriter<IN, BucketID> interface InProgressFileRecoverable extends PendingFileRecoverable {} /** The handle can be used to recover pending file. */ - @PublicEvolving interface PendingFileRecoverable { /** @return The target path of the pending file, null if unavailable. */