This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new c4acec8  [FLINK-13027][fs-connector] SFS bulk-encoded writer supports customized checkpoint policy
c4acec8 is described below

commit c4acec8975a03a57713a1cbc8543cbed1d83612d
Author: Ying <y...@lyft.com>
AuthorDate: Fri Dec 20 15:12:09 2019 -0800

    [FLINK-13027][fs-connector] SFS bulk-encoded writer supports customized checkpoint policy
---
 .../sink/filesystem/StreamingFileSink.java         | 22 ++++++++++----
 ...ingPolicy.java => CheckpointRollingPolicy.java} | 34 ++++++++++------------
 .../rollingpolicies/OnCheckpointRollingPolicy.java |  9 ++----
 .../api/functions/sink/filesystem/TestUtils.java   |  3 ++
 4 files changed, 38 insertions(+), 30 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
index cd4afc2..d8d9d6d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
@@ -37,6 +37,7 @@ import 
org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import 
org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
 import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
 import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
@@ -73,7 +74,7 @@ import java.io.Serializable;
  * {@code "prefix-1-17.ext"} containing the data from {@code subtask 1} of the 
sink and is the {@code 17th} bucket
  * created by that subtask.
  * Part files roll based on the user-specified {@link RollingPolicy}. By 
default, a {@link DefaultRollingPolicy}
- * is used.
+ * is used for row-encoded sink output; a {@link OnCheckpointRollingPolicy} is 
used for bulk-encoded sink output.
  *
  * <p>In some scenarios, the open buckets are required to change based on 
time. In these cases, the user
  * can specify a {@code bucketCheckInterval} (by default 1m) and the sink will 
check periodically and roll
@@ -268,7 +269,7 @@ public class StreamingFileSink<IN>
 
                public <ID> StreamingFileSink.RowFormatBuilder<IN, ID, ? 
