zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r980975212


##########
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/RateLimiter.java:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib.util;
+
+import java.io.Serializable;
+import java.util.concurrent.CompletionStage;
+
+/** The interface to rate limit execution of methods. */
+public interface RateLimiter extends Serializable {
+
+    /** Returns a future that is completed once another event would not exceed 
the rate limit. */

Review Comment:
   We should clarify that acquire must not be called again until the returned 
future completes, as otherwise we can go over budget.



##########
flink-core/src/test/java/org/apache/flink/api/connector/source/lib/DataGeneratorSourceTest.java:
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.datagen.DataGeneratorSource;
+import org.apache.flink.api.connector.source.datagen.GeneratorFunction;
+import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.util.SimpleUserCodeClassLoader;
+import org.apache.flink.util.UserCodeClassLoader;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for the {@link DataGeneratorSource}. */
+public class DataGeneratorSourceTest {

Review Comment:
   ```suggestion
   class DataGeneratorSourceTest {
   ```
   Remove public modifier from classes and methods.



##########
flink-core/src/test/java/org/apache/flink/api/connector/source/lib/DataGeneratorSourceTest.java:
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.datagen.DataGeneratorSource;
+import org.apache.flink.api.connector.source.datagen.GeneratorFunction;
+import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.util.SimpleUserCodeClassLoader;
+import org.apache.flink.util.UserCodeClassLoader;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for the {@link DataGeneratorSource}. */
+public class DataGeneratorSourceTest {
+
+    @Test
+    @DisplayName("Correctly restores SplitEnumerator from a snapshot.")
+    public void testRestoreEnumerator() throws Exception {
+        final GeneratorFunction<Long, Long> generatorFunctionStateless = index 
-> index;
+        final DataGeneratorSource<Long> dataGeneratorSource =
+                new DataGeneratorSource<>(generatorFunctionStateless, 100, 
Types.LONG);
+
+        final int parallelism = 2;
+        final 
MockSplitEnumeratorContext<NumberSequenceSource.NumberSequenceSplit> context =
+                new MockSplitEnumeratorContext<>(parallelism);
+
+        SplitEnumerator<
+                        NumberSequenceSource.NumberSequenceSplit,
+                        Collection<NumberSequenceSource.NumberSequenceSplit>>
+                enumerator = dataGeneratorSource.createEnumerator(context);
+
+        // start() is not strictly necessary in the current implementation, 
but should logically be
+        // executed in this order (protect against any breaking changes in the 
start() method).
+        enumerator.start();
+
+        Collection<NumberSequenceSource.NumberSequenceSplit> enumeratorState =
+                enumerator.snapshotState(0);
+        assertThat(enumeratorState).hasSize(parallelism);
+
+        enumerator = dataGeneratorSource.restoreEnumerator(context, 
enumeratorState);
+
+        // Verify that splits were restored and can be assigned
+        assertThat(context.getSplitsAssignmentSequence()).isEmpty();
+        for (NumberSequenceSource.NumberSequenceSplit ignored : 
enumeratorState) {
+            enumerator.handleSplitRequest(0, "hostname");
+        }
+        
assertThat(context.getSplitsAssignmentSequence()).hasSize(enumeratorState.size());
+    }
+
+    @Test
+    @DisplayName("Uses the underlying NumberSequenceSource correctly for 
checkpointing.")
+    public void testReaderCheckpoints() throws Exception {
+        final long from = 177;
+        final long mid = 333;
+        final long to = 563;
+        final long elementsPerCycle = (to - from) / 3;
+
+        final TestingReaderOutput<Long> out = new TestingReaderOutput<>();
+
+        SourceReader<Long, NumberSequenceSource.NumberSequenceSplit> reader = 
createReader();
+        reader.addSplits(
+                Arrays.asList(
+                        new 
NumberSequenceSource.NumberSequenceSplit("split-1", from, mid),
+                        new 
NumberSequenceSource.NumberSequenceSplit("split-2", mid + 1, to)));
+
+        long remainingInCycle = elementsPerCycle;
+        while (reader.pollNext(out) != InputStatus.END_OF_INPUT) {
+            if (--remainingInCycle <= 0) {
+                remainingInCycle = elementsPerCycle;
+                // checkpoint
+                List<NumberSequenceSource.NumberSequenceSplit> splits = 
reader.snapshotState(1L);
+
+                // re-create and restore
+                reader = createReader();
+                if (splits.isEmpty()) {
+                    reader.notifyNoMoreSplits();
+                } else {
+                    reader.addSplits(splits);
+                }
+            }
+        }
+
+        final List<Long> result = out.getEmittedRecords();
+        validateSequence(result, from, to);
+    }
+
+    private static void validateSequence(
+            final List<Long> sequence, final long from, final long to) {
+        if (sequence.size() != to - from + 1) {
+            failSequence(sequence, from, to);
+        }
+
+        long nextExpected = from;
+        for (Long next : sequence) {
+            if (next != nextExpected++) {
+                failSequence(sequence, from, to);
+            }
+        }

Review Comment:
   You should be able to replace this with something like
   
   `assertThat(sequence).containsExactlyElementsOf(LongStream.rangeClosed(from, to).boxed().collect(Collectors.toList()));`



##########
flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/DataGeneratorSourceITCase.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceReaderFactory;
+import org.apache.flink.api.connector.source.datagen.DataGeneratorSource;
+import org.apache.flink.api.connector.source.datagen.GeneratorFunction;
+import org.apache.flink.api.connector.source.lib.util.GatedRateLimiter;
+import 
org.apache.flink.api.connector.source.lib.util.GeneratingIteratorSourceReader;
+import org.apache.flink.api.connector.source.lib.util.RateLimitedSourceReader;
+import org.apache.flink.api.connector.source.lib.util.RateLimiter;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
+import static java.util.stream.Collectors.summingInt;
+import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+
+/** An integration test for {@code DataGeneratorSource}. */
+public class DataGeneratorSourceITCase extends TestLogger {
+
+    private static final int PARALLELISM = 4;
+
+    @RegisterExtension
+    private static final MiniClusterExtension miniClusterExtension =
+            new MiniClusterExtension(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(PARALLELISM)
+                            .build());
+
+    // ------------------------------------------------------------------------
+
+    @Test
+    @DisplayName("Combined results of parallel source readers produce the 
expected sequence.")
+    public void testParallelSourceExecution() throws Exception {
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(PARALLELISM);
+
+        final DataStream<Long> stream = getGeneratorSourceStream(index -> 
index, env, 1_000L);
+
+        final List<Long> result = stream.executeAndCollect(10000);
+
+        assertThat(result).containsExactlyInAnyOrderElementsOf(range(0, 999));
+    }
+
+    @Test
+    @DisplayName("Generator function can be instantiated as an anonymous 
class.")
+    public void testParallelSourceExecutionWithAnonymousClass() throws 
Exception {
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(PARALLELISM);
+
+        GeneratorFunction<Long, Long> generatorFunction =
+                new GeneratorFunction<Long, Long>() {
+
+                    @Override
+                    public Long map(Long value) {
+                        return value;
+                    }
+                };
+
+        final DataStream<Long> stream = 
getGeneratorSourceStream(generatorFunction, env, 1_000L);
+
+        final List<Long> result = stream.executeAndCollect(10000);
+
+        assertThat(result).containsExactlyInAnyOrderElementsOf(range(0, 999));
+    }
+
+    @Test
+    @DisplayName("Exceptions from the generator function are not 'swallowed'.")

Review Comment:
   Man am I not a fan of this; I don't see immediately which test failed but 
first have to match it against the display name.
   
   Why not just name it `testExceptionFromGeneratorFunctionNotSwallowed`?



##########
flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/DataGeneratorSourceITCase.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceReaderFactory;
+import org.apache.flink.api.connector.source.datagen.DataGeneratorSource;
+import org.apache.flink.api.connector.source.datagen.GeneratorFunction;
+import org.apache.flink.api.connector.source.lib.util.GatedRateLimiter;
+import 
org.apache.flink.api.connector.source.lib.util.GeneratingIteratorSourceReader;
+import org.apache.flink.api.connector.source.lib.util.RateLimitedSourceReader;
+import org.apache.flink.api.connector.source.lib.util.RateLimiter;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
+import static java.util.stream.Collectors.summingInt;
+import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+
+/** An integration test for {@code DataGeneratorSource}. */
+public class DataGeneratorSourceITCase extends TestLogger {
+
+    private static final int PARALLELISM = 4;
+
+    @RegisterExtension
+    private static final MiniClusterExtension miniClusterExtension =
+            new MiniClusterExtension(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(PARALLELISM)
+                            .build());
+
+    // ------------------------------------------------------------------------
+
+    @Test
+    @DisplayName("Combined results of parallel source readers produce the 
expected sequence.")
+    public void testParallelSourceExecution() throws Exception {
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(PARALLELISM);
+
+        final DataStream<Long> stream = getGeneratorSourceStream(index -> 
index, env, 1_000L);
+
+        final List<Long> result = stream.executeAndCollect(10000);
+
+        assertThat(result).containsExactlyInAnyOrderElementsOf(range(0, 999));
+    }
+
+    @Test
+    @DisplayName("Generator function can be instantiated as an anonymous 
class.")
+    public void testParallelSourceExecutionWithAnonymousClass() throws 
Exception {
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(PARALLELISM);
+
+        GeneratorFunction<Long, Long> generatorFunction =
+                new GeneratorFunction<Long, Long>() {
+
+                    @Override
+                    public Long map(Long value) {
+                        return value;
+                    }
+                };
+
+        final DataStream<Long> stream = 
getGeneratorSourceStream(generatorFunction, env, 1_000L);
+
+        final List<Long> result = stream.executeAndCollect(10000);
+
+        assertThat(result).containsExactlyInAnyOrderElementsOf(range(0, 999));
+    }
+
+    @Test
+    @DisplayName("Exceptions from the generator function are not 'swallowed'.")
+    public void testFailingGeneratorFunction() throws Exception {
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(PARALLELISM);
+
+        GeneratorFunction<Long, Long> generatorFunction =
+                value -> {
+                    throw new Exception("boom");
+                };
+
+        final DataStream<Long> stream = 
getGeneratorSourceStream(generatorFunction, env, 1_000L);
+
+        assertThatThrownBy(
+                        () -> {
+                            stream.executeAndCollect(10000);
+                        })
+                .satisfies(anyCauseMatches("exception on this input:"))
+                .satisfies(anyCauseMatches("boom"));
+    }
+
+    @Test
+    @DisplayName("Exceptions from the generator function initialization are 
not 'swallowed'.")
+    public void testFailingGeneratorFunctionInitialization() throws Exception {

Review Comment:
   This one frequently fails locally.
   
   ```
   Expecting throwable message:
     "Task is not running, but in state FAILED"
   to contain:
     "Failed to open"
   but did not.
   
   Throwable that failed the check:
   
   org.apache.flink.runtime.operators.coordination.TaskNotRunningException: 
Task is not running, but in state FAILED
        at 
org.apache.flink.runtime.taskmanager.Task.deliverOperatorEvent(Task.java:1482)
   ```



##########
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/RateLimiter.java:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib.util;
+
+import java.io.Serializable;
+import java.util.concurrent.CompletionStage;
+
+/** The interface to rate limit execution of methods. */
+public interface RateLimiter extends Serializable {

Review Comment:
   annotate with `@NotThreadSafe`



##########
flink-core/src/test/java/org/apache/flink/api/connector/source/lib/DataGeneratorSourceTest.java:
##########
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.datagen.DataGeneratorSource;
+import org.apache.flink.api.connector.source.datagen.GeneratorFunction;
+import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.mock.Whitebox;
+import org.apache.flink.util.SimpleUserCodeClassLoader;
+import org.apache.flink.util.UserCodeClassLoader;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Queue;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for the {@link DataGeneratorSource}. */
+public class DataGeneratorSourceTest {
+
+    @Test
+    @DisplayName("Correctly restores SplitEnumerator from a snapshot.")
+    public void testRestoreEnumerator() throws Exception {
+        final GeneratorFunction<Long, Long> generatorFunctionStateless = index 
-> index;
+        final DataGeneratorSource<Long> dataGeneratorSource =
+                new DataGeneratorSource<>(generatorFunctionStateless, 100, 
Types.LONG);
+
+        final int parallelism = 2;
+        final 
MockSplitEnumeratorContext<NumberSequenceSource.NumberSequenceSplit> context =
+                new MockSplitEnumeratorContext<>(parallelism);
+
+        SplitEnumerator<
+                        NumberSequenceSource.NumberSequenceSplit,
+                        Collection<NumberSequenceSource.NumberSequenceSplit>>
+                enumerator = dataGeneratorSource.createEnumerator(context);
+
+        // start() is not strictly necessary in the current implementation, 
but should logically be
+        // executed in this order (protect against any breaking changes in the 
start() method).
+        enumerator.start();
+
+        Collection<NumberSequenceSource.NumberSequenceSplit> enumeratorState =
+                enumerator.snapshotState(0);
+
+        @SuppressWarnings("unchecked")
+        final Queue<NumberSequenceSource.NumberSequenceSplit> splits =
+                (Queue<NumberSequenceSource.NumberSequenceSplit>)
+                        Whitebox.getInternalState(enumerator, 
"remainingSplits");
+
+        assertThat(splits).hasSize(parallelism);
+
+        enumerator = dataGeneratorSource.restoreEnumerator(context, 
enumeratorState);
+
+        @SuppressWarnings("unchecked")
+        final Queue<NumberSequenceSource.NumberSequenceSplit> restoredSplits =
+                (Queue<NumberSequenceSource.NumberSequenceSplit>)
+                        Whitebox.getInternalState(enumerator, 
"remainingSplits");
+
+        assertThat(restoredSplits).hasSize(enumeratorState.size());
+    }
+
+    @Test
+    @DisplayName("Uses the underlying NumberSequenceSource correctly for 
checkpointing.")
+    public void testReaderCheckpoints() throws Exception {
+        final long from = 177;
+        final long mid = 333;
+        final long to = 563;
+        final long elementsPerCycle = (to - from) / 3;
+
+        final TestingReaderOutput<Long> out = new TestingReaderOutput<>();
+
+        SourceReader<Long, NumberSequenceSource.NumberSequenceSplit> reader = 
createReader();
+        reader.addSplits(
+                Arrays.asList(
+                        new 
NumberSequenceSource.NumberSequenceSplit("split-1", from, mid),
+                        new 
NumberSequenceSource.NumberSequenceSplit("split-2", mid + 1, to)));
+
+        long remainingInCycle = elementsPerCycle;
+        while (reader.pollNext(out) != InputStatus.END_OF_INPUT) {
+            if (--remainingInCycle <= 0) {
+                remainingInCycle = elementsPerCycle;
+                // checkpoint
+                List<NumberSequenceSource.NumberSequenceSplit> splits = 
reader.snapshotState(1L);
+
+                // re-create and restore
+                reader = createReader();
+                if (splits.isEmpty()) {
+                    reader.notifyNoMoreSplits();
+                } else {
+                    reader.addSplits(splits);
+                }
+            }
+        }
+
+        final List<Long> result = out.getEmittedRecords();
+        validateSequence(result, from, to);
+    }
+
+    private static void validateSequence(
+            final List<Long> sequence, final long from, final long to) {
+        if (sequence.size() != to - from + 1) {
+            failSequence(sequence, from, to);
+        }
+
+        long nextExpected = from;
+        for (Long next : sequence) {
+            if (next != nextExpected++) {
+                failSequence(sequence, from, to);
+            }
+        }
+    }
+
+    private static void failSequence(final List<Long> sequence, final long 
from, final long to) {
+        Assertions.fail(
+                String.format(
+                        "Expected: A sequence [%d, %d], but found: sequence 
(size %d) : %s",
+                        from, to, sequence.size(), sequence));
+    }
+
+    private static SourceReader<Long, 
NumberSequenceSource.NumberSequenceSplit> createReader()
+            throws Exception {
+        // the arguments passed in the source constructor matter only to the 
enumerator
+        GeneratorFunction<Long, Long> generatorFunctionStateless = index -> 
index;
+        DataGeneratorSource<Long> dataGeneratorSource =
+                new DataGeneratorSource<>(generatorFunctionStateless, 
Long.MAX_VALUE, Types.LONG);
+
+        return dataGeneratorSource.createReader(new DummyReaderContext());
+    }
+
+    // ------------------------------------------------------------------------
+    //  test utils / mocks
+    //
+    //  the "flink-connector-test-utils module has proper mocks and utils,
+    //  but cannot be used here, because it would create a cyclic dependency.
+    // ------------------------------------------------------------------------
+
+    private static final class DummyReaderContext implements 
SourceReaderContext {
+
+        @Override
+        public SourceReaderMetricGroup metricGroup() {
+            return UnregisteredMetricsGroup.createSourceReaderMetricGroup();
+        }
+
+        @Override
+        public Configuration getConfiguration() {
+            return new Configuration();
+        }
+
+        @Override
+        public String getLocalHostName() {
+            return "localhost";
+        }
+
+        @Override
+        public int getIndexOfSubtask() {
+            return 0;
+        }
+
+        @Override
+        public void sendSplitRequest() {}
+
+        @Override
+        public void sendSourceEventToCoordinator(SourceEvent sourceEvent) {}
+
+        @Override
+        public UserCodeClassLoader getUserCodeClassLoader() {
+            return 
SimpleUserCodeClassLoader.create(getClass().getClassLoader());
+        }
+
+        @Override
+        public int currentParallelism() {
+            return 1;
+        }
+    }
+
+    private static final class TestingReaderOutput<E> implements 
ReaderOutput<E> {

Review Comment:
   I guess we still wouldn't be able to use the connector-test-utils though.
   
   Maybe we should have a connector-test-utils without a flink-streaming-java 
dependency...



##########
flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/util/RateLimitedSourceReaderITCase.java:
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib.util;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceReaderFactory;
+import org.apache.flink.api.connector.source.datagen.DataGeneratorSource;
+import org.apache.flink.api.connector.source.datagen.GeneratorFunction;
+import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** An integration test for rate limiting built into the DataGeneratorSource. 
*/
+public class RateLimitedSourceReaderITCase extends TestLogger {

Review Comment:
   We should also have some unit tests for the rate limiter, IMO. At the very 
least something like the one below (which, FYI, fails with the current version 
in the PR).
   
   ```
   class GatedRateLimiterTest {
       @Test
       void testCapacityNotExceededOnCheckpoint() {
           int capacityPerCycle = 5;
   
           final GatedRateLimiter gatedRateLimiter = new 
GatedRateLimiter(capacityPerCycle);
           for (int x = 0; x < capacityPerCycle; x++) {
               assertThat(gatedRateLimiter.acquire()).isCompleted();
           }
   
           CompletionStage<Void> postInitialBatch = gatedRateLimiter.acquire();
           assertThat(postInitialBatch).isNotCompleted();
   
           gatedRateLimiter.notifyCheckpointComplete(0);
   
           assertThat(postInitialBatch).isCompleted();
           for (int x = 0; x < capacityPerCycle - 1; x++) {
               assertThat(gatedRateLimiter.acquire()).isCompleted();
           }
   
           CompletionStage<Void> postCheckpoint = gatedRateLimiter.acquire();
           assertThat(postCheckpoint).isNotCompleted();
       }
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to