This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.10 by this push: new c4acec8 [FLINK-13027][fs-connector] SFS bulk-encoded writer supports customized checkpoint policy c4acec8 is described below commit c4acec8975a03a57713a1cbc8543cbed1d83612d Author: Ying <y...@lyft.com> AuthorDate: Fri Dec 20 15:12:09 2019 -0800 [FLINK-13027][fs-connector] SFS bulk-encoded writer supports customized checkpoint policy --- .../sink/filesystem/StreamingFileSink.java | 22 ++++++++++---- ...ingPolicy.java => CheckpointRollingPolicy.java} | 34 ++++++++++------------ .../rollingpolicies/OnCheckpointRollingPolicy.java | 9 ++---- .../api/functions/sink/filesystem/TestUtils.java | 3 ++ 4 files changed, 38 insertions(+), 30 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java index cd4afc2..d8d9d6d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java @@ -37,6 +37,7 @@ import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner; +import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy; import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy; import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; @@ -73,7 +74,7 @@ import 
java.io.Serializable; * {@code "prefix-1-17.ext"} containing the data from {@code subtask 1} of the sink and is the {@code 17th} bucket * created by that subtask. * Part files roll based on the user-specified {@link RollingPolicy}. By default, a {@link DefaultRollingPolicy} - * is used. + * is used for row-encoded sink output; an {@link OnCheckpointRollingPolicy} is used for bulk-encoded sink output. * * <p>In some scenarios, the open buckets are required to change based on time. In these cases, the user * can specify a {@code bucketCheckInterval} (by default 1m) and the sink will check periodically and roll @@ -268,7 +269,7 @@ public class StreamingFileSink<IN> public <ID> StreamingFileSink.RowFormatBuilder<IN, ID, ? extends RowFormatBuilder<IN, ID, ?>> withNewBucketAssignerAndPolicy(final BucketAssigner<IN, ID> assigner, final RollingPolicy<IN, ID> policy) { Preconditions.checkState(bucketFactory.getClass() == DefaultBucketFactoryImpl.class, "newBuilderWithBucketAssignerAndPolicy() cannot be called after specifying a customized bucket factory"); - return new RowFormatBuilder<>(basePath, encoder, Preconditions.checkNotNull(assigner), Preconditions.checkNotNull(policy), bucketCheckInterval, new DefaultBucketFactoryImpl<>(), outputFileConfig); + return new RowFormatBuilder(basePath, encoder, Preconditions.checkNotNull(assigner), Preconditions.checkNotNull(policy), bucketCheckInterval, new DefaultBucketFactoryImpl<>(), outputFileConfig); } /** Creates the actual sink. 
*/ @@ -311,24 +312,29 @@ public class StreamingFileSink<IN> private BucketAssigner<IN, BucketID> bucketAssigner; + private CheckpointRollingPolicy<IN, BucketID> rollingPolicy; + private BucketFactory<IN, BucketID> bucketFactory; private OutputFileConfig outputFileConfig; protected BulkFormatBuilder(Path basePath, BulkWriter.Factory<IN> writerFactory, BucketAssigner<IN, BucketID> assigner) { - this(basePath, writerFactory, assigner, DEFAULT_BUCKET_CHECK_INTERVAL, new DefaultBucketFactoryImpl<>(), OutputFileConfig.builder().build()); + this(basePath, writerFactory, assigner, OnCheckpointRollingPolicy.build(), DEFAULT_BUCKET_CHECK_INTERVAL, + new DefaultBucketFactoryImpl<>(), OutputFileConfig.builder().build()); } protected BulkFormatBuilder( Path basePath, BulkWriter.Factory<IN> writerFactory, BucketAssigner<IN, BucketID> assigner, + CheckpointRollingPolicy<IN, BucketID> policy, long bucketCheckInterval, BucketFactory<IN, BucketID> bucketFactory, OutputFileConfig outputFileConfig) { this.basePath = Preconditions.checkNotNull(basePath); this.writerFactory = writerFactory; this.bucketAssigner = Preconditions.checkNotNull(assigner); + this.rollingPolicy = Preconditions.checkNotNull(policy); this.bucketCheckInterval = bucketCheckInterval; this.bucketFactory = Preconditions.checkNotNull(bucketFactory); this.outputFileConfig = Preconditions.checkNotNull(outputFileConfig); @@ -348,6 +354,11 @@ public class StreamingFileSink<IN> return self(); } + public T withRollingPolicy(CheckpointRollingPolicy<IN, BucketID> rollingPolicy) { + this.rollingPolicy = Preconditions.checkNotNull(rollingPolicy); + return self(); + } + @VisibleForTesting T withBucketFactory(final BucketFactory<IN, BucketID> factory) { this.bucketFactory = Preconditions.checkNotNull(factory); @@ -361,7 +372,8 @@ public class StreamingFileSink<IN> public <ID> StreamingFileSink.BulkFormatBuilder<IN, ID, ? 
extends BulkFormatBuilder<IN, ID, ?>> withNewBucketAssigner(final BucketAssigner<IN, ID> assigner) { Preconditions.checkState(bucketFactory.getClass() == DefaultBucketFactoryImpl.class, "newBuilderWithBucketAssigner() cannot be called after specifying a customized bucket factory"); - return new BulkFormatBuilder<>(basePath, writerFactory, Preconditions.checkNotNull(assigner), bucketCheckInterval, new DefaultBucketFactoryImpl<>(), outputFileConfig); + return new BulkFormatBuilder(basePath, writerFactory, Preconditions.checkNotNull(assigner), + rollingPolicy, bucketCheckInterval, new DefaultBucketFactoryImpl<>(), outputFileConfig); } /** Creates the actual sink. */ @@ -376,7 +388,7 @@ public class StreamingFileSink<IN> bucketAssigner, bucketFactory, new BulkPartWriter.Factory<>(writerFactory), - OnCheckpointRollingPolicy.build(), + rollingPolicy, subtaskIndex, outputFileConfig); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/OnCheckpointRollingPolicy.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/CheckpointRollingPolicy.java similarity index 60% copy from flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/OnCheckpointRollingPolicy.java copy to flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/CheckpointRollingPolicy.java index 53fce08..e6399d8 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/OnCheckpointRollingPolicy.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/CheckpointRollingPolicy.java @@ -22,32 +22,30 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo; import 
org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy; +import java.io.IOException; + /** - * A {@link RollingPolicy} which rolls on every checkpoint. + * An abstract {@link RollingPolicy} which rolls on every checkpoint. */ @PublicEvolving -public class OnCheckpointRollingPolicy<IN, BucketID> implements RollingPolicy<IN, BucketID> { - - private static final long serialVersionUID = 1L; - - private OnCheckpointRollingPolicy() {} - - @Override +public abstract class CheckpointRollingPolicy<IN, BucketID> implements RollingPolicy<IN, BucketID> { public boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> partFileState) { return true; } - @Override - public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element) { - return false; - } + public abstract boolean shouldRollOnEvent(final PartFileInfo<BucketID> partFileState, IN element) throws IOException; - @Override - public boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime) { - return false; - } + public abstract boolean shouldRollOnProcessingTime(final PartFileInfo<BucketID> partFileState, final long currentTime) throws IOException; + + /** + * The base abstract builder class for {@link CheckpointRollingPolicy}. 
+ */ + public abstract static class PolicyBuilder<IN, BucketID, T extends PolicyBuilder<IN, BucketID, T>> { + @SuppressWarnings("unchecked") + protected T self() { + return (T) this; + } - public static <IN, BucketID> OnCheckpointRollingPolicy<IN, BucketID> build() { - return new OnCheckpointRollingPolicy<>(); + public abstract CheckpointRollingPolicy<IN, BucketID> build(); } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/OnCheckpointRollingPolicy.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/OnCheckpointRollingPolicy.java index 53fce08..572fda9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/OnCheckpointRollingPolicy.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/OnCheckpointRollingPolicy.java @@ -23,21 +23,16 @@ import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo; import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy; /** - * A {@link RollingPolicy} which rolls on every checkpoint. + * A {@link RollingPolicy} which rolls (ONLY) on every checkpoint. 
*/ @PublicEvolving -public class OnCheckpointRollingPolicy<IN, BucketID> implements RollingPolicy<IN, BucketID> { +public final class OnCheckpointRollingPolicy<IN, BucketID> extends CheckpointRollingPolicy<IN, BucketID> { private static final long serialVersionUID = 1L; private OnCheckpointRollingPolicy() {} @Override - public boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> partFileState) { - return true; - } - - @Override public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element) { return false; } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java index 45460fc..df005c7 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java @@ -27,6 +27,7 @@ import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer; import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy; +import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy; import org.apache.flink.streaming.api.operators.StreamSink; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; @@ -157,6 +158,7 @@ public class TestUtils { .forBulkFormat(new Path(outDir.toURI()), writer) .withBucketAssigner(bucketer) .withBucketCheckInterval(bucketCheckInterval) + .withRollingPolicy(OnCheckpointRollingPolicy.build()) .withBucketFactory(bucketFactory) .withOutputFileConfig(outputFileConfig) .build(); @@ -197,6 +199,7 @@ public class TestUtils { 
StreamingFileSink<Tuple2<String, Integer>> sink = StreamingFileSink .forBulkFormat(new Path(outDir.toURI()), writer) .withNewBucketAssigner(bucketer) + .withRollingPolicy(OnCheckpointRollingPolicy.build()) .withBucketCheckInterval(bucketCheckInterval) .withBucketFactory(bucketFactory) .withOutputFileConfig(outputFileConfig)