extends RowFormatBuilder<IN, ID, ?>> withNewBucketAssignerAndPolicy(final 
BucketAssigner<IN, ID> assigner, final RollingPolicy<IN, ID> policy) {
                        Preconditions.checkState(bucketFactory.getClass() == 
DefaultBucketFactoryImpl.class, "newBuilderWithBucketAssignerAndPolicy() cannot 
be called after specifying a customized bucket factory");
-                       return new RowFormatBuilder<>(basePath, encoder, 
Preconditions.checkNotNull(assigner), Preconditions.checkNotNull(policy), 
bucketCheckInterval, new DefaultBucketFactoryImpl<>(), outputFileConfig);
+                       return new RowFormatBuilder(basePath, encoder, 
Preconditions.checkNotNull(assigner), Preconditions.checkNotNull(policy), 
bucketCheckInterval, new DefaultBucketFactoryImpl<>(), outputFileConfig);
                }
 
                /** Creates the actual sink. */
@@ -311,24 +312,29 @@ public class StreamingFileSink<IN>
 
                private BucketAssigner<IN, BucketID> bucketAssigner;
 
+               private CheckpointRollingPolicy<IN, BucketID> rollingPolicy;
+
                private BucketFactory<IN, BucketID> bucketFactory;
 
                private OutputFileConfig outputFileConfig;
 
                protected BulkFormatBuilder(Path basePath, 
BulkWriter.Factory<IN> writerFactory, BucketAssigner<IN, BucketID> assigner) {
-                       this(basePath, writerFactory, assigner, 
DEFAULT_BUCKET_CHECK_INTERVAL, new DefaultBucketFactoryImpl<>(), 
OutputFileConfig.builder().build());
+                       this(basePath, writerFactory, assigner, 
OnCheckpointRollingPolicy.build(), DEFAULT_BUCKET_CHECK_INTERVAL,
+                               new DefaultBucketFactoryImpl<>(), 
OutputFileConfig.builder().build());
                }
 
                protected BulkFormatBuilder(
                                Path basePath,
                                BulkWriter.Factory<IN> writerFactory,
                                BucketAssigner<IN, BucketID> assigner,
+                               CheckpointRollingPolicy<IN, BucketID> policy,
                                long bucketCheckInterval,
                                BucketFactory<IN, BucketID> bucketFactory,
                                OutputFileConfig outputFileConfig) {
                        this.basePath = Preconditions.checkNotNull(basePath);
                        this.writerFactory = writerFactory;
                        this.bucketAssigner = 
Preconditions.checkNotNull(assigner);
+                       this.rollingPolicy = Preconditions.checkNotNull(policy);
                        this.bucketCheckInterval = bucketCheckInterval;
                        this.bucketFactory = 
Preconditions.checkNotNull(bucketFactory);
                        this.outputFileConfig = 
Preconditions.checkNotNull(outputFileConfig);
@@ -348,6 +354,11 @@ public class StreamingFileSink<IN>
                        return self();
                }
 
+               public T withRollingPolicy(CheckpointRollingPolicy<IN, 
BucketID> rollingPolicy) {
+                       this.rollingPolicy = 
Preconditions.checkNotNull(rollingPolicy);
+                       return self();
+               }
+
                @VisibleForTesting
                T withBucketFactory(final BucketFactory<IN, BucketID> factory) {
                        this.bucketFactory = 
Preconditions.checkNotNull(factory);
@@ -361,7 +372,8 @@ public class StreamingFileSink<IN>
 
                public <ID> StreamingFileSink.BulkFormatBuilder<IN, ID, ? 
extends BulkFormatBuilder<IN, ID, ?>> withNewBucketAssigner(final 
BucketAssigner<IN, ID> assigner) {
                        Preconditions.checkState(bucketFactory.getClass() == 
DefaultBucketFactoryImpl.class, "newBuilderWithBucketAssigner() cannot be 
called after specifying a customized bucket factory");
-                       return new BulkFormatBuilder<>(basePath, writerFactory, 
Preconditions.checkNotNull(assigner), bucketCheckInterval, new 
DefaultBucketFactoryImpl<>(), outputFileConfig);
+                       return new BulkFormatBuilder(basePath, writerFactory, 
Preconditions.checkNotNull(assigner),
+                               rollingPolicy, bucketCheckInterval, new 
DefaultBucketFactoryImpl<>(), outputFileConfig);
                }
 
                /** Creates the actual sink. */
@@ -376,7 +388,7 @@ public class StreamingFileSink<IN>
                                        bucketAssigner,
                                        bucketFactory,
                                        new 
BulkPartWriter.Factory<>(writerFactory),
-                                       OnCheckpointRollingPolicy.build(),
+                                       rollingPolicy,
                                        subtaskIndex,
                                        outputFileConfig);
                }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/OnCheckpointRollingPolicy.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/CheckpointRollingPolicy.java
similarity index 60%
copy from 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/OnCheckpointRollingPolicy.java
copy to 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/CheckpointRollingPolicy.java
index 53fce08..e6399d8 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/OnCheckpointRollingPolicy.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/CheckpointRollingPolicy.java
@@ -22,32 +22,30 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
 import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
 
+import java.io.IOException;
+
 /**
- * A {@link RollingPolicy} which rolls on every checkpoint.
+ * An abstract {@link RollingPolicy} which rolls on every checkpoint.
  */
 @PublicEvolving
-public class OnCheckpointRollingPolicy<IN, BucketID> implements 
RollingPolicy<IN, BucketID> {
-
-       private static final long serialVersionUID = 1L;
-
-       private OnCheckpointRollingPolicy() {}
-
-       @Override
+public abstract class CheckpointRollingPolicy<IN, BucketID> implements 
RollingPolicy<IN, BucketID> {
        public boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> 
partFileState) {
                return true;
        }
 
-       @Override
-       public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, 
IN element) {
-               return false;
-       }
+       public abstract boolean shouldRollOnEvent(final PartFileInfo<BucketID> 
partFileState, IN element) throws IOException;
 
-       @Override
-       public boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> 
partFileState, long currentTime) {
-               return false;
-       }
+       public abstract boolean shouldRollOnProcessingTime(final 
PartFileInfo<BucketID> partFileState, final long currentTime) throws 
IOException;
+
+       /**
+        * The base abstract builder class for {@link CheckpointRollingPolicy}.
+        */
+       public abstract static class PolicyBuilder<IN, BucketID, T extends 
PolicyBuilder<IN, BucketID, T>> {
+               @SuppressWarnings("unchecked")
+               protected T self() {
+                       return (T) this;
+               }
 
-       public static <IN, BucketID> OnCheckpointRollingPolicy<IN, BucketID> 
build() {
-               return new OnCheckpointRollingPolicy<>();
+               public abstract CheckpointRollingPolicy<IN, BucketID> build();
        }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/OnCheckpointRollingPolicy.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/OnCheckpointRollingPolicy.java
index 53fce08..572fda9 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/OnCheckpointRollingPolicy.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/OnCheckpointRollingPolicy.java
@@ -23,21 +23,16 @@ import 
org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
 import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
 
 /**
- * A {@link RollingPolicy} which rolls on every checkpoint.
+ * A {@link RollingPolicy} which rolls (ONLY) on every checkpoint.
  */
 @PublicEvolving
-public class OnCheckpointRollingPolicy<IN, BucketID> implements 
RollingPolicy<IN, BucketID> {
+public final class OnCheckpointRollingPolicy<IN, BucketID> extends 
CheckpointRollingPolicy<IN, BucketID> {
 
        private static final long serialVersionUID = 1L;
 
        private OnCheckpointRollingPolicy() {}
 
        @Override
-       public boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> 
partFileState) {
-               return true;
-       }
-
-       @Override
        public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, 
IN element) {
                return false;
        }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
index 45460fc..df005c7 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
@@ -27,6 +27,7 @@ import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import 
org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
 import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 
@@ -157,6 +158,7 @@ public class TestUtils {
                        .forBulkFormat(new Path(outDir.toURI()), writer)
                        .withBucketAssigner(bucketer)
                        .withBucketCheckInterval(bucketCheckInterval)
+                       .withRollingPolicy(OnCheckpointRollingPolicy.build())
                        .withBucketFactory(bucketFactory)
                        .withOutputFileConfig(outputFileConfig)
                        .build();
@@ -197,6 +199,7 @@ public class TestUtils {
                StreamingFileSink<Tuple2<String, Integer>> sink = 
StreamingFileSink
                                .forBulkFormat(new Path(outDir.toURI()), writer)
                                .withNewBucketAssigner(bucketer)
+                               
.withRollingPolicy(OnCheckpointRollingPolicy.build())
                                .withBucketCheckInterval(bucketCheckInterval)
                                .withBucketFactory(bucketFactory)
                                .withOutputFileConfig(outputFileConfig)

Reply via email to