This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0f7ff28c4109654e49cd11ab380bf5756e2bc497
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Wed Sep 23 18:10:34 2020 +0200

    [FLINK-19457][core] Add a number sequence generating source for the FLIP-27 
source API.
    
    This closes #13512
---
 .../connector/source/lib/NumberSequenceSource.java | 253 +++++++++++++++++++++
 .../source/lib/util/IteratorSourceEnumerator.java  |  88 +++++++
 .../source/lib/util/IteratorSourceReader.java      | 146 ++++++++++++
 .../source/lib/util/IteratorSourceSplit.java       |  43 ++++
 .../source/lib/util/NoSplitAvailableEvent.java     |  31 +++
 .../source/lib/util/SplitRequestEvent.java         |  31 +++
 .../source/lib/NumberSequenceSourceTest.java       |  91 ++++++++
 .../source/lib/IteratorSourcesITCase.java          |  90 ++++++++
 8 files changed, 773 insertions(+)

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/NumberSequenceSource.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/NumberSequenceSource.java
new file mode 100644
index 0000000..154ac7f
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/NumberSequenceSource.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceReader;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.NumberSequenceIterator;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A data source that produces a sequence of numbers (longs).
+ * This source is useful for testing and for cases that just need a stream of 
N events of any kind.
+ *
+ * <p>The source splits the sequence into as many parallel sub-sequences as 
there are parallel
+ * source readers. Each sub-sequence will be produced in order.
+ * Consequently, if the parallelism is limited to one, this will produce one 
sequence in order.
+ *
+ * <p>This source is always bounded. For very long sequences (for example over 
the entire domain
+ * of long integer values), users may want to consider executing the 
application in a streaming manner,
+ * because, despite the fact that the produced stream is bounded, the end 
bound is pretty far away.
+ */
+public class NumberSequenceSource implements
+               Source<Long, NumberSequenceSource.NumberSequenceSplit, 
Collection<NumberSequenceSource.NumberSequenceSplit>>,
+               ResultTypeQueryable<Long> {
+
+       private static final long serialVersionUID = 1L;
+
+       /** The starting number in the sequence, inclusive. */
+       private final long from;
+
+       /** The end number in the sequence, inclusive. */
+       private final long to;
+
+       /**
+        * Creates a new {@code NumberSequenceSource} that produces parallel 
sequences covering the range
+        * {@code from} to {@code to} (both boundaries are inclusive).
+        */
+       public NumberSequenceSource(long from, long to) {
+               checkArgument(from <= to, "'from' must be <= 'to'");
+               this.from = from;
+               this.to = to;
+       }
+
+       @Override
+       public TypeInformation<Long> getProducedType() {
+               return Types.LONG;
+       }
+
+       @Override
+       public Boundedness getBoundedness() {
+               return Boundedness.BOUNDED;
+       }
+
+       @Override
+       public SourceReader<Long, NumberSequenceSplit> 
createReader(SourceReaderContext readerContext) {
+               return new IteratorSourceReader<>(readerContext);
+       }
+
+       @Override
+       public SplitEnumerator<NumberSequenceSplit, 
Collection<NumberSequenceSplit>> createEnumerator(
+                       final SplitEnumeratorContext<NumberSequenceSplit> 
enumContext) {
+
+               final NumberSequenceIterator[] subSequences =
+                               new NumberSequenceIterator(from, 
to).split(enumContext.currentParallelism());
+               final ArrayList<NumberSequenceSplit> splits = new 
ArrayList<>(subSequences.length);
+
+               int splitId = 1;
+               for (NumberSequenceIterator seq : subSequences) {
+                       splits.add(new 
NumberSequenceSplit(String.valueOf(splitId++), seq.getCurrent(), seq.getTo()));
+               }
+
+               return new IteratorSourceEnumerator<>(enumContext, splits);
+       }
+
+       @Override
+       public SplitEnumerator<NumberSequenceSplit, 
Collection<NumberSequenceSplit>> restoreEnumerator(
+                       final SplitEnumeratorContext<NumberSequenceSplit> 
enumContext, Collection<NumberSequenceSplit> checkpoint) {
+               return new IteratorSourceEnumerator<>(enumContext, checkpoint);
+       }
+
+       @Override
+       public SimpleVersionedSerializer<NumberSequenceSplit> 
getSplitSerializer() {
+               return new SplitSerializer();
+       }
+
+       @Override
+       public SimpleVersionedSerializer<Collection<NumberSequenceSplit>> 
getEnumeratorCheckpointSerializer() {
+               return new CheckpointSerializer();
+       }
+
+       // 
------------------------------------------------------------------------
+       //  splits & checkpoint
+       // 
------------------------------------------------------------------------
+
+       /**
+        * A split of the source, representing a number sub-sequence.
+        */
+       public static class NumberSequenceSplit implements 
IteratorSourceSplit<Long, NumberSequenceIterator> {
+
+               private final String splitId;
+               private final long from;
+               private final long to;
+
+               public NumberSequenceSplit(String splitId, long from, long to) {
+                       checkArgument(from <= to, "'from' must be <= 'to'");
+                       this.splitId = checkNotNull(splitId);
+                       this.from = from;
+                       this.to = to;
+               }
+
+               @Override
+               public String splitId() {
+                       return splitId;
+               }
+
+               public long from() {
+                       return from;
+               }
+
+               public long to() {
+                       return to;
+               }
+
+               @Override
+               public NumberSequenceIterator getIterator() {
+                       return new NumberSequenceIterator(from, to);
+               }
+
+               @Override
+               public IteratorSourceSplit<Long, NumberSequenceIterator> 
getUpdatedSplitForIterator(
+                               final NumberSequenceIterator iterator) {
+                       return new NumberSequenceSplit(splitId, 
iterator.getCurrent(), iterator.getTo());
+               }
+
+               @Override
+               public String toString() {
+                       return String.format("NumberSequenceSplit [%d, %d] 
(%s)", from, to, splitId);
+               }
+       }
+
+       private static final class SplitSerializer implements 
SimpleVersionedSerializer<NumberSequenceSplit> {
+
+               private static final int CURRENT_VERSION = 1;
+
+               @Override
+               public int getVersion() {
+                       return CURRENT_VERSION;
+               }
+
+               @Override
+               public byte[] serialize(NumberSequenceSplit split) throws 
IOException {
+                       checkArgument(split.getClass() == 
NumberSequenceSplit.class, "cannot serialize subclasses");
+
+                       // We will serialize 2 longs (16 bytes) plus the UTF 
representation of the string (2 + length)
+                       final DataOutputSerializer out = new 
DataOutputSerializer(split.splitId().length() + 18);
+                       serializeV1(out, split);
+                       return out.getCopyOfBuffer();
+               }
+
+               @Override
+               public NumberSequenceSplit deserialize(int version, byte[] 
serialized) throws IOException {
+                       if (version != CURRENT_VERSION) {
+                               throw new IOException("Unrecognized version: " 
+ version);
+                       }
+                       final DataInputDeserializer in = new 
DataInputDeserializer(serialized);
+                       return deserializeV1(in);
+               }
+
+               static void serializeV1(DataOutputView out, NumberSequenceSplit 
split) throws IOException {
+                       out.writeUTF(split.splitId());
+                       out.writeLong(split.from());
+                       out.writeLong(split.to());
+               }
+
+               static NumberSequenceSplit deserializeV1(DataInputView in) 
throws IOException {
+                       return new NumberSequenceSplit(in.readUTF(), 
in.readLong(), in.readLong());
+               }
+       }
+
+       private static final class CheckpointSerializer implements 
SimpleVersionedSerializer<Collection<NumberSequenceSplit>> {
+
+               private static final int CURRENT_VERSION = 1;
+
+               @Override
+               public int getVersion() {
+                       return CURRENT_VERSION;
+               }
+
+               @Override
+               public byte[] serialize(Collection<NumberSequenceSplit> 
checkpoint) throws IOException {
+                       // Each split needs 2 longs (16 bytes) plus the UTF 
representation of the string (2 + length)
+                       // Assuming at most 4 digit split IDs, 22 bytes per 
split avoids any intermediate array resizing.
+                       // plus four bytes for the length field
+                       final DataOutputSerializer out = new 
DataOutputSerializer(checkpoint.size() * 22 + 4);
+                       out.writeInt(checkpoint.size());
+                       for (NumberSequenceSplit split : checkpoint) {
+                               SplitSerializer.serializeV1(out, split);
+                       }
+                       return out.getCopyOfBuffer();
+               }
+
+               @Override
+               public Collection<NumberSequenceSplit> deserialize(int version, 
byte[] serialized) throws IOException {
+                       if (version != CURRENT_VERSION) {
+                               throw new IOException("Unrecognized version: " 
+ version);
+                       }
+                       final DataInputDeserializer in = new 
DataInputDeserializer(serialized);
+                       final int num = in.readInt();
+                       final ArrayList<NumberSequenceSplit> result = new 
ArrayList<>(num);
+                       for (int remaining = num; remaining > 0; remaining--) {
+                               result.add(SplitSerializer.deserializeV1(in));
+                       }
+                       return result;
+               }
+       }
+}
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java
new file mode 100644
index 0000000..3ee2597
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib.util;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.List;
+import java.util.Queue;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link SplitEnumerator} for iterator sources. Simply takes the pre-split 
set of splits and assigns
+ * them first-come-first-served.
+ *
+ * @param <SplitT> The type of the splits used by the source.
+ */
+public class IteratorSourceEnumerator<SplitT extends IteratorSourceSplit<?, 
?>> implements SplitEnumerator<SplitT, Collection<SplitT>> {
+
+       private final SplitEnumeratorContext<SplitT> context;
+       private final Queue<SplitT> remainingSplits;
+
+       public IteratorSourceEnumerator(SplitEnumeratorContext<SplitT> context, 
Collection<SplitT> splits) {
+               this.context = checkNotNull(context);
+               this.remainingSplits = new ArrayDeque<>(splits);
+       }
+
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public void start() {}
+
+       @Override
+       public void close() {}
+
+       @Override
+       public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+               if (!(sourceEvent instanceof SplitRequestEvent)) {
+                       throw new FlinkRuntimeException("Unrecognized event: " 
+ sourceEvent);
+               }
+
+               final SplitT nextSplit = remainingSplits.poll();
+               if (nextSplit != null) {
+                       context.assignSplit(nextSplit, subtaskId);
+               } else {
+                       context.sendEventToSourceReader(subtaskId, new 
NoSplitAvailableEvent());
+               }
+       }
+
+       @Override
+       public void addSplitsBack(List<SplitT> splits, int subtaskId) {
+               remainingSplits.addAll(splits);
+       }
+
+       @Override
+       public Collection<SplitT> snapshotState() throws Exception {
+               return remainingSplits;
+       }
+
+       @Override
+       public void addReader(int subtaskId) {
+               // we don't assign any splits here, because this registration 
happens after first startup
+               // and after each reader restart/recovery
+               // we only want to assign splits once, initially, which we get 
by reacting to the reader's explicit
+               // split request
+       }
+}
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java
new file mode 100644
index 0000000..f9d7461
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib.util;
+
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A {@link SourceReader} that returns the values of an iterator, supplied via 
an
+ * {@link IteratorSourceSplit}.
+ *
+ * <p>The {@code IteratorSourceSplit} is also responsible for taking the 
current iterator and turning
+ * it back into a split for checkpointing.
+ *
+ * @param <E> The type of events returned by the reader.
+ * @param <IterT> The type of the iterator that produces the events. This type 
exists to make the
+ *                 conversion between iterator and {@code IteratorSourceSplit} 
type safe.
+ * @param <SplitT> The concrete type of the {@code IteratorSourceSplit} that 
creates and converts the
+ *                 iterator that produces this reader's elements.
+ */
+public class IteratorSourceReader<E, IterT extends Iterator<E>, SplitT extends 
IteratorSourceSplit<E, IterT>>
+               implements SourceReader<E, SplitT> {
+
+       /** The context for this reader, to communicate with the enumerator. */
+       private final SourceReaderContext context;
+
+       /** The availability future. This reader is available as soon as a 
split is assigned. */
+       private final CompletableFuture<Void> availability;
+
+       /** The iterator producing data. Non-null after a split has been 
assigned.
+        * This field is null or non-null always together with the {@link 
#currentSplit} field. */
+       @Nullable
+       private IterT iterator;
+
+       /** The split whose data we return. Non-null after a split has been 
assigned.
+        * This field is null or non-null always together with the {@link 
#iterator} field. */
+       @Nullable
+       private SplitT currentSplit;
+
+       /** The remaining splits. Null means no splits have yet been assigned. 
*/
+       @Nullable
+       private Queue<SplitT> remainingSplits;
+
+       public IteratorSourceReader(SourceReaderContext context) {
+               this.context = checkNotNull(context);
+               this.availability = new CompletableFuture<>();
+       }
+
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public void start() {
+               // request a split only if we did not get one during restore
+               if (iterator == null) {
+                       context.sendSourceEventToCoordinator(new 
SplitRequestEvent());
+               }
+       }
+
+       @Override
+       public InputStatus pollNext(ReaderOutput<E> output) {
+               if (iterator != null && iterator.hasNext()) {
+                       output.collect(iterator.next());
+                       return InputStatus.MORE_AVAILABLE;
+               } else if (remainingSplits == null) {
+                       // nothing assigned yet, need to wait and come back 
when splits have been assigned
+                       return InputStatus.NOTHING_AVAILABLE;
+               } else {
+                       currentSplit = remainingSplits.poll();
+                       if (currentSplit != null) {
+                               iterator = currentSplit.getIterator();
+                               return pollNext(output);
+                       } else {
+                               return InputStatus.END_OF_INPUT;
+                       }
+               }
+       }
+
+       @Override
+       public CompletableFuture<Void> isAvailable() {
+               return availability;
+       }
+
+       @Override
+       public void addSplits(List<SplitT> splits) {
+               checkState(remainingSplits == null, "Cannot accept more than 
one split assignment");
+               remainingSplits = new ArrayDeque<>(splits);
+               availability.complete(null); // from now on we are always 
available
+       }
+
+       @Override
+       public List<SplitT> snapshotState() {
+               final ArrayList<SplitT> allSplits = new ArrayList<>(1 + 
remainingSplits.size());
+               if (iterator != null) {
+                       @SuppressWarnings("unchecked")
+                       final SplitT inProgressSplit = (SplitT) 
currentSplit.getUpdatedSplitForIterator(iterator);
+                       allSplits.add(inProgressSplit);
+               }
+               allSplits.addAll(remainingSplits);
+               return allSplits;
+       }
+
+       @Override
+       public void handleSourceEvents(SourceEvent sourceEvent) {
+               if (sourceEvent instanceof NoSplitAvailableEvent) {
+                       // non-null queue signals splits were assigned, in this 
case no splits
+                       remainingSplits = new ArrayDeque<>();
+               } else {
+                       throw new FlinkRuntimeException("Unexpected event: " + 
sourceEvent);
+               }
+       }
+
+       @Override
+       public void close() throws Exception {}
+}
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceSplit.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceSplit.java
new file mode 100644
index 0000000..60caf4f
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceSplit.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib.util;
+
+import org.apache.flink.api.connector.source.SourceSplit;
+
+import java.util.Iterator;
+
+/**
+ * A {@link SourceSplit} that represents a sequence of elements captured in an 
iterator.
+ * The split produces the iterator and converts the iterator back to a new 
split during
+ * checkpointing.
+ *
+ * @param <E> The type of the elements returned by the iterator.
+ */
+public interface IteratorSourceSplit<E, IterT extends Iterator<E>> extends 
SourceSplit {
+
+       /**
+        * Gets the iterator over the elements of this split.
+        */
+       IterT getIterator();
+
+       /**
+        * Converts an iterator (that may have returned some elements already) 
back into a source split.
+        */
+       IteratorSourceSplit<E, IterT> getUpdatedSplitForIterator(IterT 
iterator);
+}
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/NoSplitAvailableEvent.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/NoSplitAvailableEvent.java
new file mode 100644
index 0000000..5a28565
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/NoSplitAvailableEvent.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib.util;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+
+/**
+ * A simple {@link SourceEvent} indicating that there is no split available 
for the reader (any more).
+ * This event is typically sent from the {@link SplitEnumerator} to the {@link 
SourceReader}.
+ */
+public final class NoSplitAvailableEvent implements SourceEvent {
+       private static final long serialVersionUID = 1L;
+}
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/SplitRequestEvent.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/SplitRequestEvent.java
new file mode 100644
index 0000000..3e561b7
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/SplitRequestEvent.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib.util;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+
+/**
+ * A {@link SourceEvent} representing the request for a split, typically sent 
from the
+ * {@link SourceReader} to the {@link SplitEnumerator}.
+ */
+public final class SplitRequestEvent implements SourceEvent {
+       private static final long serialVersionUID = 1L;
+}
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java
new file mode 100644
index 0000000..450983e
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib;
+
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.mocks.TestingReaderContext;
+import org.apache.flink.api.connector.source.mocks.TestingReaderOutput;
+import org.apache.flink.core.io.InputStatus;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link NumberSequenceSource}.
+ */
+public class NumberSequenceSourceTest {
+
+       @Test
+       public void testReaderCheckpoints() throws Exception {
+               final long from = 177;
+               final long mid = 333;
+               final long to = 563;
+               final long elementsPerCycle = (to - from) / 3;
+
+               final TestingReaderOutput<Long> out = new 
TestingReaderOutput<>();
+
+               SourceReader<Long, NumberSequenceSource.NumberSequenceSplit> 
reader = createReader();
+               reader.addSplits(Arrays.asList(
+                               new 
NumberSequenceSource.NumberSequenceSplit("split-1", from, mid),
+                               new 
NumberSequenceSource.NumberSequenceSplit("split-2", mid + 1, to)));
+
+               long remainingInCycle = elementsPerCycle;
+               while (reader.pollNext(out) != InputStatus.END_OF_INPUT) {
+                       if (--remainingInCycle <= 0) {
+                               remainingInCycle = elementsPerCycle;
+                               // checkpoint
+                               List<NumberSequenceSource.NumberSequenceSplit> 
splits = reader.snapshotState();
+
+                               // re-create and restore
+                               reader = createReader();
+                               reader.addSplits(splits);
+                       }
+               }
+
+               final List<Long> result = out.getEmittedRecords();
+               validateSequence(result, from, to);
+       }
+
+       private static void validateSequence(final List<Long> sequence, final 
long from, final long to) {
+               if (sequence.size() != to - from + 1) {
+                       failSequence(sequence, from, to);
+               }
+
+               long nextExpected = from;
+               for (Long next : sequence) {
+                       if (next != nextExpected++) {
+                               failSequence(sequence, from, to);
+                       }
+               }
+       }
+
+       private static void failSequence(final List<Long> sequence, final long 
from, final long to) {
+               fail(String.format("Expected: A sequence [%d, %d], but found: 
sequence (size %d) : %s",
+                               from, to, sequence.size(), sequence));
+       }
+
+       private static SourceReader<Long, 
NumberSequenceSource.NumberSequenceSplit> createReader() {
+               // the arguments passed in the source constructor matter only 
to the enumerator
+               return new NumberSequenceSource(0L, 0L).createReader(new 
TestingReaderContext());
+       }
+}
diff --git 
a/flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/IteratorSourcesITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/IteratorSourcesITCase.java
new file mode 100644
index 0000000..00909ce
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/IteratorSourcesITCase.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Integration test for the iterator-based sources.
+ *
+ * <p>The {@link NumberSequenceSource} serves as the concrete iterator source implementation here;
+ * the test is meant to cover the runtime-related aspects of all iterator-based sources together.
+ */
+public class IteratorSourcesITCase extends TestLogger {
+
+       private static final int PARALLELISM = 4;
+
+       /** Shared mini cluster: one task manager that offers {@link #PARALLELISM} slots. */
+       @ClassRule
+       public static final MiniClusterWithClientResource MINI_CLUSTER =
+               new MiniClusterWithClientResource(
+                       new MiniClusterResourceConfiguration.Builder()
+                               .setNumberTaskManagers(1)
+                               .setNumberSlotsPerTaskManager(PARALLELISM)
+                               .build());
+
+       // ------------------------------------------------------------------------
+
+       /**
+        * Runs a number sequence source over [1, 1000] with a parallelism of {@link #PARALLELISM},
+        * collects the bounded result, and checks that every number of the range was produced
+        * (in any order).
+        */
+       @Test
+       public void testParallelSourceExecution() throws Exception {
+               final long first = 1L;
+               final long last = 1_000L;
+
+               final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(PARALLELISM);
+
+               final DataStream<Long> numbers = env.fromSource(
+                               new NumberSequenceSource(first, last),
+                               WatermarkStrategy.noWatermarks(),
+                               "iterator source");
+
+               final List<Long> collected = DataStreamUtils.collectBoundedStream(numbers, "Iterator Source Test");
+
+               verifySequence(collected, first, last);
+       }
+
+       // ------------------------------------------------------------------------
+       //  test utils
+       // ------------------------------------------------------------------------
+
+       /**
+        * Checks that the given values, once sorted, are exactly the numbers {@code from} through
+        * {@code to} (both inclusive). The order of the incoming list is irrelevant.
+        *
+        * @param sequence the collected values, in arbitrary order
+        * @param from the smallest expected value
+        * @param to the largest expected value
+        */
+       private static void verifySequence(final List<Long> sequence, final long from, final long to) {
+               final long expectedCount = to - from + 1;
+               if (sequence.size() != expectedCount) {
+                       failUnexpectedSequence(from, to, sequence);
+               }
+
+               // sort a private copy, because the given list may be immutable
+               final List<Long> sorted = new ArrayList<>(sequence);
+               sorted.sort(Long::compare);
+
+               int index = 0;
+               long expected = from;
+               while (expected <= to) {
+                       final long actual = sorted.get(index);
+                       if (actual != expected) {
+                               failUnexpectedSequence(from, to, sorted);
+                       }
+                       index++;
+                       expected++;
+               }
+       }
+
+       /** Fails the test, naming the expected range and printing the values that were found. */
+       private static void failUnexpectedSequence(final long from, final long to, final List<Long> found) {
+               fail(String.format("Expected: Sequence [%d, %d]. Found: %s", from, to, found));
+       }
+}

Reply via email to