This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0f7ff28c4109654e49cd11ab380bf5756e2bc497 Author: Stephan Ewen <se...@apache.org> AuthorDate: Wed Sep 23 18:10:34 2020 +0200 [FLINK-19457][core] Add a number sequence generating source for the FLIP-27 source API. This closes #13512 --- .../connector/source/lib/NumberSequenceSource.java | 253 +++++++++++++++++++++ .../source/lib/util/IteratorSourceEnumerator.java | 88 +++++++ .../source/lib/util/IteratorSourceReader.java | 146 ++++++++++++ .../source/lib/util/IteratorSourceSplit.java | 43 ++++ .../source/lib/util/NoSplitAvailableEvent.java | 31 +++ .../source/lib/util/SplitRequestEvent.java | 31 +++ .../source/lib/NumberSequenceSourceTest.java | 91 ++++++++ .../source/lib/IteratorSourcesITCase.java | 90 ++++++++ 8 files changed, 773 insertions(+) diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/NumberSequenceSource.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/NumberSequenceSource.java new file mode 100644 index 0000000..154ac7f --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/NumberSequenceSource.java @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.connector.source.lib; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator; +import org.apache.flink.api.connector.source.lib.util.IteratorSourceReader; +import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.NumberSequenceIterator; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A data source that produces a sequence of numbers (longs). + * This source is useful for testing and for cases that just need a stream of N events of any kind. + * + * <p>The source splits the sequence into as many parallel sub-sequences as there are parallel + * source readers. Each sub-sequence will be produced in order. + * Consequently, if the parallelism is limited to one, this will produce one sequence in order. + * + * <p>This source is always bounded. 
For very long sequences (for example over the entire domain + * of long integer values), user may want to consider executing the application in a streaming manner, + * because, despite the fact that the produced stream is bounded, the end bound is pretty far away. + */ +public class NumberSequenceSource implements + Source<Long, NumberSequenceSource.NumberSequenceSplit, Collection<NumberSequenceSource.NumberSequenceSplit>>, + ResultTypeQueryable<Long> { + + private static final long serialVersionUID = 1L; + + /** The starting number in the sequence, inclusive. */ + private final long from; + + /** The end number in the sequence, inclusive. */ + private final long to; + + /** + * Creates a new {@code NumberSequenceSource} that produces parallel sequences covering the range + * {@code from} to {@code to} (both boundaries are inclusive). + */ + public NumberSequenceSource(long from, long to) { + checkArgument(from <= to, "'from' must be <= 'to'"); + this.from = from; + this.to = to; + } + + @Override + public TypeInformation<Long> getProducedType() { + return Types.LONG; + } + + @Override + public Boundedness getBoundedness() { + return Boundedness.BOUNDED; + } + + @Override + public SourceReader<Long, NumberSequenceSplit> createReader(SourceReaderContext readerContext) { + return new IteratorSourceReader<>(readerContext); + } + + @Override + public SplitEnumerator<NumberSequenceSplit, Collection<NumberSequenceSplit>> createEnumerator( + final SplitEnumeratorContext<NumberSequenceSplit> enumContext) { + + final NumberSequenceIterator[] subSequences = + new NumberSequenceIterator(from, to).split(enumContext.currentParallelism()); + final ArrayList<NumberSequenceSplit> splits = new ArrayList<>(subSequences.length); + + int splitId = 1; + for (NumberSequenceIterator seq : subSequences) { + splits.add(new NumberSequenceSplit(String.valueOf(splitId++), seq.getCurrent(), seq.getTo())); + } + + return new IteratorSourceEnumerator<>(enumContext, splits); + } + + @Override + 
public SplitEnumerator<NumberSequenceSplit, Collection<NumberSequenceSplit>> restoreEnumerator( + final SplitEnumeratorContext<NumberSequenceSplit> enumContext, Collection<NumberSequenceSplit> checkpoint) { + return new IteratorSourceEnumerator<>(enumContext, checkpoint); + } + + @Override + public SimpleVersionedSerializer<NumberSequenceSplit> getSplitSerializer() { + return new SplitSerializer(); + } + + @Override + public SimpleVersionedSerializer<Collection<NumberSequenceSplit>> getEnumeratorCheckpointSerializer() { + return new CheckpointSerializer(); + } + + // ------------------------------------------------------------------------ + // splits & checkpoint + // ------------------------------------------------------------------------ + + /** + * A split of the source, representing a number sub-sequence. + */ + public static class NumberSequenceSplit implements IteratorSourceSplit<Long, NumberSequenceIterator> { + + private final String splitId; + private final long from; + private final long to; + + public NumberSequenceSplit(String splitId, long from, long to) { + checkArgument(from <= to, "'from' must be <= 'to'"); + this.splitId = checkNotNull(splitId); + this.from = from; + this.to = to; + } + + @Override + public String splitId() { + return splitId; + } + + public long from() { + return from; + } + + public long to() { + return to; + } + + @Override + public NumberSequenceIterator getIterator() { + return new NumberSequenceIterator(from, to); + } + + @Override + public IteratorSourceSplit<Long, NumberSequenceIterator> getUpdatedSplitForIterator( + final NumberSequenceIterator iterator) { + return new NumberSequenceSplit(splitId, iterator.getCurrent(), iterator.getTo()); + } + + @Override + public String toString() { + return String.format("NumberSequenceSplit [%d, %d] (%s)", from, to, splitId); + } + } + + private static final class SplitSerializer implements SimpleVersionedSerializer<NumberSequenceSplit> { + + private static final int CURRENT_VERSION = 
1; + + @Override + public int getVersion() { + return CURRENT_VERSION; + } + + @Override + public byte[] serialize(NumberSequenceSplit split) throws IOException { + checkArgument(split.getClass() == NumberSequenceSplit.class, "cannot serialize subclasses"); + + // We will serialize 2 longs (16 bytes) plus the UTF representation of the string (2 + length) + final DataOutputSerializer out = new DataOutputSerializer(split.splitId().length() + 18); + serializeV1(out, split); + return out.getCopyOfBuffer(); + } + + @Override + public NumberSequenceSplit deserialize(int version, byte[] serialized) throws IOException { + if (version != CURRENT_VERSION) { + throw new IOException("Unrecognized version: " + version); + } + final DataInputDeserializer in = new DataInputDeserializer(serialized); + return deserializeV1(in); + } + + static void serializeV1(DataOutputView out, NumberSequenceSplit split) throws IOException { + out.writeUTF(split.splitId()); + out.writeLong(split.from()); + out.writeLong(split.to()); + } + + static NumberSequenceSplit deserializeV1(DataInputView in) throws IOException { + return new NumberSequenceSplit(in.readUTF(), in.readLong(), in.readLong()); + } + } + + private static final class CheckpointSerializer implements SimpleVersionedSerializer<Collection<NumberSequenceSplit>> { + + private static final int CURRENT_VERSION = 1; + + @Override + public int getVersion() { + return CURRENT_VERSION; + } + + @Override + public byte[] serialize(Collection<NumberSequenceSplit> checkpoint) throws IOException { + // Each split needs 2 longs (16 bytes) plus the UTF representation of the string (2 + length) + // Assuming at most 4 digit split IDs, 22 bytes per split avoids any intermediate array resizing.
+ // plus four bytes for the length field + final DataOutputSerializer out = new DataOutputSerializer(checkpoint.size() * 22 + 4); + out.writeInt(checkpoint.size()); + for (NumberSequenceSplit split : checkpoint) { + SplitSerializer.serializeV1(out, split); + } + return out.getCopyOfBuffer(); + } + + @Override + public Collection<NumberSequenceSplit> deserialize(int version, byte[] serialized) throws IOException { + if (version != CURRENT_VERSION) { + throw new IOException("Unrecognized version: " + version); + } + final DataInputDeserializer in = new DataInputDeserializer(serialized); + final int num = in.readInt(); + final ArrayList<NumberSequenceSplit> result = new ArrayList<>(num); + for (int remaining = num; remaining > 0; remaining--) { + result.add(SplitSerializer.deserializeV1(in)); + } + return result; + } + } +} diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java new file mode 100644 index 0000000..3ee2597 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.connector.source.lib.util; + +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.util.FlinkRuntimeException; + +import java.util.ArrayDeque; +import java.util.Collection; +import java.util.List; +import java.util.Queue; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A {@link SplitEnumerator} for iterator sources. Simply takes the pre-split set of splits and assigns + * it first-come-first-serve. + * + * @param <SplitT> The type of the splits used by the source. + */ +public class IteratorSourceEnumerator<SplitT extends IteratorSourceSplit<?, ?>> implements SplitEnumerator<SplitT, Collection<SplitT>> { + + private final SplitEnumeratorContext<SplitT> context; + private final Queue<SplitT> remainingSplits; + + public IteratorSourceEnumerator(SplitEnumeratorContext<SplitT> context, Collection<SplitT> splits) { + this.context = checkNotNull(context); + this.remainingSplits = new ArrayDeque<>(splits); + } + + // ------------------------------------------------------------------------ + + @Override + public void start() {} + + @Override + public void close() {} + + @Override + public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { + if (!(sourceEvent instanceof SplitRequestEvent)) { + throw new FlinkRuntimeException("Unrecognized event: " + sourceEvent); + } + + final SplitT nextSplit = remainingSplits.poll(); + if (nextSplit != null) { + context.assignSplit(nextSplit, subtaskId); + } else { + context.sendEventToSourceReader(subtaskId, new NoSplitAvailableEvent()); + } + } + + @Override + public void addSplitsBack(List<SplitT> splits, int subtaskId) { + remainingSplits.addAll(splits); + } + + @Override + public 
Collection<SplitT> snapshotState() throws Exception { + return remainingSplits; + } + + @Override + public void addReader(int subtaskId) { + // we don't assign any splits here, because this registration happens after first startup + // and after each reader restart/recovery + // we only want to assign splits once, initially, which we get by reacting to the reader's explicit + // split request + } +} diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java new file mode 100644 index 0000000..f9d7461 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.api.connector.source.lib.util; + +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.core.io.InputStatus; +import org.apache.flink.util.FlinkRuntimeException; + +import javax.annotation.Nullable; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A {@link SourceReader} that returns the values of an iterator, supplied via an + * {@link IteratorSourceSplit}. + * + * <p>The {@code IteratorSourceSplit} is also responsible for taking the current iterator and turning + * it back into a split for checkpointing. + * + * @param <E> The type of events returned by the reader. + * @param <IterT> The type of the iterator that produces the events. This type exists to make the + * conversion between iterator and {@code IteratorSourceSplit} type safe. + * @param <SplitT> The concrete type of the {@code IteratorSourceSplit} that creates and converts the + * iterator that produces this reader's elements. + */ +public class IteratorSourceReader<E, IterT extends Iterator<E>, SplitT extends IteratorSourceSplit<E, IterT>> + implements SourceReader<E, SplitT> { + + /** The context for this reader, to communicate with the enumerator. */ + private final SourceReaderContext context; + + /** The availability future. This reader is available as soon as a split is assigned. */ + private final CompletableFuture<Void> availability; + + /** The iterator producing data. Non-null after a split has been assigned. 
+ * This field is null or non-null always together with the {@link #currentSplit} field. */ + @Nullable + private IterT iterator; + + /** The split whose data we return. Non-null after a split has been assigned. + * This field is null or non-null always together with the {@link #iterator} field. */ + @Nullable + private SplitT currentSplit; + + /** The remaining splits. Null means no splits have yet been assigned. */ + @Nullable + private Queue<SplitT> remainingSplits; + + public IteratorSourceReader(SourceReaderContext context) { + this.context = checkNotNull(context); + this.availability = new CompletableFuture<>(); + } + + // ------------------------------------------------------------------------ + + @Override + public void start() { + // request a split only if we did not get one during restore + if (iterator == null) { + context.sendSourceEventToCoordinator(new SplitRequestEvent()); + } + } + + @Override + public InputStatus pollNext(ReaderOutput<E> output) { + if (iterator != null && iterator.hasNext()) { + output.collect(iterator.next()); + return InputStatus.MORE_AVAILABLE; + } else if (remainingSplits == null) { + // nothing assigned yet, need to wait and come back when splits have been assigned + return InputStatus.NOTHING_AVAILABLE; + } else { + currentSplit = remainingSplits.poll(); + if (currentSplit != null) { + iterator = currentSplit.getIterator(); + return pollNext(output); + } else { + return InputStatus.END_OF_INPUT; + } + } + } + + @Override + public CompletableFuture<Void> isAvailable() { + return availability; + } + + @Override + public void addSplits(List<SplitT> splits) { + checkState(remainingSplits == null, "Cannot accept more than one split assignment"); + remainingSplits = new ArrayDeque<>(splits); + availability.complete(null); // from now on we are always available + } + + @Override + public List<SplitT> snapshotState() { + final ArrayList<SplitT> allSplits = new ArrayList<>(1 + remainingSplits.size()); + if (iterator != null) { + 
@SuppressWarnings("unchecked") + final SplitT inProgressSplit = (SplitT) currentSplit.getUpdatedSplitForIterator(iterator); + allSplits.add(inProgressSplit); + } + allSplits.addAll(remainingSplits); + return allSplits; + } + + @Override + public void handleSourceEvents(SourceEvent sourceEvent) { + if (sourceEvent instanceof NoSplitAvailableEvent) { + // non-null queue signals splits were assigned, in this case no splits + remainingSplits = new ArrayDeque<>(); + } else { + throw new FlinkRuntimeException("Unexpected event: " + sourceEvent); + } + } + + @Override + public void close() throws Exception {} +} diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceSplit.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceSplit.java new file mode 100644 index 0000000..60caf4f --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceSplit.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.connector.source.lib.util; + +import org.apache.flink.api.connector.source.SourceSplit; + +import java.util.Iterator; + +/** + * A {@link SourceSplit} that represents a sequence of elements captured in an iterator. + * The split produces the iterator and converts the iterator back to a new split during + * checkpointing. + * + * @param <E> The type of the elements returned by the iterator. + */ +public interface IteratorSourceSplit<E, IterT extends Iterator<E>> extends SourceSplit { + + /** + * Gets the iterator over the elements of this split. + */ + IterT getIterator(); + + /** + * Converts an iterator (that may have returned some elements already) back into a source split. + */ + IteratorSourceSplit<E, IterT> getUpdatedSplitForIterator(IterT iterator); +} diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/NoSplitAvailableEvent.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/NoSplitAvailableEvent.java new file mode 100644 index 0000000..5a28565 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/NoSplitAvailableEvent.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.connector.source.lib.util; + +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SplitEnumerator; + +/** + * A simple {@link SourceEvent} indicating that there is no split available for the reader (any more). + * This event is typically sent from the {@link SplitEnumerator} to the {@link SourceReader}. + */ +public final class NoSplitAvailableEvent implements SourceEvent { + private static final long serialVersionUID = 1L; +} diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/SplitRequestEvent.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/SplitRequestEvent.java new file mode 100644 index 0000000..3e561b7 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/SplitRequestEvent.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.connector.source.lib.util; + +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SplitEnumerator; + +/** + * A {@link SourceEvent} representing the request for a split, typically sent from the + * {@link SourceReader} to the {@link SplitEnumerator}. + */ +public final class SplitRequestEvent implements SourceEvent { + private static final long serialVersionUID = 1L; +} diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java new file mode 100644 index 0000000..450983e --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.connector.source.lib; + +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.mocks.TestingReaderContext; +import org.apache.flink.api.connector.source.mocks.TestingReaderOutput; +import org.apache.flink.core.io.InputStatus; + +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.fail; + +/** + * Tests for the {@link NumberSequenceSource}. + */ +public class NumberSequenceSourceTest { + + @Test + public void testReaderCheckpoints() throws Exception { + final long from = 177; + final long mid = 333; + final long to = 563; + final long elementsPerCycle = (to - from) / 3; + + final TestingReaderOutput<Long> out = new TestingReaderOutput<>(); + + SourceReader<Long, NumberSequenceSource.NumberSequenceSplit> reader = createReader(); + reader.addSplits(Arrays.asList( + new NumberSequenceSource.NumberSequenceSplit("split-1", from, mid), + new NumberSequenceSource.NumberSequenceSplit("split-2", mid + 1, to))); + + long remainingInCycle = elementsPerCycle; + while (reader.pollNext(out) != InputStatus.END_OF_INPUT) { + if (--remainingInCycle <= 0) { + remainingInCycle = elementsPerCycle; + // checkpoint + List<NumberSequenceSource.NumberSequenceSplit> splits = reader.snapshotState(); + + // re-create and restore + reader = createReader(); + reader.addSplits(splits); + } + } + + final List<Long> result = out.getEmittedRecords(); + validateSequence(result, from, to); + } + + private static void validateSequence(final List<Long> sequence, final long from, final long to) { + if (sequence.size() != to - from + 1) { + failSequence(sequence, from, to); + } + + long nextExpected = from; + for (Long next : sequence) { + if (next != nextExpected++) { + failSequence(sequence, from, to); + } + } + } + + private static void failSequence(final List<Long> sequence, final long from, final long to) { + fail(String.format("Expected: A sequence [%d, 
%d], but found: sequence (size %d) : %s", + from, to, sequence.size(), sequence)); + } + + private static SourceReader<Long, NumberSequenceSource.NumberSequenceSplit> createReader() { + // the arguments passed in the source constructor matter only to the enumerator + return new NumberSequenceSource(0L, 0L).createReader(new TestingReaderContext()); + } +} diff --git a/flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/IteratorSourcesITCase.java b/flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/IteratorSourcesITCase.java new file mode 100644 index 0000000..00909ce --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/IteratorSourcesITCase.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.connector.source.lib; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamUtils; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.util.TestLogger; + +import org.junit.ClassRule; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.fail; + +/** + * An integration test for the sources based on iterators. + * + * <p>This test uses the {@link NumberSequenceSource} as a concrete iterator source implementation, + * but covers all runtime-related aspects for all the iterator-based sources together. + */ +public class IteratorSourcesITCase extends TestLogger { + + private static final int PARALLELISM = 4; + + @ClassRule + public static final MiniClusterWithClientResource MINI_CLUSTER = new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(PARALLELISM) + .build()); + + // ------------------------------------------------------------------------ + + @Test + public void testParallelSourceExecution() throws Exception { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(PARALLELISM); + + final DataStream<Long> stream = env.fromSource( + new NumberSequenceSource(1L, 1_000L), + WatermarkStrategy.noWatermarks(), + "iterator source"); + + final List<Long> result = DataStreamUtils.collectBoundedStream(stream, "Iterator Source Test"); + + verifySequence(result, 1L, 1_000L); + } + + // ------------------------------------------------------------------------ + // test utils + // 
------------------------------------------------------------------------ + + private static void verifySequence(final List<Long> sequence, final long from, final long to) { + if (sequence.size() != to - from + 1) { + fail(String.format("Expected: Sequence [%d, %d]. Found: %s", from, to, sequence)); + } + + final ArrayList<Long> list = new ArrayList<>(sequence); // copy to be safe against immutable lists, etc. + list.sort(Long::compareTo); + + int pos = 0; + for (long value = from; value <= to; value++, pos++) { + if (value != list.get(pos)) { + fail(String.format("Expected: Sequence [%d, %d]. Found: %s", from, to, list)); + } + } + } +}