This is an automated email from the ASF dual-hosted git repository. gaoyunhaii pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new d036c23c0e5 [FLINK-27910][connector/filesystem] Register the timer to enforce rolling policy when file sink starts from scratch. d036c23c0e5 is described below commit d036c23c0e5c079eaafef250a5a14b7f3eead8f1 Author: Gen Luo <luogen...@gmail.com> AuthorDate: Tue Jun 7 11:58:44 2022 +0800 [FLINK-27910][connector/filesystem] Register the timer to enforce rolling policy when file sink starts from scratch. This closes #19889. --- flink-connectors/flink-connector-files/pom.xml | 8 ++++ .../apache/flink/connector/file/sink/FileSink.java | 4 +- .../flink/connector/file/sink/FileSinkTest.java | 46 ++++++++++++++++++++++ 3 files changed, 56 insertions(+), 2 deletions(-) diff --git a/flink-connectors/flink-connector-files/pom.xml b/flink-connectors/flink-connector-files/pom.xml index 6ea044c6b80..96c507096a9 100644 --- a/flink-connectors/flink-connector-files/pom.xml +++ b/flink-connectors/flink-connector-files/pom.xml @@ -89,6 +89,14 @@ under the License. <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-base</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-test-utils</artifactId> diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java index 8fdf987b11f..236582767e3 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java @@ -140,11 +140,11 @@ public class FileSink<IN> @Override public FileWriter<IN> createWriter(InitContext context) throws IOException { - return bucketsBuilder.createWriter(context); + return restoreWriter(context, Collections.emptyList()); } @Override - public StatefulSinkWriter<IN, FileWriterBucketState> restoreWriter( + public FileWriter<IN> restoreWriter( InitContext context, Collection<FileWriterBucketState> recoveredState) throws IOException { FileWriter<IN> writer = bucketsBuilder.createWriter(context); diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkTest.java new file mode 100644 index 00000000000..516a071dfb7 --- /dev/null +++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.file.sink; + +import org.apache.flink.connector.base.sink.writer.TestSinkInitContext; +import org.apache.flink.connector.file.sink.utils.IntegerFileSinkTestDataUtils; +import org.apache.flink.connector.file.sink.utils.PartSizeAndCheckpointRollingPolicy; +import org.apache.flink.core.fs.Path; + +import org.junit.Test; + +import java.io.IOException; + +import static org.junit.Assert.assertEquals; + +/** Tests for {@link FileSink}. */ +public class FileSinkTest { + + @Test + public void testCreateFileWriterWithTimerRegistered() throws IOException { + TestSinkInitContext ctx = new TestSinkInitContext(); + FileSink<Integer> sink = + FileSink.forRowFormat( + new Path("mock"), new IntegerFileSinkTestDataUtils.IntEncoder()) + .withRollingPolicy(new PartSizeAndCheckpointRollingPolicy<>(1024, true)) + .build(); + sink.createWriter(ctx); + assertEquals(ctx.getTestProcessingTimeService().getNumActiveTimers(), 1); + } +}