This is an automated email from the ASF dual-hosted git repository. gaoyunhaii pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 890b83ca2c5d64f6707e83952270cfca05159227 Author: Gen Luo <luogen...@gmail.com> AuthorDate: Wed Feb 9 15:23:15 2022 +0800 [FLINK-25572][connectors/filesystem] Update File Sink to use decomposed interfaces This closes #18642. --- .../apache/flink/connector/file/sink/FileSink.java | 51 ++++++++-------- .../file/sink/committer/FileCommitter.java | 14 ++--- .../connector/file/sink/writer/FileWriter.java | 34 +++++++---- .../file/sink/writer/FileWriterBucket.java | 4 +- .../file/sink/committer/FileCommitterTest.java | 55 ++++++++++------- ...leWriterBucketStateSerializerMigrationTest.java | 13 ++-- .../connector/file/sink/writer/FileWriterTest.java | 36 +++++------ .../connector/kafka/sink/KafkaCommitterTest.java | 55 +++-------------- .../connector/sink2/mocks/MockCommitRequest.java | 69 ++++++++++++++++++++++ .../table/planner/factories/TestFileFactory.java | 6 +- 10 files changed, 196 insertions(+), 141 deletions(-) diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java index 920a11b..566037f 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java @@ -23,10 +23,10 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.serialization.BulkWriter; import org.apache.flink.api.common.serialization.Encoder; -import org.apache.flink.api.connector.sink.Committer; -import org.apache.flink.api.connector.sink.GlobalCommitter; -import org.apache.flink.api.connector.sink.Sink; -import org.apache.flink.api.connector.sink.SinkWriter; +import org.apache.flink.api.connector.sink2.Committer; +import org.apache.flink.api.connector.sink2.StatefulSink; +import org.apache.flink.api.connector.sink2.StatefulSink.WithCompatibleState; +import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink; import org.apache.flink.connector.file.sink.committer.FileCommitter; import org.apache.flink.connector.file.sink.writer.DefaultFileWriterBucketFactory; import org.apache.flink.connector.file.sink.writer.FileWriter; @@ -52,8 +52,6 @@ import java.io.IOException; import java.io.Serializable; import java.util.Collection; import java.util.Collections; -import java.util.List; -import java.util.Optional; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -108,7 +106,10 @@ import static org.apache.flink.util.Preconditions.checkState; * written to its output */ @Experimental -public class FileSink<IN> implements Sink<IN, FileSinkCommittable, FileWriterBucketState, Void> { +public class FileSink<IN> + implements StatefulSink<IN, FileWriterBucketState>, + TwoPhaseCommittingSink<IN, FileSinkCommittable>, + WithCompatibleState { private final BucketsBuilder<IN, ? extends BucketsBuilder<IN, ?>> bucketsBuilder; @@ -117,17 +118,23 @@ public class FileSink<IN> implements Sink<IN, FileSinkCommittable, FileWriterBuc } @Override - public SinkWriter<IN, FileSinkCommittable, FileWriterBucketState> createWriter( - InitContext context, List<FileWriterBucketState> states) throws IOException { + public FileWriter<IN> createWriter(InitContext context) throws IOException { + return bucketsBuilder.createWriter(context); + } + + @Override + public StatefulSinkWriter<IN, FileWriterBucketState> restoreWriter( + InitContext context, Collection<FileWriterBucketState> recoveredState) + throws IOException { FileWriter<IN> writer = bucketsBuilder.createWriter(context); - writer.initializeState(states); + writer.initializeState(recoveredState); return writer; } @Override - public Optional<SimpleVersionedSerializer<FileWriterBucketState>> getWriterStateSerializer() { + public SimpleVersionedSerializer<FileWriterBucketState> getWriterStateSerializer() { try { - return Optional.of(bucketsBuilder.getWriterStateSerializer()); + return bucketsBuilder.getWriterStateSerializer(); } catch (IOException e) { // it's not optimal that we have to do this but creating the serializers for the // FileSink requires (among other things) a call to FileSystem.get() which declares @@ -137,14 +144,14 @@ public class FileSink<IN> implements Sink<IN, FileSinkCommittable, FileWriterBuc } @Override - public Optional<Committer<FileSinkCommittable>> createCommitter() throws IOException { - return Optional.of(bucketsBuilder.createCommitter()); + public Committer<FileSinkCommittable> createCommitter() throws IOException { + return bucketsBuilder.createCommitter(); } @Override - public Optional<SimpleVersionedSerializer<FileSinkCommittable>> getCommittableSerializer() { + public SimpleVersionedSerializer<FileSinkCommittable> getCommittableSerializer() { try { - return Optional.of(bucketsBuilder.getCommittableSerializer()); + return bucketsBuilder.getCommittableSerializer(); } catch (IOException e) { // it's not optimal that we have to do this but creating the serializers for the // FileSink requires (among other things) a call to FileSystem.get() which declares @@ -154,17 +161,7 @@ public class FileSink<IN> implements Sink<IN, FileSinkCommittable, FileWriterBuc } @Override - public Optional<GlobalCommitter<FileSinkCommittable, Void>> createGlobalCommitter() { - return Optional.empty(); - } - - @Override - public Optional<SimpleVersionedSerializer<Void>> getGlobalCommittableSerializer() { - return Optional.empty(); - } - - @Override - public Collection<String> getCompatibleStateNames() { + public Collection<String> getCompatibleWriterStateNames() { // StreamingFileSink return Collections.singleton("bucket-states"); } diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/committer/FileCommitter.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/committer/FileCommitter.java index ccd3018..c72b399 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/committer/FileCommitter.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/committer/FileCommitter.java @@ -19,14 +19,13 @@ package org.apache.flink.connector.file.sink.committer; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.connector.sink.Committer; +import org.apache.flink.api.connector.sink2.Committer; import org.apache.flink.connector.file.sink.FileSink; import org.apache.flink.connector.file.sink.FileSinkCommittable; import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter; import java.io.IOException; -import java.util.Collections; -import java.util.List; +import java.util.Collection; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -48,9 +47,10 @@ public class FileCommitter implements Committer<FileSinkCommittable> { } @Override - public List<FileSinkCommittable> commit(List<FileSinkCommittable> committables) - throws IOException { - for (FileSinkCommittable committable : committables) { + public void commit(Collection<CommitRequest<FileSinkCommittable>> requests) + throws IOException, InterruptedException { + for (CommitRequest<FileSinkCommittable> request : requests) { + FileSinkCommittable committable = request.getCommittable(); if (committable.hasPendingFile()) { // We should always use commitAfterRecovery which contains additional checks. bucketWriter.recoverPendingFile(committable.getPendingFile()).commitAfterRecovery(); @@ -61,8 +61,6 @@ public class FileCommitter implements Committer<FileSinkCommittable> { committable.getInProgressFileToCleanup()); } } - - return Collections.emptyList(); } @Override diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java index 15aebd0..fad3b50 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java @@ -20,8 +20,10 @@ package org.apache.flink.connector.file.sink.writer; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.api.connector.sink.Sink; -import org.apache.flink.api.connector.sink.SinkWriter; +import org.apache.flink.api.common.operators.ProcessingTimeService; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.api.connector.sink2.StatefulSink.StatefulSinkWriter; +import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink; import org.apache.flink.connector.file.sink.FileSink; import org.apache.flink.connector.file.sink.FileSinkCommittable; import org.apache.flink.core.fs.Path; @@ -39,6 +41,7 @@ import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -58,8 +61,10 @@ import static org.apache.flink.util.Preconditions.checkState; */ @Internal public class FileWriter<IN> - implements SinkWriter<IN, FileSinkCommittable, FileWriterBucketState>, - Sink.ProcessingTimeService.ProcessingTimeCallback { + implements StatefulSinkWriter<IN, FileWriterBucketState>, + TwoPhaseCommittingSink.PrecommittingSinkWriter<IN, FileSinkCommittable>, + SinkWriter<IN>, + ProcessingTimeService.ProcessingTimeCallback { private static final Logger LOG = LoggerFactory.getLogger(FileWriter.class); @@ -75,7 +80,7 @@ public class FileWriter<IN> private final RollingPolicy<IN, String> rollingPolicy; - private final Sink.ProcessingTimeService processingTimeService; + private final ProcessingTimeService processingTimeService; private final long bucketCheckInterval; @@ -89,6 +94,8 @@ public class FileWriter<IN> private final Counter recordsOutCounter; + private boolean endOfInput; + /** * A constructor creating a new empty bucket manager. * @@ -107,7 +114,7 @@ public class FileWriter<IN> final BucketWriter<IN, String> bucketWriter, final RollingPolicy<IN, String> rollingPolicy, final OutputFileConfig outputFileConfig, - final Sink.ProcessingTimeService processingTimeService, + final ProcessingTimeService processingTimeService, final long bucketCheckInterval) { this.basePath = checkNotNull(basePath); @@ -148,7 +155,7 @@ public class FileWriter<IN> * @throws IOException if anything goes wrong during retrieving the state or * restoring/committing of any in-progress/pending part files */ - public void initializeState(List<FileWriterBucketState> bucketStates) throws IOException { + public void initializeState(Collection<FileWriterBucketState> bucketStates) throws IOException { checkNotNull(bucketStates, "The retrieved state was null."); for (FileWriterBucketState state : bucketStates) { @@ -179,7 +186,7 @@ public class FileWriter<IN> } @Override - public void write(IN element, Context context) throws IOException { + public void write(IN element, Context context) throws IOException, InterruptedException { // setting the values in the bucketer context bucketerContext.update( context.timestamp(), @@ -193,7 +200,12 @@ public class FileWriter<IN> } @Override - public List<FileSinkCommittable> prepareCommit(boolean flush) throws IOException { + public void flush(boolean endOfInput) throws IOException, InterruptedException { + this.endOfInput = endOfInput; + } + + @Override + public Collection<FileSinkCommittable> prepareCommit() throws IOException { List<FileSinkCommittable> committables = new ArrayList<>(); // Every time before we prepare commit, we first check and remove the inactive @@ -206,7 +218,7 @@ public class FileWriter<IN> if (!entry.getValue().isActive()) { activeBucketIt.remove(); } else { - committables.addAll(entry.getValue().prepareCommit(flush)); + committables.addAll(entry.getValue().prepareCommit(endOfInput)); } } @@ -263,7 +275,7 @@ public class FileWriter<IN> private void registerNextBucketInspectionTimer() { final long nextInspectionTime = processingTimeService.getCurrentProcessingTime() + bucketCheckInterval; - processingTimeService.registerProcessingTimer(nextInspectionTime, this); + processingTimeService.registerTimer(nextInspectionTime, this); } /** diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucket.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucket.java index cec8648..3f5f5f9 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucket.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucket.java @@ -191,9 +191,9 @@ class FileWriterBucket<IN> { inProgressPart.write(element, currentTime); } - List<FileSinkCommittable> prepareCommit(boolean flush) throws IOException { + List<FileSinkCommittable> prepareCommit(boolean endOfInput) throws IOException { if (inProgressPart != null - && (rollingPolicy.shouldRollOnCheckpoint(inProgressPart) || flush)) { + && (rollingPolicy.shouldRollOnCheckpoint(inProgressPart) || endOfInput)) { if (LOG.isDebugEnabled()) { LOG.debug( "Closing in-progress part file for bucket id={} on checkpoint.", bucketId); diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/committer/FileCommitterTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/committer/FileCommitterTest.java index 935d170..0b07370 100644 --- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/committer/FileCommitterTest.java +++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/committer/FileCommitterTest.java @@ -18,6 +18,8 @@ package org.apache.flink.connector.file.sink.committer; +import org.apache.flink.api.connector.sink2.Committer.CommitRequest; +import org.apache.flink.api.connector.sink2.mocks.MockCommitRequest; import org.apache.flink.connector.file.sink.FileSinkCommittable; import org.apache.flink.connector.file.sink.utils.FileSinkTestUtils; import org.apache.flink.connector.file.sink.utils.NoOpBucketWriter; @@ -28,9 +30,11 @@ import org.junit.Test; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -43,15 +47,16 @@ public class FileCommitterTest { StubBucketWriter stubBucketWriter = new StubBucketWriter(); FileCommitter fileCommitter = new FileCommitter(stubBucketWriter); - FileSinkCommittable fileSinkCommittable = - new FileSinkCommittable(new FileSinkTestUtils.TestPendingFileRecoverable()); - List<FileSinkCommittable> toRetry = - fileCommitter.commit(Collections.singletonList(fileSinkCommittable)); + MockCommitRequest<FileSinkCommittable> fileSinkCommittable = + new MockCommitRequest<>( + new FileSinkCommittable( + new FileSinkTestUtils.TestPendingFileRecoverable())); + fileCommitter.commit(Collections.singletonList(fileSinkCommittable)); assertEquals(1, stubBucketWriter.getRecoveredPendingFiles().size()); assertEquals(0, stubBucketWriter.getNumCleanUp()); assertTrue(stubBucketWriter.getRecoveredPendingFiles().get(0).isCommitted()); - assertEquals(0, toRetry.size()); + assertEquals(0, fileSinkCommittable.getNumberOfRetries()); } @Test @@ -59,14 +64,15 @@ public class FileCommitterTest { StubBucketWriter stubBucketWriter = new StubBucketWriter(); FileCommitter fileCommitter = new FileCommitter(stubBucketWriter); - FileSinkCommittable fileSinkCommittable = - new FileSinkCommittable(new FileSinkTestUtils.TestInProgressFileRecoverable()); - List<FileSinkCommittable> toRetry = - fileCommitter.commit(Collections.singletonList(fileSinkCommittable)); + MockCommitRequest<FileSinkCommittable> fileSinkCommittable = + new MockCommitRequest<>( + new FileSinkCommittable( + new FileSinkTestUtils.TestInProgressFileRecoverable())); + fileCommitter.commit(Collections.singletonList(fileSinkCommittable)); assertEquals(0, stubBucketWriter.getRecoveredPendingFiles().size()); assertEquals(1, stubBucketWriter.getNumCleanUp()); - assertEquals(0, toRetry.size()); + assertEquals(0, fileSinkCommittable.getNumberOfRetries()); } @Test @@ -74,23 +80,28 @@ public class FileCommitterTest { StubBucketWriter stubBucketWriter = new StubBucketWriter(); FileCommitter fileCommitter = new FileCommitter(stubBucketWriter); - List<FileSinkCommittable> committables = - Arrays.asList( - new FileSinkCommittable(new FileSinkTestUtils.TestPendingFileRecoverable()), - new FileSinkCommittable(new FileSinkTestUtils.TestPendingFileRecoverable()), - new FileSinkCommittable( - new FileSinkTestUtils.TestInProgressFileRecoverable()), - new FileSinkCommittable(new FileSinkTestUtils.TestPendingFileRecoverable()), - new FileSinkCommittable( - new FileSinkTestUtils.TestInProgressFileRecoverable())); - List<FileSinkCommittable> toRetry = fileCommitter.commit(committables); + Collection<CommitRequest<FileSinkCommittable>> committables = + Stream.of( + new FileSinkCommittable( + new FileSinkTestUtils.TestPendingFileRecoverable()), + new FileSinkCommittable( + new FileSinkTestUtils.TestPendingFileRecoverable()), + new FileSinkCommittable( + new FileSinkTestUtils.TestInProgressFileRecoverable()), + new FileSinkCommittable( + new FileSinkTestUtils.TestPendingFileRecoverable()), + new FileSinkCommittable( + new FileSinkTestUtils.TestInProgressFileRecoverable())) + .map(MockCommitRequest::new) + .collect(Collectors.toList()); + fileCommitter.commit(committables); assertEquals(3, stubBucketWriter.getRecoveredPendingFiles().size()); assertEquals(2, stubBucketWriter.getNumCleanUp()); stubBucketWriter .getRecoveredPendingFiles() .forEach(pendingFile -> assertTrue(pendingFile.isCommitted())); - assertEquals(0, toRetry.size()); + assertTrue(committables.stream().allMatch(c -> c.getNumberOfRetries() == 0)); } // ------------------------------- Mock Classes -------------------------------- diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketStateSerializerMigrationTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketStateSerializerMigrationTest.java index 65f15f6..228b9c9 100644 --- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketStateSerializerMigrationTest.java +++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketStateSerializerMigrationTest.java @@ -19,6 +19,8 @@ package org.apache.flink.connector.file.sink.writer; import org.apache.flink.api.common.serialization.SimpleStringEncoder; +import org.apache.flink.api.connector.sink2.Committer.CommitRequest; +import org.apache.flink.api.connector.sink2.mocks.MockCommitRequest; import org.apache.flink.configuration.MemorySize; import org.apache.flink.connector.file.sink.FileSinkCommittable; import org.apache.flink.connector.file.sink.committer.FileCommitter; @@ -169,7 +171,7 @@ public class FileWriterBucketStateSerializerMigrationTest { } @Test - public void testSerializationFull() throws IOException { + public void testSerializationFull() throws IOException, InterruptedException { testDeserializationFull(true, "full"); } @@ -180,12 +182,12 @@ public class FileWriterBucketStateSerializerMigrationTest { } @Test - public void testSerializationNullInProgress() throws IOException { + public void testSerializationNullInProgress() throws IOException, InterruptedException { testDeserializationFull(false, "full-no-in-progress"); } private void testDeserializationFull(final boolean withInProgress, final String scenarioName) - throws IOException { + throws IOException, InterruptedException { final BucketStatePathResolver pathResolver = new BucketStatePathResolver(BASE_PATH, previousVersion); @@ -221,7 +223,10 @@ public class FileWriterBucketStateSerializerMigrationTest { // simulates we commit the recovered pending files on the first checkpoint bucket.snapshotState(); - List<FileSinkCommittable> committables = bucket.prepareCommit(false); + Collection<CommitRequest<FileSinkCommittable>> committables = + bucket.prepareCommit(false).stream() + .map(MockCommitRequest::new) + .collect(Collectors.toList()); FileCommitter committer = new FileCommitter(createBucketWriter()); committer.commit(committables); diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java index a8fe439..e521f61 100644 --- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java +++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java @@ -18,9 +18,9 @@ package org.apache.flink.connector.file.sink.writer; +import org.apache.flink.api.common.operators.ProcessingTimeService; import org.apache.flink.api.common.serialization.SimpleStringEncoder; -import org.apache.flink.api.connector.sink.Sink; -import org.apache.flink.api.connector.sink.SinkWriter; +import org.apache.flink.api.connector.sink2.SinkWriter; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.connector.file.sink.FileSinkCommittable; import org.apache.flink.connector.file.sink.utils.FileSinkTestUtils; @@ -53,12 +53,14 @@ import java.io.IOException; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashSet; import java.util.List; import java.util.PriorityQueue; import java.util.Queue; +import java.util.concurrent.ScheduledFuture; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -92,7 +94,7 @@ public class FileWriterTest { fileWriter.write("test2", new ContextImpl()); fileWriter.write("test3", new ContextImpl()); - List<FileSinkCommittable> committables = fileWriter.prepareCommit(false); + Collection<FileSinkCommittable> committables = fileWriter.prepareCommit(); assertEquals(3, committables.size()); } @@ -112,7 +114,7 @@ public class FileWriterTest { fileWriter.write("test3", new ContextImpl()); assertEquals(3, fileWriter.getActiveBuckets().size()); - fileWriter.prepareCommit(false); + fileWriter.prepareCommit(); List<FileWriterBucketState> states = fileWriter.snapshotState(1L); assertEquals(3, states.size()); @@ -145,7 +147,7 @@ public class FileWriterTest { firstFileWriter.write("test2", new ContextImpl()); firstFileWriter.write("test3", new ContextImpl()); - firstFileWriter.prepareCommit(false); + firstFileWriter.prepareCommit(); List<FileWriterBucketState> firstState = firstFileWriter.snapshotState(1L); FileWriter<String> secondFileWriter = @@ -157,7 +159,7 @@ public class FileWriterTest { secondFileWriter.write("test1", new ContextImpl()); secondFileWriter.write("test2", new ContextImpl()); - secondFileWriter.prepareCommit(false); + secondFileWriter.prepareCommit(); List<FileWriterBucketState> secondState = secondFileWriter.snapshotState(1L); List<FileWriterBucketState> mergedState = new ArrayList<>(); @@ -197,17 +199,17 @@ public class FileWriterTest { path, OnCheckpointRollingPolicy.build(), new OutputFileConfig("part-", "")); fileWriter.write("test", new ContextImpl()); - fileWriter.prepareCommit(false); + fileWriter.prepareCommit(); fileWriter.snapshotState(1L); // No more records and another call to prepareCommit will makes it inactive - fileWriter.prepareCommit(false); + fileWriter.prepareCommit(); assertTrue(fileWriter.getActiveBuckets().isEmpty()); } @Test - public void testOnProcessingTime() throws IOException, InterruptedException { + public void testOnProcessingTime() throws Exception { File outDir = TEMP_FOLDER.newFolder(); Path path = new Path(outDir.toURI()); @@ -247,7 +249,7 @@ public class FileWriterTest { // Close, pre-commit & clear all the pending records. processingTimeService.advanceTo(30); - fileWriter.prepareCommit(false); + fileWriter.prepareCommit(); // Test timer re-registration. fileWriter.write("test1", new ContextImpl()); @@ -278,7 +280,7 @@ public class FileWriterTest { } @Test - public void testNumberRecordsOutCounter() throws IOException { + public void testNumberRecordsOutCounter() throws IOException, InterruptedException { final OperatorIOMetricGroup operatorIOMetricGroup = UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup().getIOMetricGroup(); File outDir = TEMP_FOLDER.newFolder(); @@ -350,8 +352,7 @@ public class FileWriterTest { } } - private static class ManuallyTriggeredProcessingTimeService - implements Sink.ProcessingTimeService { + private static class ManuallyTriggeredProcessingTimeService implements ProcessingTimeService { private long now; @@ -364,20 +365,21 @@ public class FileWriterTest { } @Override - public void registerProcessingTimer( + public ScheduledFuture<?> registerTimer( long time, ProcessingTimeCallback processingTimeCallback) { if (time <= now) { try { processingTimeCallback.onProcessingTime(now); - } catch (IOException | InterruptedException e) { + } catch (Exception e) { ExceptionUtils.rethrow(e); } } else { timers.add(new Tuple2<>(time, processingTimeCallback)); } + return null; } - public void advanceTo(long time) throws IOException, InterruptedException { + public void advanceTo(long time) throws Exception { if (time > now) { now = time; @@ -464,7 +466,7 @@ public class FileWriterTest { BucketAssigner<String, String> bucketAssigner, RollingPolicy<String, String> rollingPolicy, OutputFileConfig outputFileConfig, - Sink.ProcessingTimeService processingTimeService, + ProcessingTimeService processingTimeService, long bucketCheckInterval) throws IOException { return new FileWriter<>( diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java index 157699f..30bbdca7 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java @@ -17,7 +17,7 @@ package org.apache.flink.connector.kafka.sink; -import org.apache.flink.api.connector.sink2.Committer; +import org.apache.flink.api.connector.sink2.mocks.MockCommitRequest; import org.apache.flink.util.TestLoggerExtension; import org.apache.kafka.clients.CommonClientConfigs; @@ -50,8 +50,8 @@ public class KafkaCommitterTest { new FlinkKafkaInternalProducer<>(properties, TRANSACTIONAL_ID); Recyclable<FlinkKafkaInternalProducer<Object, Object>> recyclable = new Recyclable<>(producer, p -> {})) { - final MockCommitRequest request = - new MockCommitRequest( + final MockCommitRequest<KafkaCommittable> request = + new MockCommitRequest<>( new KafkaCommittable(PRODUCER_ID, EPOCH, TRANSACTIONAL_ID, recyclable)); producer.resumeTransaction(PRODUCER_ID, EPOCH); @@ -73,12 +73,13 @@ public class KafkaCommitterTest { Recyclable<FlinkKafkaInternalProducer<Object, Object>> recyclable = new Recyclable<>(producer, p -> {})) { // will fail because transaction not started - final MockCommitRequest request = - new MockCommitRequest( + final MockCommitRequest<KafkaCommittable> request = + new MockCommitRequest<>( new KafkaCommittable(PRODUCER_ID, EPOCH, TRANSACTIONAL_ID, recyclable)); committer.commit(Collections.singletonList(request)); - assertThat(request.failedWithUnknownReason).isInstanceOf(IllegalStateException.class); - assertThat(request.failedWithUnknownReason.getMessage()) + assertThat(request.getFailedWithUnknownReason()) + .isInstanceOf(IllegalStateException.class); + assertThat(request.getFailedWithUnknownReason().getMessage()) .contains("Transaction was not started"); assertThat(recyclable.isRecycled()).isTrue(); } @@ -93,44 +94,4 @@ public class KafkaCommitterTest { properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return properties; } - - private static class MockCommitRequest implements Committer.CommitRequest<KafkaCommittable> { - - private final KafkaCommittable committable; - private int retries = 0; - Throwable failedWithUnknownReason; - - MockCommitRequest(KafkaCommittable committable) { - this.committable = committable; - } - - @Override - public KafkaCommittable getCommittable() { - return committable; - } - - @Override - public int getNumberOfRetries() { - return retries; - } - - @Override - public void signalFailedWithKnownReason(Throwable t) {} - - @Override - public void signalFailedWithUnknownReason(Throwable t) { - failedWithUnknownReason = t; - } - - @Override - public void retryLater() { - retries++; - } - - @Override - public void updateAndRetryLater(KafkaCommittable committable) {} - - @Override - public void signalAlreadyCommitted() {} - } } diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/sink2/mocks/MockCommitRequest.java b/flink-core/src/test/java/org/apache/flink/api/connector/sink2/mocks/MockCommitRequest.java new file mode 100644 index 0000000..75a652a --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/connector/sink2/mocks/MockCommitRequest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.connector.sink2.mocks; + +import org.apache.flink.api.connector.sink2.Committer; + +/** + * A simple {@link Committer.CommitRequest} used for testing. + * + * @param <CommT> committable type + */ +public class MockCommitRequest<CommT> implements Committer.CommitRequest<CommT> { + + private final CommT committable; + private int retries = 0; + private Throwable failedWithUnknownReason; + + public MockCommitRequest(CommT committable) { + this.committable = committable; + } + + @Override + public CommT getCommittable() { + return committable; + } + + @Override + public int getNumberOfRetries() { + return retries; + } + + @Override + public void signalFailedWithKnownReason(Throwable t) {} + + @Override + public void signalFailedWithUnknownReason(Throwable t) { + failedWithUnknownReason = t; + } + + @Override + public void retryLater() { + retries++; + } + + @Override + public void updateAndRetryLater(CommT committable) {} + + @Override + public void signalAlreadyCommitted() {} + + public Throwable getFailedWithUnknownReason() { + return failedWithUnknownReason; + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestFileFactory.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestFileFactory.java index 8949160..626a395 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestFileFactory.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestFileFactory.java @@ -34,7 +34,7 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.sink.DynamicTableSink; -import org.apache.flink.table.connector.sink.SinkProvider; +import org.apache.flink.table.connector.sink.SinkV2Provider; import org.apache.flink.table.connector.source.DataStreamScanProvider; import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.connector.source.ScanTableSource; @@ -56,7 +56,7 @@ import java.util.Set; /** * Test implementation of {@link DynamicTableSourceFactory} and {@link DynamicTableSinkFactory} that - * creates a file source and sink based on {@link SourceProvider} and {@link SinkProvider}. + * creates a file source and sink based on {@link SourceProvider} and {@link SinkV2Provider}. */ public class TestFileFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory { @@ -158,7 +158,7 @@ public class TestFileFactory implements DynamicTableSourceFactory, DynamicTableS public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { final FileSink<RowData> fileSink = FileSink.forRowFormat(path, new RowDataEncoder()).build(); - return SinkProvider.of(fileSink); + return SinkV2Provider.of(fileSink); } @Override