zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r991346364


##########
flink-connectors/flink-connector-datagen/pom.xml:
##########
@@ -0,0 +1,79 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~       http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd">
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-connectors</artifactId>
+               <version>1.17-SNAPSHOT</version>
+       </parent>
+
+       <artifactId>flink-connector-datagen</artifactId>
+       <name>Flink : Connectors : Datagen</name>
+
+       <packaging>jar</packaging>
+
+       <!-- Allow users to pass custom connector versions -->

Review Comment:
   ```suggestion
   ```



##########
flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/DataGeneratorSource.java:
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.datagen.source;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceReaderFactory;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
+import 
org.apache.flink.api.connector.source.lib.NumberSequenceSource.NumberSequenceSplit;
+import 
org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.util.Collection;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A data source that produces N data points in parallel. This source is 
useful for testing and for
+ * cases that just need a stream of N events of any kind.
+ *
+ * <p>The source splits the sequence into as many parallel sub-sequences as 
there are parallel
+ * source readers.
+ *
+ * <p>Users can supply a {@code GeneratorFunction} for mapping the 
(sub-)sequences of Long values
+ * into the generated events. For instance, the following code will produce 
the sequence of
+ * ["Number: 0", "Number: 2", ... , "Number: 999"] elements.
+ *
+ * <pre>{@code
+ * GeneratorFunction<Long, String> generatorFunction = index -> "Number: " + 
index;
+ *
+ * DataGeneratorSource<String> source =
+ *         new DataGeneratorSource<>(generatorFunction, 1000, Types.STRING);
+ *
+ * DataStreamSource<String> stream =
+ *         env.fromSource(source,
+ *         WatermarkStrategy.noWatermarks(),
+ *         "Generator Source");
+ * }</pre>
+ *
+ * <p>The order of elements depends on the parallelism. Each sub-sequence will 
be produced in order.
+ * Consequently, if the parallelism is limited to one, this will produce one 
sequence in order from
+ * "Number: 0" to "Number: 999".
+ *
+ * <p>Note that this approach also makes it possible to produce deterministic 
watermarks at the
+ * source based on the generated events and a custom {@code WatermarkStrategy}.
+ *
+ * <p>This source has built-in support for rate limiting. The following code 
will produce an
+ * effectively unbounded (Long.MAX_VALUE from practical perspective will never 
be reached) stream of
+ * Long values at the overall source rate (across all source subtasks) of 100 
events per second.
+ *
+ * <pre>{@code
+ * GeneratorFunction<Long, Long> generatorFunction = index -> index;
+ *
+ * DataGeneratorSource<String> source =
+ *         new DataGeneratorSource<>(
+ *              generatorFunctionStateless,
+ *              Long.MAX_VALUE,
+ *              RateLimiterStrategy.perSecond(100),
+ *              Types.STRING);
+ * }</pre>
+ *
+ * <p>This source is always bounded. For very long sequences (for example when 
the {@code count} is
+ * set to Long.MAX_VALUE), users may want to consider executing the 
application in a streaming
+ * manner, because, despite the fact that the produced stream is bounded, the 
end bound is pretty
+ * far away.
+ */
+@Experimental
+public class DataGeneratorSource<OUT>
+        implements Source<OUT, NumberSequenceSplit, 
Collection<NumberSequenceSplit>>,
+                ResultTypeQueryable<OUT> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final SourceReaderFactory<OUT, NumberSequenceSplit> 
sourceReaderFactory;
+    private final TypeInformation<OUT> typeInfo;
+
+    private final NumberSequenceSource numberSource;
+
+    /**
+     * Instantiates a new {@code DataGeneratorSource}.
+     *
+     * @param generatorFunction The {@code GeneratorFunction} function.
+     * @param count The number of generated data points.
+     * @param typeInfo The type of the produced data points.
+     */
+    public DataGeneratorSource(
+            GeneratorFunction<Long, OUT> generatorFunction,
+            long count,
+            TypeInformation<OUT> typeInfo) {
+        this(generatorFunction, count, RateLimiterStrategy.noOp(), typeInfo);
+    }
+
+    /**
+     * Instantiates a new {@code DataGeneratorSource}.
+     *
+     * @param generatorFunction The {@code GeneratorFunction} function.
+     * @param count The number of generated data points.
+     * @param rateLimiterStrategy The strategy for rate limiting.
+     * @param typeInfo The type of the produced data points.
+     */
+    public DataGeneratorSource(
+            GeneratorFunction<Long, OUT> generatorFunction,
+            long count,
+            RateLimiterStrategy rateLimiterStrategy,
+            TypeInformation<OUT> typeInfo) {
+        this(
+                new GeneratorSourceReaderFactory<>(generatorFunction, 
rateLimiterStrategy),
+                count,
+                typeInfo);
+        ClosureCleaner.clean(
+                generatorFunction, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);

Review Comment:
   Either the closure cleaner call in the private constructor below is 
sufficient to cover the generator function (AND rateLimiterStrategy) or we also 
need to clean the rate limiter strategy.



##########
flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/GeneratorSourceReaderFactory.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.datagen.source;
+
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceReaderFactory;
+import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
+import 
org.apache.flink.api.connector.source.util.ratelimit.RateLimitedSourceReader;
+import org.apache.flink.api.connector.source.util.ratelimit.RateLimiter;
+import 
org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A factory for instantiating source readers that produce elements by 
applying a user-supplied
+ * {@link GeneratorFunction}.
+ *
+ * @param <OUT> The type of the output elements.
+ */
+public class GeneratorSourceReaderFactory<OUT>
+        implements SourceReaderFactory<OUT, 
NumberSequenceSource.NumberSequenceSplit> {
+
+    private final GeneratorFunction<Long, OUT> generatorFunction;
+    private final RateLimiterStrategy rateLimiterStrategy;
+
+    /**
+     * Instantiates a new {@code GeneratorSourceReaderFactory}.
+     *
+     * @param generatorFunction The generator function.
+     * @param rateLimiterStrategy The rate limiter strategy.
+     */
+    public GeneratorSourceReaderFactory(
+            GeneratorFunction<Long, OUT> generatorFunction,
+            RateLimiterStrategy rateLimiterStrategy) {
+        this.generatorFunction = checkNotNull(generatorFunction);
+        this.rateLimiterStrategy = rateLimiterStrategy;

Review Comment:
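   Missing null check: `rateLimiterStrategy` should be wrapped in `checkNotNull`, like `generatorFunction` above.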



##########
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/datagen/DataGenerator.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.datagen;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.Types;
+import 
org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.connector.datagen.source.GeneratorFunction;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/** A simple example of generating data with {@link DataGeneratorSource}. */

Review Comment:
   ```suggestion
   /** An example for generating data with a {@link DataGeneratorSource}. */
   ```



##########
flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimiter.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.util.ratelimit;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.Serializable;
+import java.util.concurrent.CompletionStage;
+
+/** The interface to rate limit execution of methods. */
+@NotThreadSafe
+public interface RateLimiter extends Serializable {

Review Comment:
   I think these no longer have to be serializable since the strategy is IIRC 
only called when the task was already sent to the TM.



##########
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java:
##########
@@ -52,128 +40,19 @@
 @Public
 public class IteratorSourceReader<
                 E, IterT extends Iterator<E>, SplitT extends 
IteratorSourceSplit<E, IterT>>
-        implements SourceReader<E, SplitT> {
-
-    /** The context for this reader, to communicate with the enumerator. */
-    private final SourceReaderContext context;
-
-    /** The availability future. This reader is available as soon as a split 
is assigned. */
-    private CompletableFuture<Void> availability;
-
-    /**
-     * The iterator producing data. Non-null after a split has been assigned. 
This field is null or
-     * non-null always together with the {@link #currentSplit} field.
-     */
-    @Nullable private IterT iterator;
-
-    /**
-     * The split whose data we return. Non-null after a split has been 
assigned. This field is null
-     * or non-null always together with the {@link #iterator} field.
-     */
-    @Nullable private SplitT currentSplit;
-
-    /** The remaining splits that were assigned but not yet processed. */
-    private final Queue<SplitT> remainingSplits;
-
-    private boolean noMoreSplits;
+        extends IteratorSourceReaderBase<E, E, IterT, SplitT> {
 
     public IteratorSourceReader(SourceReaderContext context) {
-        this.context = checkNotNull(context);
-        this.availability = new CompletableFuture<>();
-        this.remainingSplits = new ArrayDeque<>();
-    }
-
-    // ------------------------------------------------------------------------
-
-    @Override
-    public void start() {
-        // request a split if we don't have one
-        if (remainingSplits.isEmpty()) {
-            context.sendSplitRequest();
-        }
-    }
-
-    @Override
-    public InputStatus pollNext(ReaderOutput<E> output) {
-        if (iterator != null) {
-            if (iterator.hasNext()) {
-                output.collect(iterator.next());
-                return InputStatus.MORE_AVAILABLE;
-            } else {
-                finishSplit();
-            }
-        }
-
-        return tryMoveToNextSplit();
-    }
-
-    private void finishSplit() {
-        iterator = null;
-        currentSplit = null;
-
-        // request another split if no other is left
-        // we do this only here in the finishSplit part to avoid requesting a 
split
-        // whenever the reader is polled and doesn't currently have a split
-        if (remainingSplits.isEmpty() && !noMoreSplits) {
-            context.sendSplitRequest();
-        }
-    }
-
-    private InputStatus tryMoveToNextSplit() {
-        currentSplit = remainingSplits.poll();
-        if (currentSplit != null) {
-            iterator = currentSplit.getIterator();
-            return InputStatus.MORE_AVAILABLE;
-        } else if (noMoreSplits) {
-            return InputStatus.END_OF_INPUT;
-        } else {
-            // ensure we are not called in a loop by resetting the 
availability future
-            if (availability.isDone()) {
-                availability = new CompletableFuture<>();
-            }
-
-            return InputStatus.NOTHING_AVAILABLE;
-        }
+        super(context);
     }
 
     @Override
-    public CompletableFuture<Void> isAvailable() {
-        return availability;
-    }
+    protected void start(SourceReaderContext context) {}

Review Comment:
   Should this really be in here?



##########
flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimiterStrategy.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.util.ratelimit;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * The {@code RateLimiterStrategy} is a convenience class that wraps 
initialization of various kinds
+ * of {@link RateLimiter}s.
+ */
+@Experimental
+public interface RateLimiterStrategy extends Serializable {
+
+    /**
+     * Creates a {@link RateLimiter} that lets records through with rate 
proportional to the
+     * parallelism. When N=parallelism rate limiters are utilized, their 
cumulative rate will not
+     * exceed the rate limit configured for the strategy.

Review Comment:
   ```suggestion
        * parallelism. This method will be called once per source subtask. The 
cumulative rate over all rate limiters for a source must not exceed the rate 
limit configured for the strategy.
   ```



##########
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/datagen/DataGeneratorPerCheckpoint.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.datagen;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.Types;
+import 
org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.connector.datagen.source.GeneratorFunction;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/** TODO: remove after consensus in review. */

Review Comment:
   Do you still want to remove this example?



##########
flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/GeneratorSourceReaderFactory.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.datagen.source;
+
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceReaderFactory;
+import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
+import 
org.apache.flink.api.connector.source.util.ratelimit.RateLimitedSourceReader;
+import org.apache.flink.api.connector.source.util.ratelimit.RateLimiter;
+import 
org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A factory for instantiating source readers that produce elements by 
applying a user-supplied
+ * {@link GeneratorFunction}.
+ *
+ * @param <OUT> The type of the output elements.
+ */
+public class GeneratorSourceReaderFactory<OUT>

Review Comment:
   Let's explicitly mark this as `@Internal`.



##########
flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/NoOpRateLimiter.java:
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.util.ratelimit;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+
+/** A convenience implementation of {@link RateLimiter} that does not throttle 
requests. */
+public class NoOpRateLimiter implements RateLimiter {
+
+    @Override
+    public CompletionStage<Void> acquire() {
+        return CompletableFuture.completedFuture(null);

Review Comment:
   ```suggestion
           return FutureUtils.completedVoidFuture();
   ```
   (a bit more efficient as we (probably?) don't need to create a separate 
object every time)



##########
flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/DataGeneratorSource.java:
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.datagen.source;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceReaderFactory;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
+import 
org.apache.flink.api.connector.source.lib.NumberSequenceSource.NumberSequenceSplit;
+import 
org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.util.Collection;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A data source that produces N data points in parallel. This source is 
useful for testing and for
+ * cases that just need a stream of N events of any kind.
+ *
+ * <p>The source splits the sequence into as many parallel sub-sequences as 
there are parallel
+ * source readers.
+ *
+ * <p>Users can supply a {@code GeneratorFunction} for mapping the 
(sub-)sequences of Long values
+ * into the generated events. For instance, the following code will produce 
the sequence of
+ * ["Number: 0", "Number: 2", ... , "Number: 999"] elements.
+ *
+ * <pre>{@code
+ * GeneratorFunction<Long, String> generatorFunction = index -> "Number: " + 
index;
+ *
+ * DataGeneratorSource<String> source =
+ *         new DataGeneratorSource<>(generatorFunction, 1000, Types.STRING);
+ *
+ * DataStreamSource<String> stream =
+ *         env.fromSource(source,
+ *         WatermarkStrategy.noWatermarks(),
+ *         "Generator Source");
+ * }</pre>
+ *
+ * <p>The order of elements depends on the parallelism. Each sub-sequence will 
be produced in order.
+ * Consequently, if the parallelism is limited to one, this will produce one 
sequence in order from
+ * "Number: 0" to "Number: 999".
+ *
+ * <p>Note that this approach also makes it possible to produce deterministic 
watermarks at the
+ * source based on the generated events and a custom {@code WatermarkStrategy}.
+ *
+ * <p>This source has built-in support for rate limiting. The following code 
will produce an
+ * effectively unbounded (Long.MAX_VALUE from practical perspective will never 
be reached) stream of
+ * Long values at the overall source rate (across all source subtasks) of 100 
events per second.
+ *
+ * <pre>{@code
+ * GeneratorFunction<Long, Long> generatorFunction = index -> index;
+ *
+ * DataGeneratorSource<String> source =
+ *         new DataGeneratorSource<>(
+ *              generatorFunctionStateless,
+ *              Long.MAX_VALUE,
+ *              RateLimiterStrategy.perSecond(100),
+ *              Types.STRING);
+ * }</pre>
+ *
+ * <p>This source is always bounded. For very long sequences (for example when 
the {@code count} is
+ * set to Long.MAX_VALUE), users may want to consider executing the 
application in a streaming
+ * manner, because, despite the fact that the produced stream is bounded, the 
end bound is pretty
+ * far away.
+ */
+@Experimental
+public class DataGeneratorSource<OUT>
+        implements Source<OUT, NumberSequenceSplit, 
Collection<NumberSequenceSplit>>,
+                ResultTypeQueryable<OUT> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final SourceReaderFactory<OUT, NumberSequenceSplit> 
sourceReaderFactory;
+    private final TypeInformation<OUT> typeInfo;
+
+    private final NumberSequenceSource numberSource;
+
+    /**
+     * Instantiates a new {@code DataGeneratorSource}.
+     *
+     * @param generatorFunction The {@code GeneratorFunction} function.
+     * @param count The number of generated data points.
+     * @param typeInfo The type of the produced data points.
+     */
+    public DataGeneratorSource(
+            GeneratorFunction<Long, OUT> generatorFunction,
+            long count,
+            TypeInformation<OUT> typeInfo) {
+        this(generatorFunction, count, RateLimiterStrategy.noOp(), typeInfo);
+    }
+
+    /**
+     * Instantiates a new {@code DataGeneratorSource}.
+     *
+     * @param generatorFunction The {@code GeneratorFunction} function.
+     * @param count The number of generated data points.
+     * @param rateLimiterStrategy The strategy for rate limiting.
+     * @param typeInfo The type of the produced data points.
+     */
+    public DataGeneratorSource(
+            GeneratorFunction<Long, OUT> generatorFunction,
+            long count,
+            RateLimiterStrategy rateLimiterStrategy,
+            TypeInformation<OUT> typeInfo) {
+        this(
+                new GeneratorSourceReaderFactory<>(generatorFunction, 
rateLimiterStrategy),
+                count,
+                typeInfo);
+        ClosureCleaner.clean(
+                generatorFunction, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+    }
+
+    /**
+     * Instantiates a new {@code DataGeneratorSource}. This constructor allows 
users can take

Review Comment:
   Slightly outdated, since it's no longer exposed to users.



##########
flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimiterStrategy.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.util.ratelimit;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * The {@code RateLimiterStrategy} is a convenience class that wraps 
initialization of various kinds
+ * of {@link RateLimiter}s.
+ */
+@Experimental
+public interface RateLimiterStrategy extends Serializable {
+
+    /**
+     * Creates a {@link RateLimiter} that lets records through with rate 
proportional to the
+     * parallelism. When N=parallelism rate limiters are utilized, their 
cumulative rate will not
+     * exceed the rate limit configured for the strategy.
+     */
+    RateLimiter createRateLimiter(int parallelism);
+
+    /** Creates a {@code RateLimiterStrategy} that is limiting the number of 
records per second. */

Review Comment:
   Can we add a similar note as below about the rate/parallelism ratio and rounding?



##########
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java:
##########
@@ -52,128 +40,19 @@
 @Public
 public class IteratorSourceReader<
                 E, IterT extends Iterator<E>, SplitT extends 
IteratorSourceSplit<E, IterT>>
-        implements SourceReader<E, SplitT> {
-
-    /** The context for this reader, to communicate with the enumerator. */
-    private final SourceReaderContext context;
-
-    /** The availability future. This reader is available as soon as a split 
is assigned. */
-    private CompletableFuture<Void> availability;
-
-    /**
-     * The iterator producing data. Non-null after a split has been assigned. 
This field is null or
-     * non-null always together with the {@link #currentSplit} field.
-     */
-    @Nullable private IterT iterator;
-
-    /**
-     * The split whose data we return. Non-null after a split has been 
assigned. This field is null
-     * or non-null always together with the {@link #iterator} field.
-     */
-    @Nullable private SplitT currentSplit;
-
-    /** The remaining splits that were assigned but not yet processed. */
-    private final Queue<SplitT> remainingSplits;
-
-    private boolean noMoreSplits;
+        extends IteratorSourceReaderBase<E, E, IterT, SplitT> {
 
     public IteratorSourceReader(SourceReaderContext context) {
-        this.context = checkNotNull(context);
-        this.availability = new CompletableFuture<>();
-        this.remainingSplits = new ArrayDeque<>();
-    }
-
-    // ------------------------------------------------------------------------
-
-    @Override
-    public void start() {
-        // request a split if we don't have one
-        if (remainingSplits.isEmpty()) {
-            context.sendSplitRequest();
-        }
-    }
-
-    @Override
-    public InputStatus pollNext(ReaderOutput<E> output) {
-        if (iterator != null) {
-            if (iterator.hasNext()) {
-                output.collect(iterator.next());
-                return InputStatus.MORE_AVAILABLE;
-            } else {
-                finishSplit();
-            }
-        }
-
-        return tryMoveToNextSplit();
-    }
-
-    private void finishSplit() {
-        iterator = null;
-        currentSplit = null;
-
-        // request another split if no other is left
-        // we do this only here in the finishSplit part to avoid requesting a 
split
-        // whenever the reader is polled and doesn't currently have a split
-        if (remainingSplits.isEmpty() && !noMoreSplits) {
-            context.sendSplitRequest();
-        }
-    }
-
-    private InputStatus tryMoveToNextSplit() {
-        currentSplit = remainingSplits.poll();
-        if (currentSplit != null) {
-            iterator = currentSplit.getIterator();
-            return InputStatus.MORE_AVAILABLE;
-        } else if (noMoreSplits) {
-            return InputStatus.END_OF_INPUT;
-        } else {
-            // ensure we are not called in a loop by resetting the 
availability future
-            if (availability.isDone()) {
-                availability = new CompletableFuture<>();
-            }
-
-            return InputStatus.NOTHING_AVAILABLE;
-        }
+        super(context);
     }
 
     @Override
-    public CompletableFuture<Void> isAvailable() {
-        return availability;
-    }
+    protected void start(SourceReaderContext context) {}
 
-    @Override
-    public void addSplits(List<SplitT> splits) {
-        remainingSplits.addAll(splits);
-        // set availability so that pollNext is actually called
-        availability.complete(null);
-    }
-
-    @Override
-    public void notifyNoMoreSplits() {
-        noMoreSplits = true;
-        // set availability so that pollNext is actually called
-        availability.complete(null);
-    }
+    // ------------------------------------------------------------------------
 
     @Override
-    public List<SplitT> snapshotState(long checkpointId) {
-        if (currentSplit == null && remainingSplits.isEmpty()) {
-            return Collections.emptyList();
-        }
-
-        final ArrayList<SplitT> allSplits = new ArrayList<>(1 + 
remainingSplits.size());
-        if (iterator != null && iterator.hasNext()) {
-            assert currentSplit != null;
-
-            @SuppressWarnings("unchecked")
-            final SplitT inProgressSplit =
-                    (SplitT) currentSplit.getUpdatedSplitForIterator(iterator);
-            allSplits.add(inProgressSplit);
-        }
-        allSplits.addAll(remainingSplits);
-        return allSplits;
+    protected E convert(E value) {

Review Comment:
   Should this really be in here?



##########
flink-core/src/main/java/org/apache/flink/api/connector/source/util/ratelimit/RateLimiterStrategy.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.util.ratelimit;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * The {@code RateLimiterStrategy} is a convenience class that wraps 
initialization of various kinds
+ * of {@link RateLimiter}s.

Review Comment:
   It's not _really_ a _convenience_ class; that'd imply its use is optional.
   
   ```suggestion
    * A factory for {@link RateLimiter RateLimiters} which apply rate-limiting 
to a source sub-task.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to