This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 59055b55a0d384b92b7beb09f96c4ba08e9920b7
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Sun Oct 25 21:47:16 2020 +0100

    [FLINK-19803][connector files] Make PendingSplitsCheckpoint and its 
Serializer generic to support sub-classes of FileSourceSplit
---
 .../flink/connector/file/src/FileSource.java       | 14 ++++----
 .../file/src/PendingSplitsCheckpoint.java          | 31 ++++++++++--------
 .../src/PendingSplitsCheckpointSerializer.java     | 38 +++++++++++++---------
 .../src/impl/ContinuousFileSplitEnumerator.java    |  4 +--
 .../file/src/impl/StaticFileSplitEnumerator.java   |  4 +--
 .../src/PendingSplitsCheckpointSerializerTest.java | 33 +++++++++++--------
 6 files changed, 69 insertions(+), 55 deletions(-)

diff --git 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSource.java
 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSource.java
index 6580742..ccfe8b0 100644
--- 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSource.java
+++ 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSource.java
@@ -107,7 +107,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * @param <T> The type of the events/records produced by this source.
  */
 @PublicEvolving
-public final class FileSource<T> implements Source<T, FileSourceSplit, 
PendingSplitsCheckpoint>, ResultTypeQueryable<T> {
+public final class FileSource<T> implements Source<T, FileSourceSplit, 
PendingSplitsCheckpoint<FileSourceSplit>>, ResultTypeQueryable<T> {
 
        private static final long serialVersionUID = 1L;
 
@@ -177,7 +177,7 @@ public final class FileSource<T> implements Source<T, 
FileSourceSplit, PendingSp
        }
 
        @Override
-       public SplitEnumerator<FileSourceSplit, PendingSplitsCheckpoint> 
createEnumerator(
+       public SplitEnumerator<FileSourceSplit, 
PendingSplitsCheckpoint<FileSourceSplit>> createEnumerator(
                        SplitEnumeratorContext<FileSourceSplit> enumContext) {
 
                final FileEnumerator enumerator = enumeratorFactory.create();
@@ -195,9 +195,9 @@ public final class FileSource<T> implements Source<T, 
FileSourceSplit, PendingSp
        }
 
        @Override
-       public SplitEnumerator<FileSourceSplit, PendingSplitsCheckpoint> 
restoreEnumerator(
+       public SplitEnumerator<FileSourceSplit, 
PendingSplitsCheckpoint<FileSourceSplit>> restoreEnumerator(
                        SplitEnumeratorContext<FileSourceSplit> enumContext,
-                       PendingSplitsCheckpoint checkpoint) throws IOException {
+                       PendingSplitsCheckpoint<FileSourceSplit> checkpoint) 
throws IOException {
 
                final FileEnumerator enumerator = enumeratorFactory.create();
 
@@ -210,8 +210,8 @@ public final class FileSource<T> implements Source<T, 
FileSourceSplit, PendingSp
        }
 
        @Override
-       public SimpleVersionedSerializer<PendingSplitsCheckpoint> 
getEnumeratorCheckpointSerializer() {
-               return PendingSplitsCheckpointSerializer.INSTANCE;
+       public 
SimpleVersionedSerializer<PendingSplitsCheckpoint<FileSourceSplit>> 
getEnumeratorCheckpointSerializer() {
+               return new 
PendingSplitsCheckpointSerializer<>(getSplitSerializer());
        }
 
        @Override
@@ -223,7 +223,7 @@ public final class FileSource<T> implements Source<T, 
FileSourceSplit, PendingSp
        //  helpers
        // 
------------------------------------------------------------------------
 
-       private SplitEnumerator<FileSourceSplit, PendingSplitsCheckpoint> 
createSplitEnumerator(
+       private SplitEnumerator<FileSourceSplit, 
PendingSplitsCheckpoint<FileSourceSplit>> createSplitEnumerator(
                        SplitEnumeratorContext<FileSourceSplit> context,
                        FileEnumerator enumerator,
                        Collection<FileSourceSplit> splits,
diff --git 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/PendingSplitsCheckpoint.java
 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/PendingSplitsCheckpoint.java
index 1515690..847f99e 100644
--- 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/PendingSplitsCheckpoint.java
+++ 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/PendingSplitsCheckpoint.java
@@ -33,10 +33,10 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * A checkpoint of the current state of the containing the currently pending 
splits that are not yet assigned.
  */
 @PublicEvolving
-public final class PendingSplitsCheckpoint {
+public final class PendingSplitsCheckpoint<SplitT extends FileSourceSplit> {
 
        /** The splits in the checkpoint. */
-       private final Collection<FileSourceSplit> splits;
+       private final Collection<SplitT> splits;
 
        /** The paths that are no longer in the enumerator checkpoint, but have 
been processed
         * before and should this be ignored. Relevant only for sources in 
continuous monitoring mode. */
@@ -48,14 +48,14 @@ public final class PendingSplitsCheckpoint {
        @Nullable
        byte[] serializedFormCache;
 
-       private PendingSplitsCheckpoint(Collection<FileSourceSplit> splits, 
Collection<Path> alreadyProcessedPaths) {
+       private PendingSplitsCheckpoint(Collection<SplitT> splits, 
Collection<Path> alreadyProcessedPaths) {
                this.splits = Collections.unmodifiableCollection(splits);
                this.alreadyProcessedPaths = 
Collections.unmodifiableCollection(alreadyProcessedPaths);
        }
 
        // 
------------------------------------------------------------------------
 
-       public Collection<FileSourceSplit> getSplits() {
+       public Collection<SplitT> getSplits() {
                return splits;
        }
 
@@ -67,27 +67,30 @@ public final class PendingSplitsCheckpoint {
        //  factories
        // 
------------------------------------------------------------------------
 
-       public static PendingSplitsCheckpoint 
fromCollectionSnapshot(Collection<FileSourceSplit> splits) {
+       public static <T extends FileSourceSplit> PendingSplitsCheckpoint<T> 
fromCollectionSnapshot(
+                       final Collection<T> splits) {
                checkNotNull(splits);
 
                // create a copy of the collection to make sure this checkpoint 
is immutable
-               final Collection<FileSourceSplit> copy = new 
ArrayList<>(splits);
-               return new PendingSplitsCheckpoint(copy, 
Collections.emptySet());
+               final Collection<T> copy = new ArrayList<>(splits);
+               return new PendingSplitsCheckpoint<>(copy, 
Collections.emptySet());
        }
 
-       public static PendingSplitsCheckpoint fromCollectionSnapshot(
-                       Collection<FileSourceSplit> splits,
-                       Collection<Path> alreadyProcessedPaths) {
+       public static <T extends FileSourceSplit> PendingSplitsCheckpoint<T> 
fromCollectionSnapshot(
+                       final Collection<T> splits,
+                       final Collection<Path> alreadyProcessedPaths) {
                checkNotNull(splits);
 
                // create a copy of the collection to make sure this checkpoint 
is immutable
-               final Collection<FileSourceSplit> splitsCopy = new 
ArrayList<>(splits);
+               final Collection<T> splitsCopy = new ArrayList<>(splits);
                final Collection<Path> pathsCopy = new 
ArrayList<>(alreadyProcessedPaths);
 
-               return new PendingSplitsCheckpoint(splitsCopy, pathsCopy);
+               return new PendingSplitsCheckpoint<>(splitsCopy, pathsCopy);
        }
 
-       static PendingSplitsCheckpoint 
reusingCollection(Collection<FileSourceSplit> splits, Collection<Path> 
alreadyProcessedPaths) {
-               return new PendingSplitsCheckpoint(splits, 
alreadyProcessedPaths);
+       static <T extends FileSourceSplit> PendingSplitsCheckpoint<T> 
reusingCollection(
+                       final Collection<T> splits,
+                       final Collection<Path> alreadyProcessedPaths) {
+               return new PendingSplitsCheckpoint<>(splits, 
alreadyProcessedPaths);
        }
 }
diff --git 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/PendingSplitsCheckpointSerializer.java
 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/PendingSplitsCheckpointSerializer.java
index e87ca7d..9721a53 100644
--- 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/PendingSplitsCheckpointSerializer.java
+++ 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/PendingSplitsCheckpointSerializer.java
@@ -30,19 +30,25 @@ import java.util.ArrayList;
 import java.util.Collection;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * A serializer for the {@link PendingSplitsCheckpoint}.
  */
 @PublicEvolving
-public final class PendingSplitsCheckpointSerializer implements 
SimpleVersionedSerializer<PendingSplitsCheckpoint> {
-
-       public static final PendingSplitsCheckpointSerializer INSTANCE = new 
PendingSplitsCheckpointSerializer();
+public final class PendingSplitsCheckpointSerializer<T extends FileSourceSplit>
+               implements 
SimpleVersionedSerializer<PendingSplitsCheckpoint<T>> {
 
        private static final int VERSION = 1;
 
        private static final int VERSION_1_MAGIC_NUMBER = 0xDEADBEEF;
 
+       private final SimpleVersionedSerializer<T> splitSerializer;
+
+       public PendingSplitsCheckpointSerializer(SimpleVersionedSerializer<T> 
splitSerializer) {
+               this.splitSerializer = checkNotNull(splitSerializer);
+       }
+
        // 
------------------------------------------------------------------------
 
        @Override
@@ -51,7 +57,7 @@ public final class PendingSplitsCheckpointSerializer 
implements SimpleVersionedS
        }
 
        @Override
-       public byte[] serialize(PendingSplitsCheckpoint checkpoint) throws 
IOException {
+       public byte[] serialize(PendingSplitsCheckpoint<T> checkpoint) throws 
IOException {
                checkArgument(checkpoint.getClass() == 
PendingSplitsCheckpoint.class,
                                "Cannot serialize subclasses of 
PendingSplitsCheckpoint");
 
@@ -60,17 +66,17 @@ public final class PendingSplitsCheckpointSerializer 
implements SimpleVersionedS
                        return checkpoint.serializedFormCache;
                }
 
-               final FileSourceSplitSerializer serializer = 
FileSourceSplitSerializer.INSTANCE;
-               final Collection<FileSourceSplit> splits = 
checkpoint.getSplits();
+               final SimpleVersionedSerializer<T> splitSerializer = 
this.splitSerializer; // stack cache
+               final Collection<T> splits = checkpoint.getSplits();
                final Collection<Path> processedPaths = 
checkpoint.getAlreadyProcessedPaths();
 
                final ArrayList<byte[]> serializedSplits = new 
ArrayList<>(splits.size());
                final ArrayList<byte[]> serializedPaths = new 
ArrayList<>(processedPaths.size());
 
-               int totalLen = 16; // four ints: magic, version, count splits, 
count paths
+               int totalLen = 16;      // four ints: magic, version of split 
serializer, count splits, count paths
 
-               for (FileSourceSplit split : splits) {
-                       final byte[] serSplit = serializer.serialize(split);
+               for (T split : splits) {
+                       final byte[] serSplit = 
splitSerializer.serialize(split);
                        serializedSplits.add(serSplit);
                        totalLen += serSplit.length + 4; // 4 bytes for the 
length field
                }
@@ -84,7 +90,7 @@ public final class PendingSplitsCheckpointSerializer 
implements SimpleVersionedS
                final byte[] result = new byte[totalLen];
                final ByteBuffer byteBuffer = 
ByteBuffer.wrap(result).order(ByteOrder.LITTLE_ENDIAN);
                byteBuffer.putInt(VERSION_1_MAGIC_NUMBER);
-               byteBuffer.putInt(serializer.getVersion());
+               byteBuffer.putInt(splitSerializer.getVersion());
                byteBuffer.putInt(serializedSplits.size());
                byteBuffer.putInt(serializedPaths.size());
 
@@ -107,14 +113,14 @@ public final class PendingSplitsCheckpointSerializer 
implements SimpleVersionedS
        }
 
        @Override
-       public PendingSplitsCheckpoint deserialize(int version, byte[] 
serialized) throws IOException {
+       public PendingSplitsCheckpoint<T> deserialize(int version, byte[] 
serialized) throws IOException {
                if (version == 1) {
                        return deserializeV1(serialized);
                }
                throw new IOException("Unknown version: " + version);
        }
 
-       private static PendingSplitsCheckpoint deserializeV1(byte[] serialized) 
throws IOException {
+       private PendingSplitsCheckpoint<T> deserializeV1(byte[] serialized) 
throws IOException {
                final ByteBuffer bb = 
ByteBuffer.wrap(serialized).order(ByteOrder.LITTLE_ENDIAN);
 
                final int magic = bb.getInt();
@@ -123,18 +129,18 @@ public final class PendingSplitsCheckpointSerializer 
implements SimpleVersionedS
                                "Expected: %X , found %X", 
VERSION_1_MAGIC_NUMBER, magic));
                }
 
-               final int version = bb.getInt();
+               final int splitSerializerVersion = bb.getInt();
                final int numSplits = bb.getInt();
                final int numPaths = bb.getInt();
 
-               final FileSourceSplitSerializer serializer = 
FileSourceSplitSerializer.INSTANCE;
-               final ArrayList<FileSourceSplit> splits = new 
ArrayList<>(numSplits);
+               final SimpleVersionedSerializer<T> splitSerializer = 
this.splitSerializer; // stack cache
+               final ArrayList<T> splits = new ArrayList<>(numSplits);
                final ArrayList<Path> paths = new ArrayList<>(numPaths);
 
                for (int remaining = numSplits; remaining > 0; remaining--) {
                        final byte[] bytes = new byte[bb.getInt()];
                        bb.get(bytes);
-                       final FileSourceSplit split = 
serializer.deserialize(version, bytes);
+                       final T split = 
splitSerializer.deserialize(splitSerializerVersion, bytes);
                        splits.add(split);
                }
 
diff --git 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/ContinuousFileSplitEnumerator.java
 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/ContinuousFileSplitEnumerator.java
index c6a6346..59ccece 100644
--- 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/ContinuousFileSplitEnumerator.java
+++ 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/ContinuousFileSplitEnumerator.java
@@ -49,7 +49,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * A continuously monitoring enumerator.
  */
 @Internal
-public class ContinuousFileSplitEnumerator implements 
SplitEnumerator<FileSourceSplit, PendingSplitsCheckpoint> {
+public class ContinuousFileSplitEnumerator implements 
SplitEnumerator<FileSourceSplit, PendingSplitsCheckpoint<FileSourceSplit>> {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(ContinuousFileSplitEnumerator.class);
 
@@ -123,7 +123,7 @@ public class ContinuousFileSplitEnumerator implements 
SplitEnumerator<FileSource
        }
 
        @Override
-       public PendingSplitsCheckpoint snapshotState() throws Exception {
+       public PendingSplitsCheckpoint<FileSourceSplit> snapshotState() throws 
Exception {
                return 
PendingSplitsCheckpoint.fromCollectionSnapshot(splitAssigner.remainingSplits(), 
pathsAlreadyProcessed);
        }
 
diff --git 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/StaticFileSplitEnumerator.java
 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/StaticFileSplitEnumerator.java
index 15c20f1..f194dc4 100644
--- 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/StaticFileSplitEnumerator.java
+++ 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/StaticFileSplitEnumerator.java
@@ -52,7 +52,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * {@link FileEnumerator} and in {@link FileSplitAssigner}, respectively.
  */
 @Internal
-public class StaticFileSplitEnumerator implements 
SplitEnumerator<FileSourceSplit, PendingSplitsCheckpoint> {
+public class StaticFileSplitEnumerator implements 
SplitEnumerator<FileSourceSplit, PendingSplitsCheckpoint<FileSourceSplit>> {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(StaticFileSplitEnumerator.class);
 
@@ -102,7 +102,7 @@ public class StaticFileSplitEnumerator implements 
SplitEnumerator<FileSourceSpli
        }
 
        @Override
-       public PendingSplitsCheckpoint snapshotState() {
+       public PendingSplitsCheckpoint<FileSourceSplit> snapshotState() {
                return 
PendingSplitsCheckpoint.fromCollectionSnapshot(splitAssigner.remainingSplits());
        }
 
diff --git 
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/PendingSplitsCheckpointSerializerTest.java
 
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/PendingSplitsCheckpointSerializerTest.java
index bc89dce..092c21c 100644
--- 
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/PendingSplitsCheckpointSerializerTest.java
+++ 
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/PendingSplitsCheckpointSerializerTest.java
@@ -41,54 +41,54 @@ public class PendingSplitsCheckpointSerializerTest {
 
        @Test
        public void serializeEmptyCheckpoint() throws Exception {
-               final PendingSplitsCheckpoint checkpoint =
+               final PendingSplitsCheckpoint<FileSourceSplit> checkpoint =
                                
PendingSplitsCheckpoint.fromCollectionSnapshot(Collections.emptyList());
 
-               final PendingSplitsCheckpoint deSerialized = 
serializeAndDeserialize(checkpoint);
+               final PendingSplitsCheckpoint<FileSourceSplit> deSerialized = 
serializeAndDeserialize(checkpoint);
 
                assertCheckpointsEqual(checkpoint, deSerialized);
        }
 
        @Test
        public void serializeSomeSplits() throws Exception {
-               final PendingSplitsCheckpoint checkpoint = 
PendingSplitsCheckpoint.fromCollectionSnapshot(
+               final PendingSplitsCheckpoint<FileSourceSplit> checkpoint = 
PendingSplitsCheckpoint.fromCollectionSnapshot(
                                Arrays.asList(testSplit1(), testSplit2(), 
testSplit3()));
 
-               final PendingSplitsCheckpoint deSerialized = 
serializeAndDeserialize(checkpoint);
+               final PendingSplitsCheckpoint<FileSourceSplit> deSerialized = 
serializeAndDeserialize(checkpoint);
 
                assertCheckpointsEqual(checkpoint, deSerialized);
        }
 
        @Test
        public void serializeSplitsAndProcessedPaths() throws Exception {
-               final PendingSplitsCheckpoint checkpoint = 
PendingSplitsCheckpoint.fromCollectionSnapshot(
+               final PendingSplitsCheckpoint<FileSourceSplit> checkpoint = 
PendingSplitsCheckpoint.fromCollectionSnapshot(
                                Arrays.asList(testSplit1(), testSplit2(), 
testSplit3()),
                                Arrays.asList(new Path("file:/some/path"), new 
Path("s3://bucket/key/and/path"), new Path("hdfs://namenode:12345/path")));
 
-               final PendingSplitsCheckpoint deSerialized = 
serializeAndDeserialize(checkpoint);
+               final PendingSplitsCheckpoint<FileSourceSplit> deSerialized = 
serializeAndDeserialize(checkpoint);
 
                assertCheckpointsEqual(checkpoint, deSerialized);
        }
 
        @Test
        public void repeatedSerialization() throws Exception {
-               final PendingSplitsCheckpoint checkpoint = 
PendingSplitsCheckpoint.fromCollectionSnapshot(
+               final PendingSplitsCheckpoint<FileSourceSplit> checkpoint = 
PendingSplitsCheckpoint.fromCollectionSnapshot(
                        Arrays.asList(testSplit3(), testSplit1()));
 
                serializeAndDeserialize(checkpoint);
                serializeAndDeserialize(checkpoint);
-               final PendingSplitsCheckpoint deSerialized = 
serializeAndDeserialize(checkpoint);
+               final PendingSplitsCheckpoint<FileSourceSplit> deSerialized = 
serializeAndDeserialize(checkpoint);
 
                assertCheckpointsEqual(checkpoint, deSerialized);
        }
 
        @Test
        public void repeatedSerializationCaches() throws Exception {
-               final PendingSplitsCheckpoint checkpoint = 
PendingSplitsCheckpoint.fromCollectionSnapshot(
+               final PendingSplitsCheckpoint<FileSourceSplit> checkpoint = 
PendingSplitsCheckpoint.fromCollectionSnapshot(
                                Collections.singletonList(testSplit2()));
 
-               final byte[] ser1 = 
PendingSplitsCheckpointSerializer.INSTANCE.serialize(checkpoint);
-               final byte[] ser2 = 
PendingSplitsCheckpointSerializer.INSTANCE.serialize(checkpoint);
+               final byte[] ser1 = new 
PendingSplitsCheckpointSerializer<>(FileSourceSplitSerializer.INSTANCE).serialize(checkpoint);
+               final byte[] ser2 = new 
PendingSplitsCheckpointSerializer<>(FileSourceSplitSerializer.INSTANCE).serialize(checkpoint);
 
                assertSame(ser1, ser2);
        }
@@ -122,13 +122,18 @@ public class PendingSplitsCheckpointSerializerTest {
                                1234567);
        }
 
-       private static PendingSplitsCheckpoint 
serializeAndDeserialize(PendingSplitsCheckpoint split) throws IOException {
-               final PendingSplitsCheckpointSerializer serializer = new 
PendingSplitsCheckpointSerializer();
+       private static PendingSplitsCheckpoint<FileSourceSplit> 
serializeAndDeserialize(
+                       final PendingSplitsCheckpoint<FileSourceSplit> split) 
throws IOException {
+
+               final PendingSplitsCheckpointSerializer<FileSourceSplit> 
serializer =
+                               new 
PendingSplitsCheckpointSerializer<>(FileSourceSplitSerializer.INSTANCE);
                final byte[] bytes = 
SimpleVersionedSerialization.writeVersionAndSerialize(serializer, split);
                return 
SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes);
        }
 
-       private static void assertCheckpointsEqual(PendingSplitsCheckpoint 
expected, PendingSplitsCheckpoint actual) {
+       private static void assertCheckpointsEqual(
+                       final PendingSplitsCheckpoint<FileSourceSplit> expected,
+                       final PendingSplitsCheckpoint<FileSourceSplit> actual) {
                assertOrderedCollectionEquals(expected.getSplits(), 
actual.getSplits(), FileSourceSplitSerializerTest::assertSplitsEqual);
 
                assertOrderedCollectionEquals(

Reply via email to