[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-10-14 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r995657759


##
docs/content/docs/connectors/datastream/datagen.md:
##
@@ -0,0 +1,115 @@
+---
+title: DataGen
+weight: 3
+type: docs
+---
+
+
+# DataGen Connector
+
+The DataGen connector provides a `Source` implementation that allows for generating input data for
+Flink pipelines.
+It is useful when developing locally or demoing without access to external systems such as Kafka.
+The DataGen connector is built-in; no additional dependencies are required.
+
+Usage
+-
+
+The `DataGeneratorSource` produces N data points in parallel. The source splits the sequence
+into as many parallel sub-sequences as there are parallel source subtasks. It drives the data
+generation process by supplying "index" values of type Long to the user-provided
+{{< javadoc name="GeneratorFunction" file="org/apache/flink/connector/datagen/source/GeneratorFunction.html" >}}.
+
+The `GeneratorFunction` is then used for mapping the (sub-)sequences of Long values
+into the generated events of an arbitrary data type. For instance, the following code will produce the sequence of
+`["Number: 0", "Number: 1", ... , "Number: 999"]` records.
+
+```java
+GeneratorFunction<Long, String> generatorFunction = index -> "Number: " + index;
+
+DataGeneratorSource<String> source =
+        new DataGeneratorSource<>(generatorFunction, 1000, Types.STRING);
+
+DataStreamSource<String> stream =
+        env.fromSource(source,
+        WatermarkStrategy.noWatermarks(),
+        "Generator Source");
+```
+
+The order of elements depends on the parallelism. Each sub-sequence will be produced in order.
+Consequently, if the parallelism is limited to one, this will produce one sequence in order from
+`"Number: 0"` to `"Number: 999"`.
+
+`DataGeneratorSource` has built-in support for rate limiting. The following code will produce an
+effectively unbounded (`Long.MAX_VALUE` from a practical perspective will never be reached) stream of
+Long values at the overall source rate (across all source subtasks) not exceeding 100 events per second.
+
+```java
+GeneratorFunction<Long, Long> generatorFunction = index -> index;
+
+DataGeneratorSource<Long> source =
+        new DataGeneratorSource<>(
+             generatorFunction,
+             Long.MAX_VALUE,
+             RateLimiterStrategy.perSecond(100),
+             Types.LONG);
+```
+
+The source also allows for producing specific elements between the checkpoint boundaries using the

Review Comment:
   > Do we actually have such examples?
   
   I haven't seen one; but they may exist :sweat_smile: 
   
   > Is this something that could be compared with the example that Steven 
shared 
[apache/iceberg/flink/source/BoundedTestSource.java#L70](https://github.com/apache/iceberg/blob/master/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTestSource.java#L70)
 where data is emitted while holding the checkpointLock ?
   
   Exactly. There they control the _exact_ contents of each checkpoint.
   
   > It seems, though, that it should not prevent the new Source from being 
used as a replacement for the BoundedTestSource mentioned above with such 
reasonable settings, what do you think?
   
   Yes-ish; provided that the checkpoint interval is large enough, there is a 
reasonably high chance that it won't be an issue.
   
   Can the new sources even block checkpointing? I guess the only way to do 
that is by emitting multiple values in `pollNext`. But I'm not sure if there 
even is a guarantee that pollNext is called at least once between checkpoints.
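
   To make the `pollNext` point concrete, here is a minimal sketch in plain Java. It is a toy model and not Flink code: `BatchingReader` and `recordsPerCheckpoint` are hypothetical names, and the loop simply assumes that polling and checkpointing alternate on one thread. Under that assumption, a checkpoint can only fall between two `pollNext` calls, so the number of records per checkpoint is always a multiple of the batch emitted per call.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model, not Flink code: one thread alternates between polling and checkpointing. */
class PollNextCheckpointSketch {

    /** Hypothetical stand-in for a source reader that emits a fixed batch per poll. */
    static final class BatchingReader {
        private final int batchSize;
        private long nextIndex = 0;

        BatchingReader(int batchSize) {
            this.batchSize = batchSize;
        }

        /** Emits batchSize records in one call; no checkpoint can run in between them. */
        void pollNext(List<Long> output) {
            for (int i = 0; i < batchSize; i++) {
                output.add(nextIndex++);
            }
        }
    }

    /**
     * Calls pollNext totalPolls times and takes a simulated checkpoint after every
     * pollsPerCheckpoint calls. Returns how many records fell into each checkpoint.
     */
    static List<Integer> recordsPerCheckpoint(
            int batchSize, int totalPolls, int pollsPerCheckpoint) {
        BatchingReader reader = new BatchingReader(batchSize);
        List<Long> sinceLastCheckpoint = new ArrayList<>();
        List<Integer> counts = new ArrayList<>();
        for (int poll = 1; poll <= totalPolls; poll++) {
            reader.pollNext(sinceLastCheckpoint);
            if (poll % pollsPerCheckpoint == 0) {
                counts.add(sinceLastCheckpoint.size());
                sinceLastCheckpoint.clear();
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Batch of 3 records per poll, checkpoint after every 2 polls.
        System.out.println(recordsPerCheckpoint(3, 4, 2));
    }
}
```

   The sketch says nothing about whether the runtime guarantees a `pollNext` call between two checkpoints; it only shows what follows if the two never interleave.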



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-10-14 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r995726266


##
docs/content/docs/connectors/datastream/datagen.md:
##
@@ -0,0 +1,115 @@
+The source also allows for producing specific elements between the checkpoint 
boundaries using the 

Review Comment:
   > I assume that snapshotState(), notifyCheckpointComplete() and pollNext() 
are all called in the same thread?
   
   Yes, that should be the case.






[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-10-14 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r995787574


##
docs/content/docs/connectors/datastream/datagen.md:
##
@@ -0,0 +1,115 @@
+The source also allows for producing specific elements between the checkpoint 
boundaries using the 

Review Comment:
   There does not appear to be any guarantee that `pollNext` will be called 
between checkpoints.
   
   With that in mind I'd be alright with the current approach. The new 
checkpointing REST API (FLINK-27101) could be a way for users to work around 
this problem.






[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-10-14 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r995921557


##
flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderFactory.java:
##
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source;
+
+import java.io.Serializable;
+
+/**
+ * A factory for creating source reader instances.
+ *
+ * @param  The type of the output elements.
+ */
+public interface SourceReaderFactory extends Serializable {

Review Comment:
   missing `@Public`; hopefully this fixes one of the japicmp violations






[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-10-15 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r996284438


##
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReaderBase.java:
##
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib.util;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.core.io.InputStatus;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link SourceReader} that returns the values of an iterator, supplied via an {@link
+ * IteratorSourceSplit}.
+ *
+ * <p>The {@code IteratorSourceSplit} is also responsible for taking the current iterator and
+ * turning it back into a split for checkpointing.
+ *
+ * @param <E> The type of events returned by the reader.
+ * @param <IterT> The type of the iterator that produces the events. This type exists to make the
+ *     conversion between iterator and {@code IteratorSourceSplit} type safe.
+ * @param <SplitT> The concrete type of the {@code IteratorSourceSplit} that creates and converts
+ *     the iterator that produces this reader's elements.
+ */
+@Public
+public abstract class IteratorSourceReaderBase<
+                E, O, IterT extends Iterator<E>, SplitT extends IteratorSourceSplit<E, IterT>>
+        implements SourceReader<O, SplitT> {
+
+    /** The context for this reader, to communicate with the enumerator. */
+    private final SourceReaderContext context;
+
+    /** The availability future. This reader is available as soon as a split is assigned. */
+    private CompletableFuture<Void> availability;
+
+    /**
+     * The iterator producing data. Non-null after a split has been assigned. This field is null or
+     * non-null always together with the {@link #currentSplit} field.
+     */
+    @Nullable private IterT iterator;
+
+    /**
+     * The split whose data we return. Non-null after a split has been assigned. This field is null
+     * or non-null always together with the {@link #iterator} field.
+     */
+    @Nullable private SplitT currentSplit;
+
+    /** The remaining splits that were assigned but not yet processed. */
+    private final Queue<SplitT> remainingSplits;
+
+    private boolean noMoreSplits;
+
+    public IteratorSourceReaderBase(SourceReaderContext context) {
+        this.context = checkNotNull(context);
+        this.availability = new CompletableFuture<>();
+        this.remainingSplits = new ArrayDeque<>();
+    }
+
+    //
+
+    @Override
+    public final void start() {
Review Comment:
   I will remove the `final` flag because it's not source-compatible.






[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-10-15 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r996290206


##
docs/content/docs/connectors/datastream/datagen.md:
##
@@ -0,0 +1,115 @@
+The source also allows for producing specific elements between the checkpoint 
boundaries using the 

Review Comment:
   There's an `ExternallyInducedSourceReader` that can control when checkpoints 
occur.






[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-10-15 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r996303005


##
docs/content/docs/connectors/datastream/datagen.md:
##
@@ -0,0 +1,115 @@
+The source also allows for producing specific elements between the checkpoint 
boundaries using the 

Review Comment:
   I'm not sure yet. But we have some time before the merge anyhow due to the 
japicmp issue.
   
   One issue is that controlling checkpointing should be optional, but once you 
extend the `ExternallyInducedSourceReader` it's always enabled. So we can't 
just add it to the `RateLimitedSourceReader`, but rather have to wrap it based 
on some condition.
   
   Beyond that, I'm wondering if this should be part of the `RateLimiter` 
(which would then be more of a `RateController`) or a separate thing that 
provides a lower bound while the rate limiter provides an upper bound (with the 
problem that if they conflict, weird things will happen).
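
   A minimal sketch of the "wrap it based on some condition" idea, in plain Java rather than Flink code. Every name here is hypothetical: `Reader` stands in for a source reader, and `ControlsCheckpoints` is a simplified marker standing in for `ExternallyInducedSourceReader` (it does not mirror the real interface). The point is only that checkpoint control stays opt-in when it lives in a wrapper instead of the base reader.

```java
/** Toy model, not Flink code: opt into checkpoint control by wrapping, not by extending. */
class OptionalCheckpointControlSketch {

    /** Hypothetical stand-in for a source reader. */
    interface Reader {
        long pollNext();
    }

    /** Hypothetical, simplified marker for readers that decide when checkpoints happen. */
    interface ControlsCheckpoints {
        boolean shouldTriggerCheckpoint();
    }

    /** Plain reader that emits 0, 1, 2, ... and knows nothing about checkpoints. */
    static final class CountingReader implements Reader {
        private long next = 0;

        @Override
        public long pollNext() {
            return next++;
        }
    }

    /** Wrapper that delegates emission and asks for a checkpoint every N records. */
    static final class CheckpointControllingReader implements Reader, ControlsCheckpoints {
        private final Reader delegate;
        private final int recordsPerCheckpoint;
        private int emittedSinceCheckpoint = 0;

        CheckpointControllingReader(Reader delegate, int recordsPerCheckpoint) {
            this.delegate = delegate;
            this.recordsPerCheckpoint = recordsPerCheckpoint;
        }

        @Override
        public long pollNext() {
            emittedSinceCheckpoint++;
            return delegate.pollNext();
        }

        @Override
        public boolean shouldTriggerCheckpoint() {
            if (emittedSinceCheckpoint >= recordsPerCheckpoint) {
                emittedSinceCheckpoint = 0;
                return true;
            }
            return false;
        }
    }

    /** Wraps the plain reader only when checkpoint control was requested. */
    static Reader createReader(boolean controlCheckpoints, int recordsPerCheckpoint) {
        Reader plain = new CountingReader();
        return controlCheckpoints
                ? new CheckpointControllingReader(plain, recordsPerCheckpoint)
                : plain;
    }

    public static void main(String[] args) {
        Reader reader = createReader(true, 2);
        reader.pollNext();
        reader.pollNext();
        System.out.println(((ControlsCheckpoints) reader).shouldTriggerCheckpoint());
    }
}
```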






[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-10-15 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r996303463


##
docs/content/docs/connectors/datastream/datagen.md:
##
@@ -0,0 +1,115 @@
+The source also allows for producing specific elements between the checkpoint 
boundaries using the 

Review Comment:
   Urgh but an `ExternallyInducedSourceReader` has to provide the checkpoint 
IDs so that'd need checkpointing...






[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-10-15 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r996303463


##
docs/content/docs/connectors/datastream/datagen.md:
##
@@ -0,0 +1,115 @@
+The source also allows for producing specific elements between the checkpoint 
boundaries using the 

Review Comment:
   Urgh but an `ExternallyInducedSourceReader` has to provide the checkpoint 
IDs so that'd need checkpointing, so we'd also need a wrapper around the 
NumberSequenceSplits...






[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-10-15 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r996305134


##
docs/content/docs/connectors/datastream/datagen.md:
##
@@ -0,0 +1,115 @@
+The source also allows for producing specific elements between the checkpoint 
boundaries using the 

Review Comment:
   The `ExternallyInducedSourceReader` route would add so much complexity that 
I'm not sure if we should pursue it.






[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-10-15 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r996303463


##
docs/content/docs/connectors/datastream/datagen.md:
##
@@ -0,0 +1,115 @@

Review Comment:
   Urgh, but an `ExternallyInducedSourceReader` has to provide the checkpoint IDs, which we'd have to checkpoint, so we'd also need a wrapper around the `NumberSequenceSplit`s...






[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-10-17 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r997215784


##
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReaderBase.java:
##
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib.util;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.core.io.InputStatus;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link SourceReader} that returns the values of an iterator, supplied via an {@link
+ * IteratorSourceSplit}.
+ *
+ * <p>The {@code IteratorSourceSplit} is also responsible for taking the current iterator and
+ * turning it back into a split for checkpointing.
+ *
+ * @param <E> The type of events returned by the reader.
+ * @param <IterT> The type of the iterator that produces the events. This type exists to make the
+ *     conversion between iterator and {@code IteratorSourceSplit} type safe.
+ * @param <SplitT> The concrete type of the {@code IteratorSourceSplit} that creates and converts
+ *     the iterator that produces this reader's elements.
+ */
+@Public
+public abstract class IteratorSourceReaderBase<
+                E, O, IterT extends Iterator<E>, SplitT extends IteratorSourceSplit<E, IterT>>
+        implements SourceReader<O, SplitT> {
+
+    /** The context for this reader, to communicate with the enumerator. */
+    private final SourceReaderContext context;
+
+    /** The availability future. This reader is available as soon as a split is assigned. */
+    private CompletableFuture<Void> availability;
+
+    /**
+     * The iterator producing data. Non-null after a split has been assigned. This field is null or
+     * non-null always together with the {@link #currentSplit} field.
+     */
+    @Nullable private IterT iterator;
+
+    /**
+     * The split whose data we return. Non-null after a split has been assigned. This field is null
+     * or non-null always together with the {@link #iterator} field.
+     */
+    @Nullable private SplitT currentSplit;
+
+    /** The remaining splits that were assigned but not yet processed. */
+    private final Queue<SplitT> remainingSplits;
+
+    private boolean noMoreSplits;
+
+    public IteratorSourceReaderBase(SourceReaderContext context) {
+        this.context = checkNotNull(context);
+        this.availability = new CompletableFuture<>();
+        this.remainingSplits = new ArrayDeque<>();
+    }
+
+    //
+
+    @Override
+    public final void start() {

Review Comment:
   FYI, the fact that this was not detected is another bug that we found.






[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-10-18 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r997919542


##
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReaderBase.java:
##
@@ -0,0 +1,187 @@

Review Comment:
   No :)






[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-09-06 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r963448684


##
flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java:
##
@@ -63,4 +63,11 @@ public interface SourceReaderContext {
  * @see UserCodeClassLoader
  */
 UserCodeClassLoader getUserCodeClassLoader();
+
+/**
+ * Get the current parallelism of this Source.
+ *
+ * @return the parallelism of the Source.
+ */
+int currentParallelism();

Review Comment:
   This breaks source compatibility. Either justify an exclusion or add a default implementation.
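
A minimal sketch of the second option, using a simplified stand-in interface. `ReaderContext`, `LegacyContext` and `ParallelContext` are hypothetical names for illustration only and are not Flink's `SourceReaderContext`. Throwing from the default body is just one possible choice, not necessarily the right one for this PR. The point is that a `default` method lets implementations written before the method existed keep compiling.

```java
// Hypothetical, simplified stand-in for an interface that gains a new method.
interface ReaderContext {
    String hostName();

    // New method with a default body: implementations written before this
    // method existed still compile, so source compatibility is preserved.
    default int currentParallelism() {
        throw new UnsupportedOperationException("currentParallelism is not implemented");
    }
}

// An "old" implementation that knows nothing about the new method.
class LegacyContext implements ReaderContext {
    @Override
    public String hostName() {
        return "localhost";
    }
}

// A "new" implementation that overrides the default.
class ParallelContext implements ReaderContext {
    private final int parallelism;

    ParallelContext(int parallelism) {
        this.parallelism = parallelism;
    }

    @Override
    public String hostName() {
        return "localhost";
    }

    @Override
    public int currentParallelism() {
        return parallelism;
    }
}
```

With this shape, `LegacyContext` compiles unchanged, and callers only hit the exception if they actually invoke the new method on an implementation that has not been updated.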




[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-09-06 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r963611477


##
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/GuavaRateLimiter.java:
##
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib.util;
+
+import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.RateLimiter;
+
+/** An implementation of {@link RateLimiter} based on Guava's RateLimiter. */
+public class GuavaRateLimiter

Review Comment:
   You could add a new `open()` method to the `GuavaFlinkConnectorRateLimiter` that accepts an int, and add a default implementation to the existing `open()` method that calls `open(runtimeContext#getParallelism())`.
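
Roughly what I have in mind, as a sketch with stand-in types. `RuntimeInfo`, `SketchRateLimiter` and `FixedRateLimiter` are hypothetical names, not the existing Flink classes, and the even division of the global rate across subtasks is an assumption made for the example.

```java
// Hypothetical stand-in for a runtime context; only the parallelism is modeled.
interface RuntimeInfo {
    int getNumberOfParallelSubtasks();
}

// Hypothetical rate limiter contract, not Flink's actual interface.
interface SketchRateLimiter {
    // New overload: callers without a runtime context pass the parallelism directly.
    void open(int parallelism);

    // Existing entry point: a default implementation that delegates to the overload.
    default void open(RuntimeInfo runtimeInfo) {
        open(runtimeInfo.getNumberOfParallelSubtasks());
    }
}

// Simple implementation that splits a global rate evenly across subtasks.
class FixedRateLimiter implements SketchRateLimiter {
    private final double globalRatePerSecond;
    private double localRatePerSecond;

    FixedRateLimiter(double globalRatePerSecond) {
        this.globalRatePerSecond = globalRatePerSecond;
    }

    @Override
    public void open(int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        this.localRatePerSecond = globalRatePerSecond / parallelism;
    }

    double getLocalRatePerSecond() {
        return localRatePerSecond;
    }
}
```

A FLIP-27 reader could then call `open(int)` with the parallelism it gets from its own context, while existing callers keep using the context-based method.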






[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-09-06 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r963481476


##
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/GeneratorSourceReaderFactory.java:
##
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib.util;
+
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.lib.GeneratorFunction;
+import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
+import org.apache.flink.api.connector.source.lib.SourceReaderFactory;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A factory for instantiating source readers that produce elements by applying a user-supplied
+ * {@link GeneratorFunction}. This implementation also implicitly supports throttling the data rate
+ * by using a default rate limiter.
+ *
+ * @param  The type of the output elements.
+ */
+public class GeneratorSourceReaderFactory
+implements SourceReaderFactory {
+
+private final GeneratorFunction generatorFunction;
+private final double sourceRatePerSecond;
+
+/**
+ * Instantiates a new {@code GeneratorSourceReaderFactory}.
+ *
+ * @param generatorFunction The generator function.
+ * @param sourceRatePerSecond The target source rate per second. This parameter specifies the
+ * overall source rate (across all source subtasks) and does not need to account for the
+ * parallelism.

Review Comment:
   This should mention that rates <= 0 disable rate limiting.
   
   I don't quite get what `does not need to account for the parallelism` is meant to convey (just that it is a global rate and users don't have to calculate a per-subtask rate?).
   
   I'd also rename it to `maxSourceRatePerSecond`, because technically there's no guarantee that all sources reach the target rate.
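
To make the semantics concrete, here is a small sketch of how the documented behaviour could look. `RateSettings` and `perSubtaskRate` are hypothetical names, and the even split across subtasks is an assumption for illustration, not what the PR currently does.

```java
// Hypothetical helper illustrating the suggested parameter semantics.
final class RateSettings {
    private RateSettings() {}

    /**
     * Derives the per-subtask upper bound from a global maximum rate.
     *
     * @param maxSourceRatePerSecond global upper bound across all subtasks;
     *     values <= 0 disable rate limiting
     * @param parallelism number of parallel source subtasks, must be positive
     * @return the per-subtask limit, or empty if rate limiting is disabled
     */
    static java.util.OptionalDouble perSubtaskRate(double maxSourceRatePerSecond, int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        if (maxSourceRatePerSecond <= 0) {
            // Rate limiting is switched off entirely.
            return java.util.OptionalDouble.empty();
        }
        // Users specify one global number; the per-subtask share is derived here.
        return java.util.OptionalDouble.of(maxSourceRatePerSecond / parallelism);
    }
}
```

The `max` prefix matters because each subtask is only capped at its share; nothing guarantees that a subtask actually produces data that fast.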






[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-09-06 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r963654710


##
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/DataGeneratorSource.java:
##
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.lib.NumberSequenceSource.NumberSequenceSplit;
+import org.apache.flink.api.connector.source.lib.util.GeneratorSourceReaderFactory;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.util.Collection;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A data source that produces N data points in parallel. This source is useful for testing and for
+ * cases that just need a stream of N events of any kind.
+ *
+ * <p>The source splits the sequence into as many parallel sub-sequences as there are parallel
+ * source readers.
+ *
+ * <p>Users can supply a {@code GeneratorFunction} for mapping the (sub-)sequences of Long values
+ * into the generated events. For instance, the following code will produce the sequence of
+ * ["Number: 0", "Number: 1", ... , "Number: 999"] elements.
+ *
+ * <pre>{@code
+ * GeneratorFunction<Long, String> generatorFunction = index -> "Number: " + index;
+ *
+ * DataGeneratorSource<String> source =
+ *         new DataGeneratorSource<>(generatorFunction, 1000, Types.STRING);
+ *
+ * DataStreamSource<String> stream =
+ *         env.fromSource(source,
+ *         WatermarkStrategy.noWatermarks(),
+ *         "Generator Source");
+ * }</pre>
+ *
+ * <p>The order of elements depends on the parallelism. Each sub-sequence will be produced in order.
+ * Consequently, if the parallelism is limited to one, this will produce one sequence in order from
+ * "Number: 0" to "Number: 999".
+ *
+ * <p>Note that this approach also makes it possible to produce deterministic watermarks at the
+ * source based on the generated events and a custom {@code WatermarkStrategy}.
+ *
+ * <p>This source has built-in support for rate limiting. The following code will produce an
+ * effectively unbounded (Long.MAX_VALUE from a practical perspective will never be reached) stream of
+ * Long values at the overall source rate (across all source subtasks) of 100 events per second.
+ *
+ * <pre>{@code
+ * GeneratorFunction<Long, Long> generatorFunction = index -> index;
+ *
+ * DataGeneratorSource<Long> source =
+ *         new DataGeneratorSource<>(generatorFunction, Long.MAX_VALUE, 100, Types.LONG);
+ * }</pre>
+ *
+ * <p>For more sophisticated use cases, users can take full control of the low-level data generation
+ * details by supplying a custom {@code SourceReaderFactory}. The instantiated {@code SourceReader}s
+ * are expected to produce data based on processing {@code NumberSequenceSplit}s. A customized
+ * generator could, for instance, synchronize the data release process with checkpointing by making
+ * use of ({@link SourceReader#notifyCheckpointComplete(long)}). Such functionality could be
+ * helpful, for instance, for testing sinks that are expected to create specific metadata upon the
+ * arrival of a checkpoint barrier and other similar use cases.
+ *
+ * <p>This source is always bounded. For very long sequences (for example when the {@code count} is
+ * set to Long.MAX_VALUE), users may want to consider executing the application in a streaming
+ * manner, because, despite the fact that the produced stream is bounded, 

[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-09-06 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r963666133


##
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/DataGeneratorSource.java:
##
@@ -0,0 +1,211 @@

[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-09-06 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r963729568


##
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/RateLimitedSourceReader.java:
##
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib.util;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.core.io.InputStatus;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Wraps the actual {@link SourceReader} and rate limits its data emission. */
+@Experimental
+public class RateLimitedSourceReader
+implements SourceReader {
+
+private final SourceReader sourceReader;
+private final RateLimiter rateLimiter;
+
+/**
+ * Instantiates a new rate-limited source reader.
+ *
+ * @param sourceReader The actual source reader.
+ * @param rateLimiter The rate limiter.
+ */
+public RateLimitedSourceReader(SourceReader sourceReader, 
RateLimiter rateLimiter) {
+checkNotNull(sourceReader);
+checkNotNull(rateLimiter);
+this.sourceReader = sourceReader;
+this.rateLimiter = rateLimiter;
+}
+
+// 
+
+@Override
+public void start() {
+sourceReader.start();
+}
+
+@Override
+public InputStatus pollNext(ReaderOutput output) throws Exception {
+rateLimiter.acquire();

Review Comment:
   Or we could just implement the rate-limiting in a non-blocking fashion.
   
   Add a (single-threaded) Executor to the rate limiter that does the acquire 
call against guava, and return a future to the source reader that is completed 
once the permits were acquired.
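
A minimal sketch of the non-blocking idea described above, assuming a blocking limiter that exposes a single `acquire()` call. `BlockingLimiter` and `NonBlockingRateLimiter` are hypothetical names used only for illustration (the former stands in for something like Guava's `RateLimiter`); this is not the code from the PR or from the linked commit.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical stand-in for a blocking limiter such as Guava's RateLimiter. */
interface BlockingLimiter {
    void acquire() throws InterruptedException;
}

/** Hypothetical wrapper that moves the blocking acquire() onto its own thread. */
final class NonBlockingRateLimiter implements AutoCloseable {

    private final BlockingLimiter delegate;
    private final ExecutorService executor;

    NonBlockingRateLimiter(BlockingLimiter delegate) {
        this.delegate = delegate;
        // A single daemon thread keeps permit acquisition strictly sequential.
        this.executor =
                Executors.newSingleThreadExecutor(
                        runnable -> {
                            Thread thread = new Thread(runnable, "rate-limiter");
                            thread.setDaemon(true);
                            return thread;
                        });
    }

    /** Returns a future that completes once a permit has been acquired. */
    CompletableFuture<Void> acquire() {
        return CompletableFuture.runAsync(
                () -> {
                    try {
                        delegate.acquire();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new IllegalStateException(e);
                    }
                },
                executor);
    }

    @Override
    public void close() {
        executor.shutdownNow();
    }
}
```

With a wrapper of this shape, the source reader could hand the returned future back from its availability check and emit a record only after the future has completed, so the reader's calling thread never blocks inside `pollNext`.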



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-09-06 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r963736928


##
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/RateLimitedSourceReader.java:
##
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib.util;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.core.io.InputStatus;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Wraps the actual {@link SourceReader} and rate limits its data emission. */
+@Experimental
+public class RateLimitedSourceReader
+implements SourceReader {
+
+private final SourceReader sourceReader;
+private final RateLimiter rateLimiter;
+
+/**
+ * Instantiates a new rate-limited source reader.
+ *
+ * @param sourceReader The actual source reader.
+ * @param rateLimiter The rate limiter.
+ */
+public RateLimitedSourceReader(SourceReader sourceReader, 
RateLimiter rateLimiter) {
+checkNotNull(sourceReader);
+checkNotNull(rateLimiter);
+this.sourceReader = sourceReader;
+this.rateLimiter = rateLimiter;
+}
+
+// 
+
+@Override
+public void start() {
+sourceReader.start();
+}
+
+@Override
+public InputStatus pollNext(ReaderOutput output) throws Exception {
+rateLimiter.acquire();

Review Comment:
   I can try to sketch this out tomorrow.






[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-09-06 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r963830401


##
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/DataGeneratorSource.java:
##
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import 
org.apache.flink.api.connector.source.lib.NumberSequenceSource.NumberSequenceSplit;
+import 
org.apache.flink.api.connector.source.lib.util.GeneratorSourceReaderFactory;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.util.Collection;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A data source that produces N data points in parallel. This source is 
useful for testing and for
+ * cases that just need a stream of N events of any kind.
+ *
+ * The source splits the sequence into as many parallel sub-sequences as 
there are parallel
+ * source readers.
+ *
+ * Users can supply a {@code GeneratorFunction} for mapping the (sub-)sequences of Long values
+ * into the generated events. For instance, the following code will produce the sequence of
+ * ["Number: 0", "Number: 1", ... , "Number: 999"] elements.
+ *
+ * {@code
+ * GeneratorFunction<Long, String> generatorFunction = index -> "Number: " + index;
+ *
+ * DataGeneratorSource<String> source =
+ *     new DataGeneratorSource<>(generatorFunction, 1000, Types.STRING);
+ *
+ * DataStreamSource<String> stream =
+ *     env.fromSource(source,
+ *         WatermarkStrategy.noWatermarks(),
+ *         "Generator Source");
+ * }
+ *
+ * The order of elements depends on the parallelism. Each sub-sequence will 
be produced in order.
+ * Consequently, if the parallelism is limited to one, this will produce one 
sequence in order from
+ * "Number: 0" to "Number: 999".
+ *
+ * Note that this approach also makes it possible to produce deterministic 
watermarks at the
+ * source based on the generated events and a custom {@code WatermarkStrategy}.
+ *
+ * This source has built-in support for rate limiting. The following code will produce an
+ * effectively unbounded stream of Long values (in practice, Long.MAX_VALUE will never be
+ * reached) at an overall source rate (across all source subtasks) of 100 events per second.
+ *
+ * {@code
+ * GeneratorFunction<Long, Long> generatorFunction = index -> index;
+ *
+ * DataGeneratorSource<Long> source =
+ *     new DataGeneratorSource<>(generatorFunction, Long.MAX_VALUE, 100, Types.LONG);
+ * }
+ *
+ * For more sophisticated use cases, users can take full control of the low-level data generation
+ * details by supplying a custom {@code SourceReaderFactory}. The instantiated 
{@code SourceReader}s
+ * are expected to produce data based on processing {@code 
NumberSequenceSplit}s. A customized
+ * generator could, for instance, synchronize the data release process with 
checkpointing by making
+ * use of ({@link SourceReader#notifyCheckpointComplete(long)}). Such 
functionality could be
+ * helpful, for instance, for testing sinks that are expected to create 
specific metadata upon the
+ * arrival of a checkpoint barrier and other similar use cases.
+ *
+ * This source is always bounded. For very long sequences (for example when 
the {@code count} is
+ * set to Long.MAX_VALUE), users may want to consider executing the 
application in a streaming
+ * manner, because, despite the fact that the produced stream is bounded, 

[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-09-06 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r963832198


##
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/DataGeneratorSource.java:
##
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import 
org.apache.flink.api.connector.source.lib.NumberSequenceSource.NumberSequenceSplit;
+import 
org.apache.flink.api.connector.source.lib.util.GeneratorSourceReaderFactory;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.util.Collection;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A data source that produces N data points in parallel. This source is 
useful for testing and for
+ * cases that just need a stream of N events of any kind.
+ *
+ * The source splits the sequence into as many parallel sub-sequences as 
there are parallel
+ * source readers.
+ *
+ * Users can supply a {@code GeneratorFunction} for mapping the (sub-)sequences of Long values
+ * into the generated events. For instance, the following code will produce the sequence of
+ * ["Number: 0", "Number: 1", ... , "Number: 999"] elements.
+ *
+ * {@code
+ * GeneratorFunction<Long, String> generatorFunction = index -> "Number: " + index;
+ *
+ * DataGeneratorSource<String> source =
+ *     new DataGeneratorSource<>(generatorFunction, 1000, Types.STRING);
+ *
+ * DataStreamSource<String> stream =
+ *     env.fromSource(source,
+ *         WatermarkStrategy.noWatermarks(),
+ *         "Generator Source");
+ * }
+ *
+ * The order of elements depends on the parallelism. Each sub-sequence will 
be produced in order.
+ * Consequently, if the parallelism is limited to one, this will produce one 
sequence in order from
+ * "Number: 0" to "Number: 999".
+ *
+ * Note that this approach also makes it possible to produce deterministic 
watermarks at the
+ * source based on the generated events and a custom {@code WatermarkStrategy}.
+ *
+ * This source has built-in support for rate limiting. The following code will produce an
+ * effectively unbounded stream of Long values (in practice, Long.MAX_VALUE will never be
+ * reached) at an overall source rate (across all source subtasks) of 100 events per second.
+ *
+ * {@code
+ * GeneratorFunction<Long, Long> generatorFunction = index -> index;
+ *
+ * DataGeneratorSource<Long> source =
+ *     new DataGeneratorSource<>(generatorFunction, Long.MAX_VALUE, 100, Types.LONG);
+ * }
+ *
+ * For more sophisticated use cases, users can take full control of the low-level data generation
+ * details by supplying a custom {@code SourceReaderFactory}. The instantiated 
{@code SourceReader}s
+ * are expected to produce data based on processing {@code 
NumberSequenceSplit}s. A customized
+ * generator could, for instance, synchronize the data release process with 
checkpointing by making
+ * use of ({@link SourceReader#notifyCheckpointComplete(long)}). Such 
functionality could be
+ * helpful, for instance, for testing sinks that are expected to create 
specific metadata upon the
+ * arrival of a checkpoint barrier and other similar use cases.
+ *
+ * This source is always bounded. For very long sequences (for example when 
the {@code count} is
+ * set to Long.MAX_VALUE), users may want to consider executing the 
application in a streaming
+ * manner, because, despite the fact that the produced stream is bounded, 

[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-09-06 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r963836339


##
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/SourceReaderFactory.java:
##
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib;
+
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+
+import java.io.Serializable;
+
+/**
+ * A factory for creating source reader instances.
+ *
+ * @param  The type of the output elements.
+ */
+public interface SourceReaderFactory extends 
Serializable {

Review Comment:
   yes






[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-09-07 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r964996882


##
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/RateLimitedSourceReader.java:
##
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib.util;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.core.io.InputStatus;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Wraps the actual {@link SourceReader} and rate limits its data emission. */
+@Experimental
+public class RateLimitedSourceReader
+implements SourceReader {
+
+private final SourceReader sourceReader;
+private final RateLimiter rateLimiter;
+
+/**
+ * Instantiates a new rate-limited source reader.
+ *
+ * @param sourceReader The actual source reader.
+ * @param rateLimiter The rate limiter.
+ */
+public RateLimitedSourceReader(SourceReader sourceReader, 
RateLimiter rateLimiter) {
+checkNotNull(sourceReader);
+checkNotNull(rateLimiter);
+this.sourceReader = sourceReader;
+this.rateLimiter = rateLimiter;
+}
+
+// 
+
+@Override
+public void start() {
+sourceReader.start();
+}
+
+@Override
+public InputStatus pollNext(ReaderOutput output) throws Exception {
+rateLimiter.acquire();

Review Comment:
   See 
https://github.com/zentol/flink/commit/0c2b75317d3be15ab502ae39b600f494422e3474






[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-09-07 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r963472820


##
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/GeneratingIteratorSourceReader.java:
##
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib.util;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.lib.GeneratorFunction;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.Iterator;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@code SourceReader} that takes the values of an iterator, supplied via an {@link
+ * IteratorSourceSplit}, and applies a {@link GeneratorFunction} to them to perform arbitrary
+ * transformations.
+ */
+@Experimental
+public class GeneratingIteratorSourceReader<
+                E, O, IterT extends Iterator<E>, SplitT extends IteratorSourceSplit<E, IterT>>
+        extends IteratorSourceReaderBase<E, O, IterT, SplitT> {
+
+    private final GeneratorFunction<E, O> generatorFunction;
+
+    public GeneratingIteratorSourceReader(
+            SourceReaderContext context, GeneratorFunction<E, O> generatorFunction) {
+        super(context);
+        this.generatorFunction = checkNotNull(generatorFunction);
+    }
+
+    //
+
+    @Override
+    public InputStatus pollNext(ReaderOutput<O> output) {
+        if (iterator != null) {
+            if (iterator.hasNext()) {
+                E next = iterator.next();
+                try {
+                    O mapped = generatorFunction.map(next);
+                    output.collect(mapped);
+                } catch (Exception e) {
+                    String message =
+                            String.format(
+                                    "A user-provided generator function threw an exception on this input: %s",
+                                    next.toString());
+                    throw new FlinkRuntimeException(message, e);
+                }
+                return InputStatus.MORE_AVAILABLE;

Review Comment:
   Maybe add a comment similar to the one in the SourceReaderBase that this is 
technically incorrect (because we don't actually know that there is 
another record) but cheaper than the alternative.
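
One way such a comment could read is sketched below on a reduced, single-iterator reader. `Status` and `PollSketch` are hypothetical stand-ins for Flink's `InputStatus` and the reader under review, and the comment wording is only a suggestion, not text taken from `SourceReaderBase`.

```java
import java.util.Iterator;
import java.util.function.Consumer;

/** Hypothetical stand-in for Flink's InputStatus, reduced to two values. */
enum Status {
    MORE_AVAILABLE,
    END_OF_INPUT
}

/** Simplified single-iterator reader; not the class from the PR. */
final class PollSketch<E> {

    private final Iterator<E> iterator;

    PollSketch(Iterator<E> iterator) {
        this.iterator = iterator;
    }

    Status pollNext(Consumer<E> output) {
        if (iterator.hasNext()) {
            output.accept(iterator.next());
            // Technically incorrect: we do not know that another record exists.
            // Reporting MORE_AVAILABLE avoids a second hasNext() call per record;
            // an exhausted iterator is detected on the next pollNext() call instead.
            return Status.MORE_AVAILABLE;
        }
        return Status.END_OF_INPUT;
    }
}
```

The cost of the shortcut is one extra `pollNext` call at the end of the input, which reports the true status.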






[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-09-07 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r964999081


##
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/SourceReaderFactory.java:
##
@@ -27,9 +27,17 @@
 /**
  * A factory for creating source reader instances.
  *
- * @param  The type of the output elements.
+ * @param  The type of the output elements.
  */
-public interface SourceReaderFactory extends 
Serializable {
-/** Instantiates a new {@code SourceReader} using the provided context. */
-SourceReader newSourceReader(SourceReaderContext 
readerContext);
+public interface SourceReaderFactory extends 
Serializable {

Review Comment:
   Should be moved to the same package as the `Source` interface imo.






[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-09-07 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r963654710


##
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/DataGeneratorSource.java:
##
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.lib.NumberSequenceSource.NumberSequenceSplit;
+import org.apache.flink.api.connector.source.lib.util.GeneratorSourceReaderFactory;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.util.Collection;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A data source that produces N data points in parallel. This source is useful for testing and for
+ * cases that just need a stream of N events of any kind.
+ *
+ * The source splits the sequence into as many parallel sub-sequences as there are parallel
+ * source readers.
+ *
+ * Users can supply a {@code GeneratorFunction} for mapping the (sub-)sequences of Long values
+ * into the generated events. For instance, the following code will produce the sequence of
+ * ["Number: 0", "Number: 1", ... , "Number: 999"] elements.
+ *
+ * {@code
+ * GeneratorFunction<Long, String> generatorFunction = index -> "Number: " + index;
+ *
+ * DataGeneratorSource<String> source =
+ *         new DataGeneratorSource<>(generatorFunction, 1000, Types.STRING);
+ *
+ * DataStreamSource<String> stream =
+ *         env.fromSource(source,
+ *         WatermarkStrategy.noWatermarks(),
+ *         "Generator Source");
+ * }
+ *
+ * The order of elements depends on the parallelism. Each sub-sequence will be produced in order.
+ * Consequently, if the parallelism is limited to one, this will produce one sequence in order from
+ * "Number: 0" to "Number: 999".
+ *
+ * Note that this approach also makes it possible to produce deterministic watermarks at the
+ * source based on the generated events and a custom {@code WatermarkStrategy}.
+ *
+ * This source has built-in support for rate limiting. The following code will produce an
+ * effectively unbounded (from a practical perspective, Long.MAX_VALUE will never be reached) stream of
+ * Long values at the overall source rate (across all source subtasks) of 100 events per second.
+ *
+ * {@code
+ * GeneratorFunction<Long, Long> generatorFunction = index -> index;
+ *
+ * DataGeneratorSource<Long> source =
+ *         new DataGeneratorSource<>(generatorFunction, Long.MAX_VALUE, 100, Types.LONG);
+ * }
+ *
+ * For more sophisticated use cases, users can take full control of the low-level data generation
+ * details by supplying a custom {@code SourceReaderFactory}. The instantiated {@code SourceReader}s
+ * are expected to produce data based on processing {@code NumberSequenceSplit}s. A customized
+ * generator could, for instance, synchronize the data release process with checkpointing by making
+ * use of ({@link SourceReader#notifyCheckpointComplete(long)}). Such functionality could be
+ * helpful, for instance, for testing sinks that are expected to create specific metadata upon the
+ * arrival of a checkpoint barrier and other similar use cases.
+ *
+ * This source is always bounded. For very long sequences (for example when the {@code count} is
+ * set to Long.MAX_VALUE), users may want to consider executing the application in a streaming
+ * manner, because, despite the fact that the produced stream is bounded, 
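
The Javadoc quoted above says the index sequence is split into as many sub-sequences as there are parallel readers, and that each index is then mapped to an event. The following is a minimal, self-contained sketch of that idea only. `SubSequenceSketch`, `split`, and `generate` are hypothetical names, and the even-split strategy is an assumption for illustration, not necessarily how the PR or `NumberSequenceSource` divides the range.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class SubSequenceSketch {

    /**
     * Splits the index range [0, count) into at most {@code parallelism} contiguous
     * ranges whose sizes differ by at most one. Each range is {first, last}, inclusive.
     */
    static List<long[]> split(long count, int parallelism) {
        List<long[]> ranges = new ArrayList<>();
        long base = count / parallelism;
        long remainder = count % parallelism;
        long from = 0;
        for (int i = 0; i < parallelism; i++) {
            // The first `remainder` ranges take one extra element.
            long size = base + (i < remainder ? 1 : 0);
            if (size > 0) {
                ranges.add(new long[] {from, from + size - 1});
            }
            from += size;
        }
        return ranges;
    }

    /** Maps every index of one sub-sequence to an event, in order. */
    static <T> List<T> generate(long[] range, Function<Long, T> generator) {
        List<T> events = new ArrayList<>();
        for (long index = range[0]; index <= range[1]; index++) {
            events.add(generator.apply(index));
        }
        return events;
    }

    public static void main(String[] args) {
        // Each sub-sequence is ordered; the interleaving across readers is not defined.
        for (long[] range : split(10, 3)) {
            System.out.println(generate(range, index -> "Number: " + index));
        }
    }
}
```

With a parallelism of one there is a single range, which matches the ordering guarantee the Javadoc describes.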

[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-09-07 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r965003553


##
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/DataGeneratorSource.java:
##

[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-09-07 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r965008089


##
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/DataGeneratorSource.java:
##

[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-09-08 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r965791919


##
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/DataGeneratorSource.java:
##

[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-09-08 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r965797922


##
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/RateLimitedSourceReader.java:
##
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib.util;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.core.io.InputStatus;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Wraps the actual {@link SourceReader} and rate limits its data emission. */
+@Experimental
+public class RateLimitedSourceReader
+implements SourceReader {
+
+private final SourceReader sourceReader;
+private final RateLimiter rateLimiter;
+
+/**
+ * Instantiates a new rate-limited source reader.
+ *
+ * @param sourceReader The actual source reader.
+ * @param rateLimiter The rate limiter.
+ */
+public RateLimitedSourceReader(SourceReader sourceReader, RateLimiter rateLimiter) {
+checkNotNull(sourceReader);
+checkNotNull(rateLimiter);
+this.sourceReader = sourceReader;
+this.rateLimiter = rateLimiter;
+}
+
+// 
+
+@Override
+public void start() {
+sourceReader.start();
+}
+
+@Override
+public InputStatus pollNext(ReaderOutput output) throws Exception {
+rateLimiter.acquire();

Review Comment:
   > Can't we use tryAcquire() for such implementation directly?
   
   No, that would imply hot-looping whenever we have currently hit the rate limit.
   
   > The call to sourceReader.pollNext(output) in the sketch is unrestricted
   
   That's not quite true. The contract of the `SourceReader`, as I understand it, is that if you return `NOTHING_AVAILABLE` then `isAvailable()` is called, and `pollNext()` is only called again once the returned future is complete.
   Since we never return `MORE_AVAILABLE`, `pollNext()` should never be called multiple times in a row.
   
   > The underlying sourceReader is going to keep internally calling output.collect(iterator.next()) without any limitations
   
   This is also an issue in the current PR. SourceReaders are also encouraged, per the javadocs, to not emit multiple records within a single `pollNext()` call.
   
   >  Do I get it wrong?
   
   Yes, I think so. There's no busy waiting because we either a) wait for the future to complete or b) poll values.
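
The contract described in the comment above can be illustrated with a small, self-contained sketch. It deliberately does not use the Flink classes: `Status`, `FutureRateLimiter`, `RateLimitedReader`, and `drain` are hypothetical stand-ins, and the limiter is assumed to hand out permits as futures instead of blocking. It is not the code from the PR or from the linked sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class NonBlockingRateLimitSketch {

    /** Stand-in for the reader status; only the values needed here. */
    enum Status { NOTHING_AVAILABLE, END_OF_INPUT }

    /** Hypothetical limiter that completes a future when the next permit is due. */
    static final class FutureRateLimiter implements AutoCloseable {
        private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        private final long nanosPerPermit;
        private long nextPermitAt = System.nanoTime();

        FutureRateLimiter(int permitsPerSecond) {
            this.nanosPerPermit = TimeUnit.SECONDS.toNanos(1) / permitsPerSecond;
        }

        CompletableFuture<Void> acquire() {
            long now = System.nanoTime();
            long delay = Math.max(0L, nextPermitAt - now);
            nextPermitAt = now + delay + nanosPerPermit;
            CompletableFuture<Void> permit = new CompletableFuture<>();
            timer.schedule(() -> { permit.complete(null); }, delay, TimeUnit.NANOSECONDS);
            return permit;
        }

        @Override
        public void close() {
            timer.shutdownNow();
        }
    }

    /** Emits at most one record per pollNext() call and never signals "more available". */
    static final class RateLimitedReader<T> {
        private final Iterator<T> records;
        private final FutureRateLimiter limiter;
        private CompletableFuture<Void> permit;

        RateLimitedReader(Iterator<T> records, FutureRateLimiter limiter) {
            this.records = records;
            this.limiter = limiter;
            this.permit = limiter.acquire();
        }

        Status pollNext(List<T> output) {
            if (records.hasNext() && permit.isDone()) {
                output.add(records.next());
                if (records.hasNext()) {
                    permit = limiter.acquire(); // the next record waits for a fresh permit
                }
            }
            return records.hasNext() ? Status.NOTHING_AVAILABLE : Status.END_OF_INPUT;
        }

        CompletableFuture<Void> isAvailable() {
            return permit;
        }
    }

    /** Plays the role of the runtime: poll once, then wait on the availability future. */
    static <T> List<T> drain(RateLimitedReader<T> reader) {
        List<T> output = new ArrayList<>();
        while (reader.pollNext(output) != Status.END_OF_INPUT) {
            reader.isAvailable().join(); // a real runtime would not block a thread here
        }
        return output;
    }

    static List<Long> demo() {
        try (FutureRateLimiter limiter = new FutureRateLimiter(100)) {
            return drain(new RateLimitedReader<>(Arrays.asList(0L, 1L, 2L, 3L, 4L).iterator(), limiter));
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the reader only ever answers "nothing available" or "end of input", the caller has to go through `isAvailable()` between polls, so there is neither a blocking `acquire()` inside `pollNext()` nor a hot loop around `tryAcquire()`.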






[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-09-09 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r966785231


##
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/GeneratorSourceReaderFactory.java:
##
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib.util;
+
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.lib.GeneratorFunction;
+import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
+import org.apache.flink.api.connector.source.lib.SourceReaderFactory;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A factory for instantiating source readers that produce elements by applying a user-supplied
+ * {@link GeneratorFunction}. This implementation also implicitly supports throttling the data rate
+ * by using a default rate limiter.

Review Comment:
   "default rate limiter" also sounds a bit strange; it reads like it always throttles data.






[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-09-09 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r966808144


##
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReaderBase.java:
##
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib.util;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.core.io.InputStatus;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link SourceReader} that returns the values of an iterator, supplied via an {@link
+ * IteratorSourceSplit}.
+ *
+ * The {@code IteratorSourceSplit} is also responsible for taking the current iterator and
+ * turning it back into a split for checkpointing.
+ *
+ * @param  The type of events returned by the reader.
+ * @param  The type of the iterator that produces the events. This type exists to make the
+ * conversion between iterator and {@code IteratorSourceSplit} type safe.
+ * @param  The concrete type of the {@code IteratorSourceSplit} that creates and converts
+ * the iterator that produces this reader's elements.
+ */
+@Public
+abstract class IteratorSourceReaderBase<
+E, O, IterT extends Iterator, SplitT extends IteratorSourceSplit>
+implements SourceReader {
+
+/** The context for this reader, to communicate with the enumerator. */
+protected final SourceReaderContext context;
+
+/** The availability future. This reader is available as soon as a split is assigned. */
+protected CompletableFuture availability;
+
+/**
+ * The iterator producing data. Non-null after a split has been assigned. This field is null or
+ * non-null always together with the {@link #currentSplit} field.
+ */
+@Nullable protected IterT iterator;

Review Comment:
   > Is this more about having methods like getIterator() defined in another 
interface rather than exposing protected fields directly?
   
   Not really. That's just another flavor of the same problem. (Probably a 
_better_ flavor, but still)
   
   > This is very similar to the design of the SourceReaderBase that is also 
part of the new Sink API. 
   
   I dislike that as well :)
   
   > Is not SourceReader such a well-defined interface?
   
   The SourceReader is an interface for what a `SourceReader` does. It has no 
bearing on what behavior a _sub-class of the IteratorSourceReaderBase_ should 
be able to customize.
   
   > Could you give a sketch
   
   Yes! 
https://github.com/zentol/flink/commit/aa071987c763f05420f8a0832bb556eeb46b8c6f
   
   I hope this sketch also highlights why I dislike having so much exposed to 
sub-classes; you easily end up mixing responsibilities. There's no good reason 
why the entire `pollNext()` routine was handled by the sub-class.
   
   Beyond that, here are questions I thought about when looking at it:
   * Why should a sub-class have control over the iterator?
   * Why should it be able to ignore elements, or entire splits?
   * Why should it be able to override the `availability` future (bricking the 
source when done at the wrong time)?
   
   You end up short-cutting any thought about "what custom behavior might 
someone want and how can we support that _safely_", and in the process end up 
adding a whole bunch of traps and potential for contracts to be broken.
   
   Note that in the sketch the sub-class ends up being a bit pointless, as we'd 
be better off just merging the 2 classes at this time.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure

[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-09-09 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r966808144


##
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReaderBase.java:
##
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib.util;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.core.io.InputStatus;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link SourceReader} that returns the values of an iterator, supplied via 
an {@link
+ * IteratorSourceSplit}.
+ *
+ * The {@code IteratorSourceSplit} is also responsible for taking the 
current iterator and
+ * turning it back into a split for checkpointing.
+ *
+ * @param  The type of events returned by the reader.
+ * @param  The type of the iterator that produces the events. This type 
exists to make the
+ * conversion between iterator and {@code IteratorSourceSplit} type safe.
+ * @param  The concrete type of the {@code IteratorSourceSplit} that 
creates and converts
+ * the iterator that produces this reader's elements.
+ */
+@Public
+abstract class IteratorSourceReaderBase<
+E, O, IterT extends Iterator, SplitT extends 
IteratorSourceSplit>
+implements SourceReader {
+
+/** The context for this reader, to communicate with the enumerator. */
+protected final SourceReaderContext context;
+
+/** The availability future. This reader is available as soon as a split 
is assigned. */
+protected CompletableFuture availability;
+
+/**
+ * The iterator producing data. Non-null after a split has been assigned. 
This field is null or
+ * non-null always together with the {@link #currentSplit} field.
+ */
+@Nullable protected IterT iterator;

Review Comment:
   > Is this more about having methods like getIterator() defined in another 
interface rather than exposing protected fields directly?
   
   Not really. That's just another flavor of the same problem. (Probably a 
_better_ flavor, but still)
   
   > This is very similar to the design of the SourceReaderBase that is also 
part of the new Sink API. 
   
   I dislike that as well :)
   
   > Is not SourceReader such a well-defined interface?
   
   The SourceReader is an interface for what a `SourceReader` does. It has no 
bearing on what behavior a _sub-class of the IteratorSourceReaderBase_ should 
be able to customize.
   
   > Could you give a sketch
   
   Yes! 
https://github.com/zentol/flink/commit/aa071987c763f05420f8a0832bb556eeb46b8c6f
   
   I hope this sketch also highlights why I dislike having so much exposed to 
sub-classes; you easily end up mixing responsibilities. There's no good reason 
why the entire `pollNext()` routine was handled by the sub-class.
   
   Beyond that, here are questions I thought about when looking at it:
   * Why should a sub-class have control over the iterator?
   * Why should it be able to ignore elements, or entire splits?
   * Why should it be able to override the `availability` future (bricking the 
source when done at the wrong time)?
   
   You end up short-cutting any thought about "what custom behavior might 
someone want and how can we support that _safely_", and in the process end up 
adding a whole bunch of traps and potential for contracts to be broken.
   
   Note that in the sketch the sub-class ends up being a bit pointless, as we'd 
be better off just merging the 2 classes at this time.




[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-09-09 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r966808144


##
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReaderBase.java:
##
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib.util;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.core.io.InputStatus;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link SourceReader} that returns the values of an iterator, supplied via 
an {@link
+ * IteratorSourceSplit}.
+ *
+ * The {@code IteratorSourceSplit} is also responsible for taking the 
current iterator and
+ * turning it back into a split for checkpointing.
+ *
+ * @param  The type of events returned by the reader.
+ * @param  The type of the iterator that produces the events. This type 
exists to make the
+ * conversion between iterator and {@code IteratorSourceSplit} type safe.
+ * @param  The concrete type of the {@code IteratorSourceSplit} that 
creates and converts
+ * the iterator that produces this reader's elements.
+ */
+@Public
+abstract class IteratorSourceReaderBase<
+E, O, IterT extends Iterator, SplitT extends 
IteratorSourceSplit>
+implements SourceReader {
+
+/** The context for this reader, to communicate with the enumerator. */
+protected final SourceReaderContext context;
+
+/** The availability future. This reader is available as soon as a split 
is assigned. */
+protected CompletableFuture availability;
+
+/**
+ * The iterator producing data. Non-null after a split has been assigned. 
This field is null or
+ * non-null always together with the {@link #currentSplit} field.
+ */
+@Nullable protected IterT iterator;

Review Comment:
   > Is this more about having methods like getIterator() defined in another 
interface rather than exposing protected fields directly?
   
   Not really. That's just another flavor of the same problem. (Probably a 
_better_ flavor, but still)
   
   > This is very similar to the design of the SourceReaderBase that is also 
part of the new Sink API. 
   
   I dislike that as well :)
   
   > Is not SourceReader such a well-defined interface?
   
   The SourceReader is an interface for what a `SourceReader` does. It has no 
bearing on what behavior a _sub-class of the IteratorSourceReaderBase_ should 
be able to customize.
   
   > Could you give a sketch
   
   Yes! 
https://github.com/zentol/flink/commit/9ceaab5b572c5d91c1b80b2773cce0bd0574b483
   
   I hope this sketch also highlights why I dislike having so much exposed to 
sub-classes; you easily end up mixing responsibilities. There's no good reason 
why the entire `pollNext()` routine was handled by the sub-class.
   
   Beyond that, here are questions I thought about when looking at it:
   * Why should a sub-class have control over the iterator?
   * Why should it be able to ignore elements, or entire splits?
   * Why should it be able to override the `availability` future (bricking the 
source when done at the wrong time)?
   
   You end up short-cutting any thought about "what custom behavior might 
someone want and how can we support that _safely_", and in the process end up 
adding a whole bunch of traps and potential for contracts to be broken.
   
   Note that in the sketch the sub-class ends up being a bit pointless, as we'd 
be better off just merging the 2 classes at this time.
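   
   As a rough illustration of the point above (this is not the linked sketch; 
`SealedIteratorReader`, `PrefixingReader` and `convert` are hypothetical names, 
and the types are simplified stand-ins rather than Flink's API), a base class 
can keep the iterator private, make the polling routine final, and expose a 
single narrow hook:

```java
import java.util.Iterator;
import java.util.function.Consumer;

/** Hypothetical, simplified stand-in for a reader base class; not Flink's actual API. */
abstract class SealedIteratorReader<E, O> {

    // Private: sub-classes cannot replace the iterator or skip elements.
    private final Iterator<E> iterator;

    SealedIteratorReader(Iterator<E> iterator) {
        this.iterator = iterator;
    }

    /**
     * Final: the polling routine stays in the base class.
     * Emits at most one element and returns true while more elements remain.
     */
    public final boolean pollNext(Consumer<O> output) {
        if (!iterator.hasNext()) {
            return false;
        }
        output.accept(convert(iterator.next()));
        return iterator.hasNext();
    }

    /** The only extension point: map one input element to one output element. */
    protected abstract O convert(E value);
}

/** Hypothetical sub-class that only decides how an index is turned into an event. */
final class PrefixingReader extends SealedIteratorReader<Long, String> {

    PrefixingReader(Iterator<Long> iterator) {
        super(iterator);
    }

    @Override
    protected String convert(Long value) {
        return "Number: " + value;
    }
}
```

   With this shape a sub-class can only decide how an element is mapped; it 
cannot replace the iterator or skip elements.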




[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-09-09 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r966821506


##
flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java:
##
@@ -63,4 +63,11 @@ public interface SourceReaderContext {
  * @see UserCodeClassLoader
  */
 UserCodeClassLoader getUserCodeClassLoader();
+
+/**
+ * Get the current parallelism of this Source.
+ *
+ * @return the parallelism of the Source.
+ */
+int currentParallelism();

Review Comment:
   I'm not sure.
   
   It seems unlikely that there are implementations of this class, and if there 
are, they should be notified about this additional method in case any reader 
they use was already updated to rely on it.
   
   While adding a default method is neat in terms of source compatibility, you 
either:
   a) return some static count, which might break behavior (maybe right away, 
maybe at some point in the future)
   b) throw an exception, which would make it technically source compatible 
(urgh) but not in practice.
   
   I'm just annoyed that we yet again failed to wrap all this super common 
metadata (parallelism, attempt number etc etc) into a common structure that can 
be shared across the reader context and runtime context, and now end up having 
to copy things over one getter at a time.
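   
   The two options can be sketched as follows. This is a trimmed-down stand-in, 
not the real `SourceReaderContext`; `ReaderContextSketch`, `LegacyContext` and 
both method names are hypothetical:

```java
/** Hypothetical, trimmed-down stand-in for a reader context; not the real Flink interface. */
interface ReaderContextSketch {

    // Option (a): a static fallback keeps old implementations compiling,
    // but silently reports a value that may be wrong for the running job.
    default int currentParallelismOrOne() {
        return 1;
    }

    // Option (b): still source compatible, yet any reader that calls it
    // fails at runtime for implementations that never overrode it.
    default int currentParallelismOrThrow() {
        throw new UnsupportedOperationException("currentParallelism() is not implemented");
    }
}

/** An implementation written before the new methods existed; it overrides neither. */
final class LegacyContext implements ReaderContextSketch {}
```

   Both variants compile against `LegacyContext` unchanged, which is exactly 
why neither one tells the implementer that something needs attention.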






[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-09-09 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r967001455


##
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/DataGeneratorSource.java:
##
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib;

Review Comment:
   I think that would be better. Moving the remaining classes is an orthogonal 
thing we can take separately (which'd be a 2-step process: first moving the 
classes and leaving behind a sort of deprecated proxy `Source extends 
org.apache.flink...Source`, which we then remove whenever we see fit).






[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-09-09 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r967001455


##
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/DataGeneratorSource.java:
##
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib;

Review Comment:
   I think that would be better. Moving the remaining classes is an orthogonal 
thing we can take separately (which'd be a 2-step process: first moving the 
classes and leaving behind a sort of deprecated proxy `Source extends 
org.apache.flink...Source`, which we then remove whenever we see fit).
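   
   A minimal sketch of the proxy step, assuming nested classes as stand-ins for 
the old and new packages (all names below are hypothetical):

```java
/** Hypothetical container; the nested classes stand in for two different packages. */
final class RelocationSketch {

    /** Stand-in for the package the class is moved to. */
    static final class NewLocation {

        /** The real implementation lives here after the move. */
        static class NumberSource {
            long first() {
                return 0L;
            }
        }
    }

    /** Stand-in for the package the class used to live in. */
    static final class OldLocation {

        /**
         * Proxy left behind by step one, so existing user code keeps compiling.
         * Step two removes it at some later point.
         *
         * @deprecated use {@link NewLocation.NumberSource} instead.
         */
        @Deprecated
        static class NumberSource extends NewLocation.NumberSource {}
    }
}
```

   Code that still references the old location compiles with a deprecation 
warning and gets the relocated behavior.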






[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-09-09 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r967017774


##
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/RateLimiter.java:
##
@@ -22,8 +22,14 @@
 import java.util.concurrent.CompletionStage;
 
 /** The interface to rate limit execution of methods. */
-interface RateLimiter extends Serializable {
+public interface RateLimiter extends Serializable {
 
 /** Returns a future that is completed once another event would not exceed 
the rate limit. */
 CompletionStage acquire();
+
+/**
+ * Can be used to modify rate-limiter's behaviour from the outside. Makes 
it possible to
+ * implement rate limiters based on external events rather than on time.
+ */
+default void notifyRelease() {}

Review Comment:
   Because of the name this method implies a tight coupling between the 
SourceReader and RateLimiter implementation.
   A generic SourceReader could not determine _what_ external event should 
actually trigger the release, e.g., the RateLimitedSourceReader has no business 
calling notifyRelease in notifyCheckpointComplete by default, because it has no 
idea whether the rate limiter expects that to occur per checkpoint.
   Meanwhile a generic rate limiter does not know whether it should react to a 
particular event or not. For example, why is the GuavaRateLimiter not resetting 
the current rate?
   
   There isn't a real contract here that either side can adhere to.
   You'd be better off naming it `notifyCheckpointComplete`; it makes it clear 
to the RateLimiter when this is called and to the SourceReader when it should 
be called, without implying any resulting behavior of the limiter.
   
   Anything more specific can be implemented by the user with a custom 
SourceReader and RateLimiter if required.
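   
   A minimal sketch of that contract, assuming single-threaded access and at 
most one outstanding `acquire()` at a time; `RateLimiterSketch` and 
`PerCheckpointLimiter` are hypothetical names, and `Serializable` is left out 
for brevity:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

/** Hypothetical variant of the interface under review, with the hook renamed as suggested. */
interface RateLimiterSketch {

    /** Returns a future that is completed once another event may be emitted. */
    CompletionStage<Void> acquire();

    /** Called by the reader for every completed checkpoint; a limiter is free to ignore it. */
    default void notifyCheckpointComplete(long checkpointId) {}
}

/** Hypothetical limiter that admits a fixed number of events per completed checkpoint. */
final class PerCheckpointLimiter implements RateLimiterSketch {

    private final int capacity;
    private int remaining;
    private CompletableFuture<Void> waiting; // non-null while a caller is blocked

    PerCheckpointLimiter(int capacity) {
        if (capacity < 1) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
        this.remaining = capacity;
    }

    @Override
    public CompletionStage<Void> acquire() {
        if (remaining > 0) {
            remaining--;
            return CompletableFuture.completedFuture(null);
        }
        // Budget used up: the caller waits for the next completed checkpoint.
        waiting = new CompletableFuture<>();
        return waiting;
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        remaining = capacity;
        if (waiting != null) {
            remaining--; // the blocked caller takes the first permit of the new budget
            CompletableFuture<Void> blocked = waiting;
            waiting = null;
            blocked.complete(null);
        }
    }
}
```

   The reader only forwards the checkpoint notification; whether that refills a 
budget, resets a rate, or does nothing is left entirely to the limiter.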
   






[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-09-09 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r967018774


##
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/DataGeneratorSource.java:
##
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import 
org.apache.flink.api.connector.source.lib.NumberSequenceSource.NumberSequenceSplit;
+import 
org.apache.flink.api.connector.source.lib.util.GeneratorSourceReaderFactory;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.util.Collection;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A data source that produces N data points in parallel. This source is 
useful for testing and for
+ * cases that just need a stream of N events of any kind.
+ *
+ * The source splits the sequence into as many parallel sub-sequences as 
there are parallel
+ * source readers.
+ *
+ * Users can supply a {@code GeneratorFunction} for mapping the 
(sub-)sequences of Long values
+ * into the generated events. For instance, the following code will produce 
the sequence of
+ * ["Number: 0", "Number: 2", ... , "Number: 999"] elements.
+ *
+ * {@code
+ * GeneratorFunction generatorFunction = index -> "Number: " + 
index;
+ *
+ * DataGeneratorSource source =
+ * new DataGeneratorSource<>(generatorFunction, 1000, Types.STRING);
+ *
+ * DataStreamSource stream =
+ * env.fromSource(source,
+ * WatermarkStrategy.noWatermarks(),
+ * "Generator Source");
+ * }
+ *
+ * The order of elements depends on the parallelism. Each sub-sequence will 
be produced in order.
+ * Consequently, if the parallelism is limited to one, this will produce one 
sequence in order from
+ * "Number: 0" to "Number: 999".
+ *
+ * Note that this approach also makes it possible to produce deterministic 
watermarks at the
+ * source based on the generated events and a custom {@code WatermarkStrategy}.
+ *
+ * This source has built-in support for rate limiting. The following code 
will produce an
+ * effectively unbounded (Long.MAX_VALUE from practical perspective will never 
be reached) stream of
+ * Long values at the overall source rate (across all source subtasks) of 100 
events per second.
+ *
+ * {@code
+ * GeneratorFunction generatorFunction = index -> index;
+ *
+ * DataGeneratorSource source =
+ * new DataGeneratorSource<>(generatorFunctionStateless, 
Long.MAX_VALUE, 100, Types.STRING);
+ * }
+ *
+ * For more sophisticates use cases, users can take full control of the 
low-level data generation
+ * details by supplying a custom {@code SourceReaderFactory}. The instantiated 
{@code SourceReader}s
+ * are expected to produce data based on processing {@code 
NumberSequenceSplit}s. A customized
+ * generator could, for instance, synchronize the data release process with 
checkpointing by making
+ * use of ({@link SourceReader#notifyCheckpointComplete(long)}). Such 
functionality could be
+ * helpful, for instance, for testing sinks that are expected to create 
specific metadata upon the
+ * arrival of a checkpoint barrier and other similar use cases.
+ *
+ * This source is always bounded. For very long sequences (for example when 
the {@code count} is
+ * set to Long.MAX_VALUE), users may want to consider executing the 
application in a streaming
+ * manner, because, despite the fact that the produced stream is bounded, 

[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-09-12 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r968178154


##
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReaderBase.java:
##
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib.util;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.core.io.InputStatus;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link SourceReader} that returns the values of an iterator, supplied via an {@link
+ * IteratorSourceSplit}.
+ *
+ * The {@code IteratorSourceSplit} is also responsible for taking the current iterator and
+ * turning it back into a split for checkpointing.
+ *
+ * @param <E> The type of events returned by the reader.
+ * @param <IterT> The type of the iterator that produces the events. This type exists to make the
+ *     conversion between iterator and {@code IteratorSourceSplit} type safe.
+ * @param <SplitT> The concrete type of the {@code IteratorSourceSplit} that creates and converts
+ *     the iterator that produces this reader's elements.
+ */
+@Public
+abstract class IteratorSourceReaderBase<
+                E, O, IterT extends Iterator<E>, SplitT extends IteratorSourceSplit<E, IterT>>
+        implements SourceReader<O, SplitT> {
+
+    /** The context for this reader, to communicate with the enumerator. */
+    protected final SourceReaderContext context;
+
+    /** The availability future. This reader is available as soon as a split is assigned. */
+    protected CompletableFuture<Void> availability;
+
+    /**
+     * The iterator producing data. Non-null after a split has been assigned. This field is null or
+     * non-null always together with the {@link #currentSplit} field.
+     */
+    @Nullable protected IterT iterator;

Review Comment:
   At first I thought we should merge the IteratorSourceReader and 
IteratorSourceReaderBase (because the IteratorSourceReader doesn't really 
contain anything), but this wouldn't work because we can't change the generics 
to allow a different output type than the input.
   
   So I'd leave it as is post-f1ed0ff14fb9cd79c910c3784aad109ab94e2d3c.
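
The generics constraint mentioned above can be illustrated with a simplified sketch. These are not the actual Flink classes: `ReaderBaseSketch`, `IdentityReaderSketch`, `MappingReaderSketch`, and the `readAll` helper are stand-in names invented for illustration. The base class carries separate element and output type parameters, while the identity variant fixes both to the same type, which is why it cannot emit a different type without changing its own type parameters.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

/** Stand-in for the base reader: E is the iterator's element type, O the emitted type. */
abstract class ReaderBaseSketch<E, O> {

    /** Maps an element read from the iterator to the emitted record. */
    protected abstract O convert(E value);

    /** Hypothetical helper: drains the iterator and returns the converted records. */
    List<O> readAll(Iterator<E> iterator) {
        List<O> out = new ArrayList<>();
        while (iterator.hasNext()) {
            out.add(convert(iterator.next()));
        }
        return out;
    }
}

/** Stand-in for a reader with a single element type parameter: input type == output type. */
class IdentityReaderSketch<E> extends ReaderBaseSketch<E, E> {
    @Override
    protected E convert(E value) {
        return value;
    }
}

/** Stand-in for a generator-style reader that emits a type different from its input. */
class MappingReaderSketch<E, O> extends ReaderBaseSketch<E, O> {
    private final Function<E, O> mapper;

    MappingReaderSketch(Function<E, O> mapper) {
        this.mapper = mapper;
    }

    @Override
    protected O convert(E value) {
        return mapper.apply(value);
    }
}
```

Under these assumptions the identity variant stays a thin subclass, and the mapping variant reuses the same reading logic with its own output type.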




[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-09-12 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r968181727


##
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/RateLimiter.java:
##
@@ -22,8 +22,14 @@
 import java.util.concurrent.CompletionStage;
 
 /** The interface to rate limit execution of methods. */
-interface RateLimiter extends Serializable {
+public interface RateLimiter extends Serializable {
 
     /** Returns a future that is completed once another event would not exceed the rate limit. */
     CompletionStage<Void> acquire();
+
+    /**
+     * Can be used to modify rate-limiter's behaviour from the outside. Makes it possible to
+     * implement rate limiters based on external events rather than on time.
+     */
+    default void notifyRelease() {}

Review Comment:
   > Should we add CheckpointAwareRateLimiter implements RateLimiter to keep the RateLimiter more high-level?
   
   The idea is good, but how would that work in practice?
   The RateLimitedSourceReader would have to accept a CheckpointAwareRateLimiter via its constructor, which leaks upwards, unless we do an instanceof check + cast _somewhere_.
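
For illustration only, the instanceof-and-cast option could look roughly like the sketch below. All types here are simplified stand-ins, not the PR's classes: `CheckpointAwareRateLimiter` is modelled as a sub-interface (an assumption, since the quoted suggestion does not spell out its shape), `ReaderSketch` stands in for RateLimitedSourceReader, and `CountingLimiter` and `PlainLimiter` are hypothetical test doubles.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

/** Simplified stand-in for the RateLimiter interface under review. */
interface RateLimiter {
    CompletionStage<Void> acquire();
}

/** Hypothetical sub-interface, as floated in the quoted suggestion. */
interface CheckpointAwareRateLimiter extends RateLimiter {
    void notifyCheckpointComplete(long checkpointId);
}

/** Stand-in for the rate-limited reader; only the notification forwarding is shown. */
class ReaderSketch {
    private final RateLimiter rateLimiter;

    // The constructor keeps accepting the high-level type, so nothing leaks upwards.
    ReaderSketch(RateLimiter rateLimiter) {
        this.rateLimiter = rateLimiter;
    }

    void notifyCheckpointComplete(long checkpointId) {
        // The "somewhere" for the instanceof check + cast.
        if (rateLimiter instanceof CheckpointAwareRateLimiter) {
            ((CheckpointAwareRateLimiter) rateLimiter).notifyCheckpointComplete(checkpointId);
        }
    }
}

/** Hypothetical checkpoint-aware limiter that only counts notifications. */
class CountingLimiter implements CheckpointAwareRateLimiter {
    int notifications;

    @Override
    public CompletionStage<Void> acquire() {
        return CompletableFuture.<Void>completedFuture(null);
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        notifications++;
    }
}

/** Hypothetical limiter that does not care about checkpoints. */
class PlainLimiter implements RateLimiter {
    @Override
    public CompletionStage<Void> acquire() {
        return CompletableFuture.<Void>completedFuture(null);
    }
}
```

The trade-off is the one raised in the comment: the cast keeps the constructor signature high-level, at the cost of a runtime type check inside the reader.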



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-09-13 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r969374348


##
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/GuavaRateLimiter.java:
##
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib.util;
+
+import 
org.apache.flink.shaded.guava30.com.google.common.util.concurrent.RateLimiter;
+
+/** An implementation of {@link RateLimiter} based on Guava's RateLimiter. */
+public class GuavaRateLimiter

Review Comment:
   Let's merge it as is and resolve the duplication in a follow-up.
   
   The FlinkConnectorRateLimiter interface is a bit of a problem; it is good that it allows multiple permits to be acquired at once for efficiency purposes, but that interacts oddly with rate limiting.
   For example, if your limit is 10 records per checkpoint, then you never get any permits if you request 11 at once.
   
   We'll need to think about how we can properly implement this.
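
A minimal sketch of the problem described above, assuming an all-or-nothing grant policy. `PerCheckpointBudgetSketch` and its methods are hypothetical names for illustration and do not reflect the FlinkConnectorRateLimiter API. With a budget of 10 permits per checkpoint, a request for 11 can never be satisfied, no matter how many checkpoints complete.

```java
/** Hypothetical limiter that hands out a fixed number of permits per checkpoint. */
class PerCheckpointBudgetSketch {
    private final int permitsPerCheckpoint;
    private int remaining;

    PerCheckpointBudgetSketch(int permitsPerCheckpoint) {
        this.permitsPerCheckpoint = permitsPerCheckpoint;
        this.remaining = permitsPerCheckpoint;
    }

    /** Grants the whole batch or nothing (assumed policy, not taken from the PR). */
    boolean tryAcquire(int permits) {
        if (permits > remaining) {
            return false;
        }
        remaining -= permits;
        return true;
    }

    /** Refills the budget; unused permits do not carry over. */
    void notifyCheckpointComplete() {
        remaining = permitsPerCheckpoint;
    }
}
```

Under this assumed policy a caller asking for 11 permits starves indefinitely, while a caller asking for 10 or fewer makes progress every cycle.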






[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-09-14 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r970437840


##
flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java:
##
@@ -63,4 +63,11 @@ public interface SourceReaderContext {
  * @see UserCodeClassLoader
  */
 UserCodeClassLoader getUserCodeClassLoader();
+
+/**
+ * Get the current parallelism of this Source.
+ *
+ * @return the parallelism of the Source.
+ */
+int currentParallelism();

Review Comment:
   Let's go with 2).






[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-09-14 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r970440346


##
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/DataGeneratorSource.java:
##
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import 
org.apache.flink.api.connector.source.lib.NumberSequenceSource.NumberSequenceSplit;
+import 
org.apache.flink.api.connector.source.lib.util.GeneratorSourceReaderFactory;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.util.Collection;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A data source that produces N data points in parallel. This source is 
useful for testing and for
+ * cases that just need a stream of N events of any kind.
+ *
+ * The source splits the sequence into as many parallel sub-sequences as 
there are parallel
+ * source readers.
+ *
+ * Users can supply a {@code GeneratorFunction} for mapping the (sub-)sequences of Long values
+ * into the generated events. For instance, the following code will produce the sequence of
+ * ["Number: 0", "Number: 1", ... , "Number: 999"] elements.
+ *
+ * {@code
+ * GeneratorFunction<Long, String> generatorFunction = index -> "Number: " + index;
+ *
+ * DataGeneratorSource<String> source =
+ *         new DataGeneratorSource<>(generatorFunction, 1000, Types.STRING);
+ *
+ * DataStreamSource<String> stream =
+ *         env.fromSource(source,
+ *         WatermarkStrategy.noWatermarks(),
+ *         "Generator Source");
+ * }
+ *
+ * The order of elements depends on the parallelism. Each sub-sequence will 
be produced in order.
+ * Consequently, if the parallelism is limited to one, this will produce one 
sequence in order from
+ * "Number: 0" to "Number: 999".
+ *
+ * Note that this approach also makes it possible to produce deterministic 
watermarks at the
+ * source based on the generated events and a custom {@code WatermarkStrategy}.
+ *
+ * This source has built-in support for rate limiting. The following code will produce an
+ * effectively unbounded (Long.MAX_VALUE from practical perspective will never be reached) stream of
+ * Long values at the overall source rate (across all source subtasks) of 100 events per second.
+ *
+ * {@code
+ * GeneratorFunction<Long, Long> generatorFunction = index -> index;
+ *
+ * DataGeneratorSource<Long> source =
+ *         new DataGeneratorSource<>(generatorFunction, Long.MAX_VALUE, 100, Types.LONG);
+ * }
+ *
+ * For more sophisticated use cases, users can take full control of the low-level data generation
+ * details by supplying a custom {@code SourceReaderFactory}. The instantiated 
{@code SourceReader}s
+ * are expected to produce data based on processing {@code 
NumberSequenceSplit}s. A customized
+ * generator could, for instance, synchronize the data release process with 
checkpointing by making
+ * use of ({@link SourceReader#notifyCheckpointComplete(long)}). Such 
functionality could be
+ * helpful, for instance, for testing sinks that are expected to create 
specific metadata upon the
+ * arrival of a checkpoint barrier and other similar use cases.
+ *
+ * This source is always bounded. For very long sequences (for example when 
the {@code count} is
+ * set to Long.MAX_VALUE), users may want to consider executing the 
application in a streaming
+ * manner, because, despite the fact that the produced stream is bounded, 

[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-09-14 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r970445400


##
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/GatedRateLimiter.java:
##
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib.util;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+
+/**
+ * An implementation of {@link RateLimiter} that completes defined number of futures in-between the
+ * external notification events. The first cycle completes immediately, without waiting for the
+ * external notifications.
+ */
+public class GatedRateLimiter implements RateLimiter {
+
+    private final int capacityPerCycle;
+    private int capacityLeft;
+
+    /**
+     * Instantiates a new GatedRateLimiter.
+     *
+     * @param capacityPerCycle The number of completed futures per cycle.
+     */
+    public GatedRateLimiter(int capacityPerCycle) {
+        this.capacityPerCycle = capacityPerCycle;
+        this.capacityLeft = capacityPerCycle + 1;
+    }
+
+    CompletableFuture<Void> gatingFuture;
+
+    @Override
+    public CompletionStage<Void> acquire() {
+        if (capacityLeft-- > 0) {
+            return CompletableFuture.completedFuture(null);
+        } else {
+            if (gatingFuture == null) {
+                gatingFuture = new CompletableFuture<>();
+            }
+            return gatingFuture;
+        }
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) {
+        capacityLeft = capacityPerCycle - 1;

Review Comment:
   Strange that this subtracts 1 (`capacityPerCycle - 1`), whereas the constructor adds 1 (`capacityPerCycle + 1`).



##
flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/DataGeneratorSourceITCase.java:
##
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.datagen.DataGeneratorSource;
+import org.apache.flink.api.connector.source.datagen.GeneratorFunction;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
+import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+
+/** An integration test for {@code DataGeneratorSource}. */
+public class DataGeneratorSourceITCase extends TestLogger {
+
+private static final int PARALLELISM = 4;
+
+@RegisterExtension
+private static final MiniClusterExtension miniClusterExtension =
+new MiniClusterExtension(
+new MiniClusterResourceConf

[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-09-27 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r980953322


##
flink-core/src/test/java/org/apache/flink/api/connector/source/lib/DataGeneratorSourceTest.java:
##
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.datagen.DataGeneratorSource;
+import org.apache.flink.api.connector.source.datagen.GeneratorFunction;
+import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.mock.Whitebox;
+import org.apache.flink.util.SimpleUserCodeClassLoader;
+import org.apache.flink.util.UserCodeClassLoader;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Queue;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for the {@link DataGeneratorSource}. */
+public class DataGeneratorSourceTest {
+
+@Test
+@DisplayName("Correctly restores SplitEnumerator from a snapshot.")
+public void testRestoreEnumerator() throws Exception {
+final GeneratorFunction generatorFunctionStateless = index 
-> index;
+final DataGeneratorSource dataGeneratorSource =
+new DataGeneratorSource<>(generatorFunctionStateless, 100, 
Types.LONG);
+
+final int parallelism = 2;
+final 
MockSplitEnumeratorContext context =
+new MockSplitEnumeratorContext<>(parallelism);
+
+SplitEnumerator<
+NumberSequenceSource.NumberSequenceSplit,
+Collection>
+enumerator = dataGeneratorSource.createEnumerator(context);
+
+// start() is not strictly necessary in the current implementation, 
but should logically be
+// executed in this order (protect against any breaking changes in the 
start() method).
+enumerator.start();
+
+Collection enumeratorState =
+enumerator.snapshotState(0);
+
+@SuppressWarnings("unchecked")
+final Queue splits =
+(Queue)
+Whitebox.getInternalState(enumerator, 
"remainingSplits");
+
+assertThat(splits).hasSize(parallelism);
+
+enumerator = dataGeneratorSource.restoreEnumerator(context, 
enumeratorState);
+
+@SuppressWarnings("unchecked")
+final Queue restoredSplits =
+(Queue)
+Whitebox.getInternalState(enumerator, 
"remainingSplits");
+
+assertThat(restoredSplits).hasSize(enumeratorState.size());
+}
+
+@Test
+@DisplayName("Uses the underlying NumberSequenceSource correctly for 
checkpointing.")
+public void testReaderCheckpoints() throws Exception {
+final long from = 177;
+final long mid = 333;
+final long to = 563;
+final long elementsPerCycle = (to - from) / 3;
+
+final TestingReaderOutput out = new TestingReaderOutput<>();
+
+SourceReader reader = 
createReader();
+reader.addSplits(
+Arrays.asList(
+new 
NumberSequenceSource.NumberSequenceSplit("split-1", from, mid),
+new 
NumberSequenceSource.NumberSequenceSplit("split-2", mid + 1, to)));
+
+long remainingInCycle = element

[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-09-27 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r980973383


##
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/GatedRateLimiter.java:
##
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib.util;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+
+/**
+ * An implementation of {@link RateLimiter} that completes defined number of futures in-between the
+ * external notification events. The first cycle completes immediately, without waiting for the
+ * external notifications.
+ */
+public class GatedRateLimiter implements RateLimiter {
+
+    private final int capacityPerCycle;
+    private int capacityLeft;
+
+    /**
+     * Instantiates a new GatedRateLimiter.
+     *
+     * @param capacityPerCycle The number of completed futures per cycle.
+     */
+    public GatedRateLimiter(int capacityPerCycle) {
+        this.capacityPerCycle = capacityPerCycle;
+        this.capacityLeft = capacityPerCycle + 1;
+    }
+
+    CompletableFuture<Void> gatingFuture;
+
+    @Override
+    public CompletionStage<Void> acquire() {
+        if (capacityLeft-- > 0) {
+            return CompletableFuture.completedFuture(null);
+        } else {
+            if (gatingFuture == null) {
+                gatingFuture = new CompletableFuture<>();
+            }
+            return gatingFuture;
+        }
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) {
+        capacityLeft = capacityPerCycle - 1;

Review Comment:
   I think it's a problem that the first cycle is somehow treated differently than the others; conceptually it should be _possible_ to just call notifyCheckpointComplete immediately (e.g., in the constructor) without it affecting the behavior.
   
   As I understand it, the reason we need this distinction is that we decrement the count _before_ the future completes, causing us to smuggle some decrements over into the next cycle.
   
   I'm currently experimenting with a way to avoid that problem (and, incidentally, others):
   
   ```
   CompletableFuture<Void> gatingFuture = CompletableFuture.completedFuture(null);
   
   @Override
   public CompletionStage<Void> acquire() {
       if (capacityLeft <= 0) {
           gatingFuture = new CompletableFuture<>();
       }
       return gatingFuture.thenRun(() -> capacityLeft -= 1);
   }
   
   @Override
   public void notifyCheckpointComplete(long checkpointId) {
       capacityLeft = capacityPerCycle;
       gatingFuture.complete(null);
   }
   ```






[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-09-27 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r980973383


##
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/GatedRateLimiter.java:
##
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib.util;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+
+/**
+ * An implementation of {@link RateLimiter} that completes defined number of futures in-between the
+ * external notification events. The first cycle completes immediately, without waiting for the
+ * external notifications.
+ */
+public class GatedRateLimiter implements RateLimiter {
+
+    private final int capacityPerCycle;
+    private int capacityLeft;
+
+    /**
+     * Instantiates a new GatedRateLimiter.
+     *
+     * @param capacityPerCycle The number of completed futures per cycle.
+     */
+    public GatedRateLimiter(int capacityPerCycle) {
+        this.capacityPerCycle = capacityPerCycle;
+        this.capacityLeft = capacityPerCycle + 1;
+    }
+
+    CompletableFuture<Void> gatingFuture;
+
+    @Override
+    public CompletionStage<Void> acquire() {
+        if (capacityLeft-- > 0) {
+            return CompletableFuture.completedFuture(null);
+        } else {
+            if (gatingFuture == null) {
+                gatingFuture = new CompletableFuture<>();
+            }
+            return gatingFuture;
+        }
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) {
+        capacityLeft = capacityPerCycle - 1;

Review Comment:
   I think it's a problem that the first cycle is somehow treated differently 
than the others; conceptually it should be _possible_ to just call 
notifyCheckpointComplete immediately (e.g., in the constructor) without it 
affecting the behavior.
   
   As I understand it, the reason we need this distinction is that we decrement the count _before_ the future completes, causing us to smuggle some decrements over into the next cycle.
   
   I'm currently experimenting to avoid that problem (and others incidentally):
   
   ```
   public GatedRateLimiter(int capacityPerCycle) {
       this.capacityPerCycle = capacityPerCycle;
       this.capacityLeft = capacityPerCycle;
   }

   CompletableFuture<Void> gatingFuture = CompletableFuture.completedFuture(null);

   @Override
   public CompletionStage<Void> acquire() {
       if (capacityLeft <= 0) {
           gatingFuture = new CompletableFuture<>();
       }
       return gatingFuture.thenRun(() -> capacityLeft -= 1);
   }

   @Override
   public void notifyCheckpointComplete(long checkpointId) {
       capacityLeft = capacityPerCycle;
       gatingFuture.complete(null);
   }
   ```
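
For anyone who wants to try the idea outside of Flink, below is a minimal stand-alone sketch of the approach. `GatedLimiterSketch` and `CycleCounter` are hypothetical names made up for this illustration, the class deliberately does not implement the PR's `RateLimiter` interface, and the `<Void>` type parameters are an assumption. It relies on `thenRun` executing in the calling thread when the future is already complete.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

/** Hypothetical stand-alone sketch of the gated limiter idea; not the class from the PR. */
final class GatedLimiterSketch {

    private final int capacityPerCycle;
    private int capacityLeft;
    // Starts out completed, so the first cycle needs no special handling.
    private CompletableFuture<Void> gatingFuture = CompletableFuture.completedFuture(null);

    GatedLimiterSketch(int capacityPerCycle) {
        this.capacityPerCycle = capacityPerCycle;
        this.capacityLeft = capacityPerCycle;
    }

    /** Must not be called again before the previously returned stage has completed. */
    CompletionStage<Void> acquire() {
        if (capacityLeft <= 0) {
            gatingFuture = new CompletableFuture<>();
        }
        // The permit is consumed only once the gate is (or becomes) open.
        return gatingFuture.thenRun(() -> capacityLeft -= 1);
    }

    void notifyCheckpointComplete(long checkpointId) {
        capacityLeft = capacityPerCycle;
        gatingFuture.complete(null);
    }

    /** Hypothetical driver that respects the "one acquire in flight" rule. */
    static final class CycleCounter {
        private final GatedLimiterSketch limiter;
        private CompletableFuture<Void> pending;

        CycleCounter(GatedLimiterSketch limiter) {
            this.limiter = limiter;
            this.pending = limiter.acquire().toCompletableFuture();
        }

        /** Uses permits until the gate closes; returns how many were granted. */
        int drainCycle() {
            int granted = 0;
            while (pending.isDone()) {
                granted++;
                pending = limiter.acquire().toCompletableFuture();
            }
            return granted;
        }
    }

    public static void main(String[] args) {
        GatedLimiterSketch limiter = new GatedLimiterSketch(3);
        CycleCounter counter = new CycleCounter(limiter);
        int first = counter.drainCycle();
        limiter.notifyCheckpointComplete(1L);
        int second = counter.drainCycle();
        System.out.println(first + " " + second);
    }
}
```

With a capacity of 3, both cycles grant three permits in this sketch, which is the property the unconditional reset in `notifyCheckpointComplete` is meant to provide.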






[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-09-27 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r980973383


##
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/GatedRateLimiter.java:
##
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib.util;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+
+/**
+ * An implementation of {@link RateLimiter} that completes defined number of 
futures in-between the
+ * external notification events. The first cycle completes immediately, 
without waiting for the
+ * external notifications.
+ */
+public class GatedRateLimiter implements RateLimiter {
+
+private final int capacityPerCycle;
+private int capacityLeft;
+
+/**
+ * Instantiates a new GatedRateLimiter.
+ *
+ * @param capacityPerCycle The number of completed futures per cycle.
+ */
+public GatedRateLimiter(int capacityPerCycle) {
+this.capacityPerCycle = capacityPerCycle;
+this.capacityLeft = capacityPerCycle + 1;
+}
+
+CompletableFuture gatingFuture;
+
+@Override
+public CompletionStage acquire() {
+if (capacityLeft-- > 0) {
+return CompletableFuture.completedFuture(null);
+} else {
+if (gatingFuture == null) {
+gatingFuture = new CompletableFuture<>();
+}
+return gatingFuture;
+}
+}
+
+@Override
+public void notifyCheckpointComplete(long checkpointId) {
+capacityLeft = capacityPerCycle - 1;

Review Comment:
   I think it's a problem that the first cycle is somehow treated differently 
than the others; conceptually it should be _possible_ to just call 
notifyCheckpointComplete immediately (e.g., in the constructor) without it 
affecting the behavior.
   
   As I understand it, the reason we need this distinction is that we decrement the count _before_ the future completes, causing us to smuggle some decrements over into the next cycle.
   
   I'm currently experimenting to avoid that problem (and others incidentally):
   
   ```
   public GatedRateLimiter(int capacityPerCycle) {
       this.capacityPerCycle = capacityPerCycle;
       this.capacityLeft = capacityPerCycle;
   }

   transient CompletableFuture<Void> gatingFuture = null;

   @Override
   public CompletionStage<Void> acquire() {
       if (gatingFuture == null) {
           gatingFuture = CompletableFuture.completedFuture(null);
       }
       if (capacityLeft <= 0) {
           gatingFuture = new CompletableFuture<>();
       }
       return gatingFuture.thenRun(() -> capacityLeft -= 1);
   }

   @Override
   public void notifyCheckpointComplete(long checkpointId) {
       capacityLeft = capacityPerCycle;
       gatingFuture.complete(null);
   }
   ```






[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-09-27 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r980975212


##
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/RateLimiter.java:
##
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib.util;
+
+import java.io.Serializable;
+import java.util.concurrent.CompletionStage;
+
+/** The interface to rate limit execution of methods. */
+public interface RateLimiter extends Serializable {
+
+/** Returns a future that is completed once another event would not exceed 
the rate limit. */

Review Comment:
   We should clarify that acquire must not be called again until the returned future completes, as otherwise we can go over budget.
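
One way a caller could honor that rule is to keep the in-flight stage around and only ask the limiter again once it has been used. The sketch below is a hypothetical illustration, not code from the PR: `SingleFlightAcquirer`, `permit()` and `consumed()` are made-up names, and the limiter is modeled as a plain `Supplier` so the example stays self-contained.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;

/** Hypothetical caller-side guard; the names are illustrative and not Flink API. */
final class SingleFlightAcquirer {

    // Stand-in for a limiter's acquire() method.
    private final Supplier<CompletionStage<Void>> acquire;
    private CompletableFuture<Void> inFlight;

    SingleFlightAcquirer(Supplier<CompletionStage<Void>> acquire) {
        this.acquire = acquire;
    }

    /**
     * Returns the pending stage if there is one; otherwise asks the limiter for a new one.
     * The limiter is therefore never asked twice for the same permit.
     */
    CompletableFuture<Void> permit() {
        if (inFlight == null) {
            inFlight = acquire.get().toCompletableFuture();
        }
        return inFlight;
    }

    /** To be called after the completed permit has been used, e.g. to emit one record. */
    void consumed() {
        if (inFlight == null || !inFlight.isDone()) {
            throw new IllegalStateException("No completed permit to consume");
        }
        inFlight = null;
    }
}
```

Repeated `permit()` calls while the gate is closed return the same stage, so the underlying limiter sees exactly one `acquire()` per permit.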



##
flink-core/src/test/java/org/apache/flink/api/connector/source/lib/DataGeneratorSourceTest.java:
##
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.datagen.DataGeneratorSource;
+import org.apache.flink.api.connector.source.datagen.GeneratorFunction;
+import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.util.SimpleUserCodeClassLoader;
+import org.apache.flink.util.UserCodeClassLoader;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for the {@link DataGeneratorSource}. */
+public class DataGeneratorSourceTest {

Review Comment:
   ```suggestion
   class DataGeneratorSourceTest {
   ```
   Remove public modifier from classes and methods.



##
flink-core/src/test/java/org/apache/flink/api/connector/source/lib/DataGeneratorSourceTest.java:
##
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * Se

[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-09-27 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r980977989


##
flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/util/RateLimitedSourceReaderITCase.java:
##
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib.util;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceReaderFactory;
+import org.apache.flink.api.connector.source.datagen.DataGeneratorSource;
+import org.apache.flink.api.connector.source.datagen.GeneratorFunction;
+import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** An integration test for rate limiting built into the DataGeneratorSource. 
*/
+public class RateLimitedSourceReaderITCase extends TestLogger {

Review Comment:
   We should also have some unit tests for the rate limiter imo. At the very least something like below (which, FYI, fails with the current version in the PR; it works with mine though :see_no_evil:).
   
   ```
   class GatedRateLimiterTest {
       @Test
       void testCapacityNotExceededOnCheckpoint() {
           int capacityPerCycle = 5;

           final GatedRateLimiter gatedRateLimiter = new GatedRateLimiter(capacityPerCycle);
           for (int x = 0; x < capacityPerCycle; x++) {
               assertThat(gatedRateLimiter.acquire()).isCompleted();
           }

           CompletionStage<Void> postInitialBatch = gatedRateLimiter.acquire();
           assertThat(postInitialBatch).isNotCompleted();

           gatedRateLimiter.notifyCheckpointComplete(0);

           assertThat(postInitialBatch).isCompleted();
           for (int x = 0; x < capacityPerCycle - 1; x++) {
               assertThat(gatedRateLimiter.acquire()).isCompleted();
           }

           CompletionStage<Void> postCheckpoint = gatedRateLimiter.acquire();
           assertThat(postCheckpoint).isNotCompleted();
       }
   }
   ```






[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-09-28 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r982171401


##
flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/DataGeneratorSourceITCase.java:
##
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceReaderFactory;
+import org.apache.flink.api.connector.source.datagen.DataGeneratorSource;
+import org.apache.flink.api.connector.source.datagen.GeneratorFunction;
+import org.apache.flink.api.connector.source.lib.util.GatedRateLimiter;
+import 
org.apache.flink.api.connector.source.lib.util.GeneratingIteratorSourceReader;
+import org.apache.flink.api.connector.source.lib.util.RateLimitedSourceReader;
+import org.apache.flink.api.connector.source.lib.util.RateLimiter;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
+import static java.util.stream.Collectors.summingInt;
+import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+
+/** An integration test for {@code DataGeneratorSource}. */
+public class DataGeneratorSourceITCase extends TestLogger {
+
+private static final int PARALLELISM = 4;
+
+@RegisterExtension
+private static final MiniClusterExtension miniClusterExtension =
+new MiniClusterExtension(
+new MiniClusterResourceConfiguration.Builder()
+.setNumberTaskManagers(1)
+.setNumberSlotsPerTaskManager(PARALLELISM)
+.build());
+
+// 
+
+@Test
+@DisplayName("Combined results of parallel source readers produce the 
expected sequence.")
+public void testParallelSourceExecution() throws Exception {
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+env.setParallelism(PARALLELISM);
+
+final DataStream stream = getGeneratorSourceStream(index -> 
index, env, 1_000L);
+
+final List result = stream.executeAndCollect(1);
+
+assertThat(result).containsExactlyInAnyOrderElementsOf(range(0, 999));
+}
+
+@Test
+@DisplayName("Generator function can be instantiated as an anonymous 
class.")
+public void testParallelSourceExecutionWithAnonymousClass() throws 
Exception {
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+env.setParallelism(PARALLELISM);
+
+GeneratorFunction generatorFunction =
+new GeneratorFunction() {
+
+@Override
+public Long map(Long value) {
+return value;
+}
+};
+
+final DataStream stream = 
getGeneratorSourceStream(generatorFunction, env, 1_000L);
+
+final List result = stream.executeAndCollect(1);
+
+assertThat(result).containsExactlyInAnyOrderElementsOf(ra

[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-10-06 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r988967326


##
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/GatedRateLimiter.java:
##
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib.util;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+
+/**
+ * An implementation of {@link RateLimiter} that completes defined number of 
futures in-between the
+ * external notification events. The first cycle completes immediately, 
without waiting for the
+ * external notifications.
+ */
+public class GatedRateLimiter implements RateLimiter {
+
+private final int capacityPerCycle;
+private int capacityLeft;
+
+/**
+ * Instantiates a new GatedRateLimiter.
+ *
+ * @param capacityPerCycle The number of completed futures per cycle.
+ */
+public GatedRateLimiter(int capacityPerCycle) {
+this.capacityPerCycle = capacityPerCycle;
+this.capacityLeft = capacityPerCycle + 1;
+}
+
+CompletableFuture gatingFuture;
+
+@Override
+public CompletionStage acquire() {
+if (capacityLeft-- > 0) {
+return CompletableFuture.completedFuture(null);
+} else {
+if (gatingFuture == null) {
+gatingFuture = new CompletableFuture<>();
+}
+return gatingFuture;
+}
+}
+
+@Override
+public void notifyCheckpointComplete(long checkpointId) {
+capacityLeft = capacityPerCycle - 1;

Review Comment:
   Without having played around with the `DataGeneratorPerCheckpoint`, it doesn't quite make sense to me. Why would we only emit 9 values in the first cycle? It doesn't match the results from the unit test I proposed below, which shows that my version emits exactly `capacity` values per cycle.



##
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/GatedRateLimiter.java:
##
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib.util;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+
+/**
+ * An implementation of {@link RateLimiter} that completes defined number of 
futures in-between the
+ * external notification events. The first cycle completes immediately, 
without waiting for the
+ * external notifications.
+ */
+public class GatedRateLimiter implements RateLimiter {
+
+private final int capacityPerCycle;
+private int capacityLeft;
+
+/**
+ * Instantiates a new GatedRateLimiter.
+ *
+ * @param capacityPerCycle The number of completed futures per cycle.
+ */
+public GatedRateLimiter(int capacityPerCycle) {
+this.capacityPerCycle = capacityPerCycle;
+this.capacityLeft = capacityPerCycle + 1;
+}
+
+CompletableFuture gatingFuture;
+
+@Override
+public CompletionStage acquire() {
+if (capacityLeft-- > 0) {
+return CompletableFuture.completedFuture(null);
+} else {
+if (gatingFuture == null) {
+gatingFuture = new CompletableFuture<>();
+}
+return gatingFuture;
+}
+}
+
+@Override
+public void notifyCheckpointComplete(long checkpointId) {
+capacityLeft = ca

[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-10-06 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r988979454


##
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/GatedRateLimiter.java:
##
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib.util;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+
+/**
+ * An implementation of {@link RateLimiter} that completes defined number of 
futures in-between the
+ * external notification events. The first cycle completes immediately, 
without waiting for the
+ * external notifications.
+ */
+public class GatedRateLimiter implements RateLimiter {
+
+private final int capacityPerCycle;
+private int capacityLeft;
+
+/**
+ * Instantiates a new GatedRateLimiter.
+ *
+ * @param capacityPerCycle The number of completed futures per cycle.
+ */
+public GatedRateLimiter(int capacityPerCycle) {
+this.capacityPerCycle = capacityPerCycle;
+this.capacityLeft = capacityPerCycle + 1;
+}
+
+CompletableFuture gatingFuture;
+
+@Override
+public CompletionStage acquire() {
+if (capacityLeft-- > 0) {
+return CompletableFuture.completedFuture(null);
+} else {
+if (gatingFuture == null) {
+gatingFuture = new CompletableFuture<>();
+}
+return gatingFuture;
+}
+}
+
+@Override
+public void notifyCheckpointComplete(long checkpointId) {
+capacityLeft = capacityPerCycle - 1;

Review Comment:
   While debugging I noticed that acquire() is called twice initially without 
the first value being printed. This could indicate a bug in the source 
somewhere.






[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-10-06 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r988979454


##
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/GatedRateLimiter.java:
##
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib.util;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+
+/**
+ * An implementation of {@link RateLimiter} that completes defined number of 
futures in-between the
+ * external notification events. The first cycle completes immediately, 
without waiting for the
+ * external notifications.
+ */
+public class GatedRateLimiter implements RateLimiter {
+
+private final int capacityPerCycle;
+private int capacityLeft;
+
+/**
+ * Instantiates a new GatedRateLimiter.
+ *
+ * @param capacityPerCycle The number of completed futures per cycle.
+ */
+public GatedRateLimiter(int capacityPerCycle) {
+this.capacityPerCycle = capacityPerCycle;
+this.capacityLeft = capacityPerCycle + 1;
+}
+
+CompletableFuture gatingFuture;
+
+@Override
+public CompletionStage acquire() {
+if (capacityLeft-- > 0) {
+return CompletableFuture.completedFuture(null);
+} else {
+if (gatingFuture == null) {
+gatingFuture = new CompletableFuture<>();
+}
+return gatingFuture;
+}
+}
+
+@Override
+public void notifyCheckpointComplete(long checkpointId) {
+capacityLeft = capacityPerCycle - 1;

Review Comment:
   Interesting, it does behave as you describe.
   
   While debugging I noticed that acquire() is called twice initially without 
the first value being printed. This could indicate a bug in the source 
somewhere.






[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-10-06 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r988984499


##
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/GatedRateLimiter.java:
##
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib.util;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+
+/**
+ * An implementation of {@link RateLimiter} that completes defined number of futures in-between the
+ * external notification events. The first cycle completes immediately, without waiting for the
+ * external notifications.
+ */
+public class GatedRateLimiter implements RateLimiter {
+
+    private final int capacityPerCycle;
+    private int capacityLeft;
+
+    /**
+     * Instantiates a new GatedRateLimiter.
+     *
+     * @param capacityPerCycle The number of completed futures per cycle.
+     */
+    public GatedRateLimiter(int capacityPerCycle) {
+        this.capacityPerCycle = capacityPerCycle;
+        this.capacityLeft = capacityPerCycle + 1;
+    }
+
+    CompletableFuture<Void> gatingFuture;
+
+    @Override
+    public CompletionStage<Void> acquire() {
+        if (capacityLeft-- > 0) {
+            return CompletableFuture.completedFuture(null);
+        } else {
+            if (gatingFuture == null) {
+                gatingFuture = new CompletableFuture<>();
+            }
+            return gatingFuture;
+        }
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) {
+        capacityLeft = capacityPerCycle - 1;

Review Comment:
   The problem is that `IteratorSourceReaderBase#pollNext` does not emit a 
value if `iterator == null` and we moved to the next split. However, we null 
the `availabilityFuture` in `RateLimitedSourceReader#pollNext` anyway.
   
   So we just blow 1 capacity in the first cycle.
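
   To make the lost permit concrete, here is a small self-contained model of the interplay. It is only a sketch under simplifying assumptions: `LostPermitSketch` and `emittedInFirstCycle` are hypothetical names, the loop stands in for one `acquire()` followed by one `pollNext()`, and the `emitAfterSplitSwitch` flag models a reader that emits a record right after moving to a split.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.LongStream;

/** Hypothetical, simplified model of the reader/limiter interplay; not Flink code. */
class LostPermitSketch {

    /** Returns how many records are emitted before the permits of the first cycle run out. */
    static int emittedInFirstCycle(int capacityPerCycle, boolean emitAfterSplitSwitch) {
        int capacityLeft = capacityPerCycle; // permits available in the first cycle
        Iterator<Long> iterator = null; // no split has been picked up yet
        List<Long> output = new ArrayList<>();

        // Each loop iteration models one acquire() followed by one pollNext().
        while (capacityLeft-- > 0) {
            if (iterator != null && iterator.hasNext()) {
                output.add(iterator.next());
            } else {
                // Models moving to the next split: the permit is already spent here.
                iterator = LongStream.range(0, 100).iterator();
                if (emitAfterSplitSwitch) {
                    output.add(iterator.next());
                }
            }
        }
        return output.size();
    }

    public static void main(String[] args) {
        System.out.println(emittedInFirstCycle(3, false));
        System.out.println(emittedInFirstCycle(3, true));
    }
}
```

   With 3 permits the first call yields 2 records and the second yields 3, which matches the one-off difference described above.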



[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-10-06 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r988986838


##
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/GatedRateLimiter.java:
##
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib.util;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+
+/**
+ * An implementation of {@link RateLimiter} that completes defined number of futures in-between the
+ * external notification events. The first cycle completes immediately, without waiting for the
+ * external notifications.
+ */
+public class GatedRateLimiter implements RateLimiter {
+
+    private final int capacityPerCycle;
+    private int capacityLeft;
+
+    /**
+     * Instantiates a new GatedRateLimiter.
+     *
+     * @param capacityPerCycle The number of completed futures per cycle.
+     */
+    public GatedRateLimiter(int capacityPerCycle) {
+        this.capacityPerCycle = capacityPerCycle;
+        this.capacityLeft = capacityPerCycle + 1;
+    }
+
+    CompletableFuture<Void> gatingFuture;
+
+    @Override
+    public CompletionStage<Void> acquire() {
+        if (capacityLeft-- > 0) {
+            return CompletableFuture.completedFuture(null);
+        } else {
+            if (gatingFuture == null) {
+                gatingFuture = new CompletableFuture<>();
+            }
+            return gatingFuture;
+        }
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) {
+        capacityLeft = capacityPerCycle - 1;

Review Comment:
   Amend `IteratorSourceReaderBase#pollNext` as follows to fix it:
   
   ```
   -return tryMoveToNextSplit();
   +final InputStatus inputStatus = tryMoveToNextSplit();
   +if (inputStatus == InputStatus.MORE_AVAILABLE) {
   +output.collect(convert(iterator.next()));
   +}
   +return inputStatus;
   ```



[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-10-06 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r988984499


##
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/GatedRateLimiter.java:
##
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib.util;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+
+/**
+ * An implementation of {@link RateLimiter} that completes defined number of futures in-between the
+ * external notification events. The first cycle completes immediately, without waiting for the
+ * external notifications.
+ */
+public class GatedRateLimiter implements RateLimiter {
+
+    private final int capacityPerCycle;
+    private int capacityLeft;
+
+    /**
+     * Instantiates a new GatedRateLimiter.
+     *
+     * @param capacityPerCycle The number of completed futures per cycle.
+     */
+    public GatedRateLimiter(int capacityPerCycle) {
+        this.capacityPerCycle = capacityPerCycle;
+        this.capacityLeft = capacityPerCycle + 1;
+    }
+
+    CompletableFuture<Void> gatingFuture;
+
+    @Override
+    public CompletionStage<Void> acquire() {
+        if (capacityLeft-- > 0) {
+            return CompletableFuture.completedFuture(null);
+        } else {
+            if (gatingFuture == null) {
+                gatingFuture = new CompletableFuture<>();
+            }
+            return gatingFuture;
+        }
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) {
+        capacityLeft = capacityPerCycle - 1;

Review Comment:
   The problem is that `IteratorSourceReaderBase#pollNext` does not emit a 
value if `iterator == null` and we moved to the next split. However, we null 
the `availabilityFuture` in `RateLimitedSourceReader#pollNext` anyway.
   
   So we just blow 1 capacity in the first cycle.




[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-10-06 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r988986838


##
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/GatedRateLimiter.java:
##
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib.util;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+
+/**
+ * An implementation of {@link RateLimiter} that completes defined number of futures in-between the
+ * external notification events. The first cycle completes immediately, without waiting for the
+ * external notifications.
+ */
+public class GatedRateLimiter implements RateLimiter {
+
+    private final int capacityPerCycle;
+    private int capacityLeft;
+
+    /**
+     * Instantiates a new GatedRateLimiter.
+     *
+     * @param capacityPerCycle The number of completed futures per cycle.
+     */
+    public GatedRateLimiter(int capacityPerCycle) {
+        this.capacityPerCycle = capacityPerCycle;
+        this.capacityLeft = capacityPerCycle + 1;
+    }
+
+    CompletableFuture<Void> gatingFuture;
+
+    @Override
+    public CompletionStage<Void> acquire() {
+        if (capacityLeft-- > 0) {
+            return CompletableFuture.completedFuture(null);
+        } else {
+            if (gatingFuture == null) {
+                gatingFuture = new CompletableFuture<>();
+            }
+            return gatingFuture;
+        }
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) {
+        capacityLeft = capacityPerCycle - 1;

Review Comment:
   amend `IteratorSourceReaderBase#pollNext` as follows to fix it:
   
   ```
   -return tryMoveToNextSplit();
   +final InputStatus inputStatus = tryMoveToNextSplit();
   +if (inputStatus == InputStatus.MORE_AVAILABLE) {
   +output.collect(convert(iterator.next()));
   +}
   +return inputStatus;
   ```



[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-10-06 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r988989113


##
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/GatedRateLimiter.java:
##
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib.util;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+
+/**
+ * An implementation of {@link RateLimiter} that completes defined number of futures in-between the
+ * external notification events. The first cycle completes immediately, without waiting for the
+ * external notifications.
+ */
+public class GatedRateLimiter implements RateLimiter {
+
+    private final int capacityPerCycle;
+    private int capacityLeft;
+
+    /**
+     * Instantiates a new GatedRateLimiter.
+     *
+     * @param capacityPerCycle The number of completed futures per cycle.
+     */
+    public GatedRateLimiter(int capacityPerCycle) {
+        this.capacityPerCycle = capacityPerCycle;
+        this.capacityLeft = capacityPerCycle + 1;
+    }
+
+    CompletableFuture<Void> gatingFuture;
+
+    @Override
+    public CompletionStage<Void> acquire() {
+        if (capacityLeft-- > 0) {
+            return CompletableFuture.completedFuture(null);
+        } else {
+            if (gatingFuture == null) {
+                gatingFuture = new CompletableFuture<>();
+            }
+            return gatingFuture;
+        }
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) {
+        capacityLeft = capacityPerCycle - 1;

Review Comment:
   I guess a similar issue would currently occur if the split is complete.
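
   A rough model of that case, assuming every switch to a new split uses up one poll without emitting anything. `SplitSwitchSketch` and `emitted` are hypothetical names and the splits are plain lists, so this illustrates the suspected effect and is not a reproduction against the real reader.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;

/** Hypothetical model of a reader that spends a permit on every split switch; not Flink code. */
class SplitSwitchSketch {

    /** Returns how many records are emitted when the reader is polled once per permit. */
    static int emitted(int permits, List<List<Long>> splits) {
        Queue<List<Long>> remaining = new ArrayDeque<>(splits);
        Iterator<Long> iterator = null;
        int emitted = 0;
        for (int permit = 0; permit < permits; permit++) {
            if (iterator != null && iterator.hasNext()) {
                iterator.next();
                emitted++;
            } else if (!remaining.isEmpty()) {
                // Switching splits uses up this poll without emitting a record.
                iterator = remaining.poll().iterator();
            } else {
                break; // no more input
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<List<Long>> splits = Arrays.asList(Arrays.asList(0L, 1L), Arrays.asList(2L, 3L));
        System.out.println(emitted(6, splits));
    }
}
```

   With two splits of two elements each, 6 permits produce only 4 records, one permit being lost per split boundary.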



[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-10-07 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r989923571


##
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/DataGeneratorSource.java:
##
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.lib.NumberSequenceSource.NumberSequenceSplit;
+import org.apache.flink.api.connector.source.lib.util.GeneratorSourceReaderFactory;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.util.Collection;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A data source that produces N data points in parallel. This source is useful for testing and for
+ * cases that just need a stream of N events of any kind.
+ *
+ * The source splits the sequence into as many parallel sub-sequences as there are parallel
+ * source readers.
+ *
+ * Users can supply a {@code GeneratorFunction} for mapping the (sub-)sequences of Long values
+ * into the generated events. For instance, the following code will produce the sequence of
+ * ["Number: 0", "Number: 2", ... , "Number: 999"] elements.
+ *
+ * {@code
+ * GeneratorFunction<Long, String> generatorFunction = index -> "Number: " + index;
+ *
+ * DataGeneratorSource<String> source =
+ *         new DataGeneratorSource<>(generatorFunction, 1000, Types.STRING);
+ *
+ * DataStreamSource<String> stream =
+ *         env.fromSource(source,
+ *         WatermarkStrategy.noWatermarks(),
+ *         "Generator Source");
+ * }
+ *
+ * The order of elements depends on the parallelism. Each sub-sequence will be produced in order.
+ * Consequently, if the parallelism is limited to one, this will produce one sequence in order from
+ * "Number: 0" to "Number: 999".
+ *
+ * Note that this approach also makes it possible to produce deterministic watermarks at the
+ * source based on the generated events and a custom {@code WatermarkStrategy}.
+ *
+ * This source has built-in support for rate limiting. The following code will produce an
+ * effectively unbounded (Long.MAX_VALUE from practical perspective will never be reached) stream of
+ * Long values at the overall source rate (across all source subtasks) of 100 events per second.
+ *
+ * {@code
+ * GeneratorFunction<Long, Long> generatorFunction = index -> index;
+ *
+ * DataGeneratorSource<String> source =
+ *         new DataGeneratorSource<>(generatorFunctionStateless, Long.MAX_VALUE, 100, Types.STRING);
+ * }
+ *
+ * For more sophisticates use cases, users can take full control of the low-level data generation
+ * details by supplying a custom {@code SourceReaderFactory}. The instantiated {@code SourceReader}s
+ * are expected to produce data based on processing {@code NumberSequenceSplit}s. A customized
+ * generator could, for instance, synchronize the data release process with checkpointing by making
+ * use of ({@link SourceReader#notifyCheckpointComplete(long)}). Such functionality could be
+ * helpful, for instance, for testing sinks that are expected to create specific metadata upon the
+ * arrival of a checkpoint barrier and other similar use cases.
+ *
+ * This source is always bounded. For very long sequences (for example when the {@code count} is
+ * set to Long.MAX_VALUE), users may want to consider executing the application in a streaming
+ * manner, because, despite the fact that the produced stream is bounded, 

[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-10-07 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r989925818


##
flink-core/src/test/java/org/apache/flink/api/connector/source/lib/DataGeneratorSourceTest.java:
##
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.datagen.DataGeneratorSource;
+import org.apache.flink.api.connector.source.datagen.GeneratorFunction;
+import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.mock.Whitebox;
+import org.apache.flink.util.SimpleUserCodeClassLoader;
+import org.apache.flink.util.UserCodeClassLoader;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Queue;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for the {@link DataGeneratorSource}. */
+public class DataGeneratorSourceTest {
+
+    @Test
+    @DisplayName("Correctly restores SplitEnumerator from a snapshot.")
+    public void testRestoreEnumerator() throws Exception {
+        final GeneratorFunction<Long, Long> generatorFunctionStateless = index -> index;
+        final DataGeneratorSource<Long> dataGeneratorSource =
+                new DataGeneratorSource<>(generatorFunctionStateless, 100, Types.LONG);
+
+        final int parallelism = 2;
+        final MockSplitEnumeratorContext<NumberSequenceSource.NumberSequenceSplit> context =
+                new MockSplitEnumeratorContext<>(parallelism);
+
+        SplitEnumerator<
+                        NumberSequenceSource.NumberSequenceSplit,
+                        Collection<NumberSequenceSource.NumberSequenceSplit>>
+                enumerator = dataGeneratorSource.createEnumerator(context);
+
+        // start() is not strictly necessary in the current implementation, but should logically be
+        // executed in this order (protect against any breaking changes in the start() method).
+        enumerator.start();
+
+        Collection<NumberSequenceSource.NumberSequenceSplit> enumeratorState =
+                enumerator.snapshotState(0);
+
+        @SuppressWarnings("unchecked")
+        final Queue<NumberSequenceSource.NumberSequenceSplit> splits =
+                (Queue<NumberSequenceSource.NumberSequenceSplit>)
+                        Whitebox.getInternalState(enumerator, "remainingSplits");
+
+        assertThat(splits).hasSize(parallelism);
+
+        enumerator = dataGeneratorSource.restoreEnumerator(context, enumeratorState);
+
+        @SuppressWarnings("unchecked")
+        final Queue<NumberSequenceSource.NumberSequenceSplit> restoredSplits =
+                (Queue<NumberSequenceSource.NumberSequenceSplit>)
+                        Whitebox.getInternalState(enumerator, "remainingSplits");
+
+        assertThat(restoredSplits).hasSize(enumeratorState.size());
+    }
+
+    @Test
+    @DisplayName("Uses the underlying NumberSequenceSource correctly for checkpointing.")
+    public void testReaderCheckpoints() throws Exception {
+        final long from = 177;
+        final long mid = 333;
+        final long to = 563;
+        final long elementsPerCycle = (to - from) / 3;
+
+        final TestingReaderOutput<Long> out = new TestingReaderOutput<>();
+
+        SourceReader<Long, NumberSequenceSource.NumberSequenceSplit> reader = createReader();
+        reader.addSplits(
+                Arrays.asList(
+                        new NumberSequenceSource.NumberSequenceSplit("split-1", from, mid),
+                        new NumberSequenceSource.NumberSequenceSplit("split-2", mid + 1, to)));
+
+        long remainingInCycle = element

[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-10-10 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r991345494


##
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/datagen/DataGeneratorPerCheckpoint.java:
##
@@ -40,22 +35,19 @@ public static void main(String[] args) throws Exception {
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.enableCheckpointing(3000);
         env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
-        env.setParallelism(1);
+        env.setParallelism(8);
 
         final String[] elements = new String[] {"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"};
         final int size = elements.length;
         final GeneratorFunction<Long, String> generatorFunction =
                 index -> elements[(int) (index % size)];
-        final RateLimiter rateLimiter = new GatedRateLimiter(size);
-
-        final SourceReaderFactory factory =
-                context ->
-                        new RateLimitedSourceReader<>(
-                                new GeneratingIteratorSourceReader<>(context, generatorFunction),
-                                rateLimiter);
 
         final DataGeneratorSource<String> generatorSource =
-                new DataGeneratorSource<>(factory, Long.MAX_VALUE, Types.STRING);
+                new DataGeneratorSource<>(
+                        generatorFunction,
+                        Long.MAX_VALUE,
+                        RateLimiterStrategy.perCheckpoint(4),

Review Comment:
   Shouldn't this fail, since it's lower than the parallelism?
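
   To spell out the concern: if the per-checkpoint budget is divided evenly across subtasks with integer division (an assumption on my side, nothing in this diff confirms how the strategy splits it), then 4 records over 8 subtasks gives each subtask a capacity of 0. A hypothetical guard, with made-up names `PerCheckpointCapacitySketch` and `perSubtaskCapacity`, could look like this:

```java
/** Hypothetical validation sketch; the names and the even-split rule are assumptions. */
class PerCheckpointCapacitySketch {

    /** Returns the per-subtask capacity, rejecting budgets that leave a subtask with nothing. */
    static int perSubtaskCapacity(int recordsPerCheckpoint, int parallelism) {
        if (parallelism < 1) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        // Integer division: 4 / 8 == 0, so a subtask would never be allowed to emit.
        int perSubtask = recordsPerCheckpoint / parallelism;
        if (perSubtask < 1) {
            throw new IllegalArgumentException(
                    "recordsPerCheckpoint ("
                            + recordsPerCheckpoint
                            + ") must be at least the parallelism ("
                            + parallelism
                            + ")");
        }
        return perSubtask;
    }

    public static void main(String[] args) {
        System.out.println(perSubtaskCapacity(16, 8));
        try {
            perSubtaskCapacity(4, 8);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

   Failing fast like this seems preferable to silently producing nothing on some subtasks, but whether that is the intended behaviour is exactly the question.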



[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-10-10 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r991346364


##
flink-connectors/flink-connector-datagen/pom.xml:
##
@@ -0,0 +1,79 @@
+
+
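+<project xmlns="http://maven.apache.org/POM/4.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-connectors</artifactId>
+		<version>1.17-SNAPSHOT</version>
+	</parent>
+
+	<artifactId>flink-connector-datagen</artifactId>
+	<name>Flink : Connectors : Datagen</name>
+
+	<packaging>jar</packaging>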
+
+   

Review Comment:
   ```suggestion
   ```



##
flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/DataGeneratorSource.java:
##
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.datagen.source;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceReaderFactory;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
+import 
org.apache.flink.api.connector.source.lib.NumberSequenceSource.NumberSequenceSplit;
+import 
org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.util.Collection;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A data source that produces N data points in parallel. This source is 
useful for testing and for
+ * cases that just need a stream of N events of any kind.
+ *
+ * The source splits the sequence into as many parallel sub-sequences as 
there are parallel
+ * source readers.
+ *
+ * Users can supply a {@code GeneratorFunction} for mapping the 
(sub-)sequences of Long values
+ * into the generated events. For instance, the following code will produce 
the sequence of
+ * ["Number: 0", "Number: 2", ... , "Number: 999"] elements.
+ *
+ * {@code
+ * GeneratorFunction generatorFunction = index -> "Number: " + 
index;
+ *
+ * DataGeneratorSource source =
+ * new DataGeneratorSource<>(generatorFunction, 1000, Types.STRING);
+ *
+ * DataStreamSource stream =
+ * env.fromSource(source,
+ * WatermarkStrategy.noWatermarks(),
+ * "Generator Source");
+ * }
+ *
+ * The order of elements depends on the parallelism. Each sub-sequence will 
be produced in order.
+ * Consequently, if the parallelism is limited to one, this will produce one 
sequence in order from
+ * "Number: 0" to "Number: 999".
+ *
+ * Note that this approach also makes it possible to produce deterministic 
watermarks at the
+ * source based on the generated events and a custom {@code WatermarkStrategy}.
+ *
+ * This source has built-in support for rate limiting. The following code 
will produce an
+ * effectively unbounded (Long.MAX_VALUE from practical perspective will never 
be reached) stream of
+ * Long values at the overall source rate (across all source subtasks) of 100 
events per second.
+ *
+ * {@code
+ * GeneratorFunction generatorFunction = index -> index;
+ *
+ * DataGeneratorSource source =
+ * new DataGeneratorSource<>(
+ *  generatorFunctionStateless,
+ *  Long.MAX_VALUE,
+ *  RateLimiterStrategy.perSecond(100),
+ *  Types.STRING);
+ * }
+ *
+ * This source is always bounded. For very long sequences (for example when 
the {@code count} is
+ * set to Long.MAX_VALUE), users may want to consider executing the 
application in a st

[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-10-11 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r992197969


##
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java:
##
@@ -52,128 +40,19 @@
 @Public
 public class IteratorSourceReader<
 E, IterT extends Iterator, SplitT extends 
IteratorSourceSplit>
-implements SourceReader {
-
-/** The context for this reader, to communicate with the enumerator. */
-private final SourceReaderContext context;
-
-/** The availability future. This reader is available as soon as a split 
is assigned. */
-private CompletableFuture availability;
-
-/**
- * The iterator producing data. Non-null after a split has been assigned. 
This field is null or
- * non-null always together with the {@link #currentSplit} field.
- */
-@Nullable private IterT iterator;
-
-/**
- * The split whose data we return. Non-null after a split has been 
assigned. This field is null
- * or non-null always together with the {@link #iterator} field.
- */
-@Nullable private SplitT currentSplit;
-
-/** The remaining splits that were assigned but not yet processed. */
-private final Queue remainingSplits;
-
-private boolean noMoreSplits;
+extends IteratorSourceReaderBase {
 
 public IteratorSourceReader(SourceReaderContext context) {
-this.context = checkNotNull(context);
-this.availability = new CompletableFuture<>();
-this.remainingSplits = new ArrayDeque<>();
-}
-
-// 
-
-@Override
-public void start() {
-// request a split if we don't have one
-if (remainingSplits.isEmpty()) {
-context.sendSplitRequest();
-}
-}
-
-@Override
-public InputStatus pollNext(ReaderOutput output) {
-if (iterator != null) {
-if (iterator.hasNext()) {
-output.collect(iterator.next());
-return InputStatus.MORE_AVAILABLE;
-} else {
-finishSplit();
-}
-}
-
-return tryMoveToNextSplit();
-}
-
-private void finishSplit() {
-iterator = null;
-currentSplit = null;
-
-// request another split if no other is left
-// we do this only here in the finishSplit part to avoid requesting a 
split
-// whenever the reader is polled and doesn't currently have a split
-if (remainingSplits.isEmpty() && !noMoreSplits) {
-context.sendSplitRequest();
-}
-}
-
-private InputStatus tryMoveToNextSplit() {
-currentSplit = remainingSplits.poll();
-if (currentSplit != null) {
-iterator = currentSplit.getIterator();
-return InputStatus.MORE_AVAILABLE;
-} else if (noMoreSplits) {
-return InputStatus.END_OF_INPUT;
-} else {
-// ensure we are not called in a loop by resetting the 
availability future
-if (availability.isDone()) {
-availability = new CompletableFuture<>();
-}
-
-return InputStatus.NOTHING_AVAILABLE;
-}
+super(context);
 }
 
 @Override
-public CompletableFuture isAvailable() {
-return availability;
-}
+protected void start(SourceReaderContext context) {}
 
-@Override
-public void addSplits(List splits) {
-remainingSplits.addAll(splits);
-// set availability so that pollNext is actually called
-availability.complete(null);
-}
-
-@Override
-public void notifyNoMoreSplits() {
-noMoreSplits = true;
-// set availability so that pollNext is actually called
-availability.complete(null);
-}
+// 
 
 @Override
-public List snapshotState(long checkpointId) {
-if (currentSplit == null && remainingSplits.isEmpty()) {
-return Collections.emptyList();
-}
-
-final ArrayList allSplits = new ArrayList<>(1 + 
remainingSplits.size());
-if (iterator != null && iterator.hasNext()) {
-assert currentSplit != null;
-
-@SuppressWarnings("unchecked")
-final SplitT inProgressSplit =
-(SplitT) currentSplit.getUpdatedSplitForIterator(iterator);
-allSplits.add(inProgressSplit);
-}
-allSplits.addAll(remainingSplits);
-return allSplits;
+protected E convert(E value) {

Review Comment:
   I was confused; I thought the IteratorSourceReader was the parent class.




[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-10-11 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r992198837


##
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java:
##
@@ -52,128 +40,19 @@
 @Public
 public class IteratorSourceReader<
 E, IterT extends Iterator, SplitT extends 
IteratorSourceSplit>
-implements SourceReader {
-
-/** The context for this reader, to communicate with the enumerator. */
-private final SourceReaderContext context;
-
-/** The availability future. This reader is available as soon as a split 
is assigned. */
-private CompletableFuture availability;
-
-/**
- * The iterator producing data. Non-null after a split has been assigned. 
This field is null or
- * non-null always together with the {@link #currentSplit} field.
- */
-@Nullable private IterT iterator;
-
-/**
- * The split whose data we return. Non-null after a split has been 
assigned. This field is null
- * or non-null always together with the {@link #iterator} field.
- */
-@Nullable private SplitT currentSplit;
-
-/** The remaining splits that were assigned but not yet processed. */
-private final Queue remainingSplits;
-
-private boolean noMoreSplits;
+extends IteratorSourceReaderBase {
 
 public IteratorSourceReader(SourceReaderContext context) {
-this.context = checkNotNull(context);
-this.availability = new CompletableFuture<>();
-this.remainingSplits = new ArrayDeque<>();
-}
-
-// 
-
-@Override
-public void start() {
-// request a split if we don't have one
-if (remainingSplits.isEmpty()) {
-context.sendSplitRequest();
-}
-}
-
-@Override
-public InputStatus pollNext(ReaderOutput output) {
-if (iterator != null) {
-if (iterator.hasNext()) {
-output.collect(iterator.next());
-return InputStatus.MORE_AVAILABLE;
-} else {
-finishSplit();
-}
-}
-
-return tryMoveToNextSplit();
-}
-
-private void finishSplit() {
-iterator = null;
-currentSplit = null;
-
-// request another split if no other is left
-// we do this only here in the finishSplit part to avoid requesting a 
split
-// whenever the reader is polled and doesn't currently have a split
-if (remainingSplits.isEmpty() && !noMoreSplits) {
-context.sendSplitRequest();
-}
-}
-
-private InputStatus tryMoveToNextSplit() {
-currentSplit = remainingSplits.poll();
-if (currentSplit != null) {
-iterator = currentSplit.getIterator();
-return InputStatus.MORE_AVAILABLE;
-} else if (noMoreSplits) {
-return InputStatus.END_OF_INPUT;
-} else {
-// ensure we are not called in a loop by resetting the 
availability future
-if (availability.isDone()) {
-availability = new CompletableFuture<>();
-}
-
-return InputStatus.NOTHING_AVAILABLE;
-}
+super(context);
 }
 
 @Override
-public CompletableFuture isAvailable() {
-return availability;
-}
+protected void start(SourceReaderContext context) {}

Review Comment:
   I was confused; I thought the IteratorSourceReader was the parent class.
   
   This is fine as is imo.
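
   For readers following the hierarchy, here is a heavily simplified plain-Java sketch of the base-class-with-hooks arrangement visible in the diff: a base reader owns the iteration loop and subclasses only supply `convert`. The class names and the two type parameters are assumptions made for illustration; they are not the Flink classes or their actual signatures.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for a reader base class; not Flink code.
abstract class ReaderBaseSketch<E, O> {

    /** Hook: maps a value from the underlying iterator to the emitted type. */
    protected abstract O convert(E value);

    /** The shared loop lives in the base class; subclasses never reimplement it. */
    final List<O> readAll(Iterator<E> iterator) {
        List<O> out = new ArrayList<>();
        while (iterator.hasNext()) {
            out.add(convert(iterator.next()));
        }
        return out;
    }
}

/** Emits elements unchanged, analogous to a reader whose convert is the identity. */
final class IdentityReaderSketch<E> extends ReaderBaseSketch<E, E> {
    @Override
    protected E convert(E value) {
        return value;
    }
}

/** Maps each index to a generated record, analogous to a generating reader. */
final class MappingReaderSketch extends ReaderBaseSketch<Long, String> {
    @Override
    protected String convert(Long value) {
        return "Number: " + value;
    }
}
```

   With this layout the plain iterator reader is just the identity case of the base class, which is why it is a sibling of the generating reader rather than its parent.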






[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-10-13 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r994673018


##
docs/content/docs/connectors/datastream/datagen.md:
##
@@ -0,0 +1,115 @@
+---
+title: DataGen
+weight: 3
+type: docs
+---
+
+
+# DataGen Connector
+
+The DataGen connector provides a `Source` implementation that allows for 
generating input data for 
+Flink pipelines.
+It is useful when developing locally or demoing without access to external 
systems such as Kafka.
+The DataGen connector is built-in, no additional dependencies are required.
+
+Usage
+-
+
+The `DataGeneratorSource` produces N data points in parallel. The source 
splits the sequence 
+into as many parallel sub-sequences as there are parallel source subtasks. It 
drives the data 
+generation process by supplying "index" values of type Long to the 
user-provided 

Review Comment:
   ```suggestion
   generation process by supplying "index" values of type `Long` to the 
user-provided 
   ```



##
docs/content/docs/connectors/datastream/datagen.md:
##
@@ -0,0 +1,115 @@
+---
+title: DataGen
+weight: 3
+type: docs
+---
+
+
+# DataGen Connector
+
+The DataGen connector provides a `Source` implementation that allows for 
generating input data for 
+Flink pipelines.
+It is useful when developing locally or demoing without access to external 
systems such as Kafka.
+The DataGen connector is built-in, no additional dependencies are required.
+
+Usage
+-
+
+The `DataGeneratorSource` produces N data points in parallel. The source 
splits the sequence 
+into as many parallel sub-sequences as there are parallel source subtasks. It 
drives the data 
+generation process by supplying "index" values of type Long to the 
user-provided 
+{{< javadoc name="GeneratorFunction" 
file="org/apache/flink/connector/datagen/source/GeneratorFunction.html" >}}.
+
+The `GeneratorFunction` is then used for mapping the (sub-)sequences of Long 
values

Review Comment:
   ```suggestion
   The `GeneratorFunction` is then used for mapping the (sub-)sequences of 
`Long` values
   ```



##
docs/content/docs/connectors/datastream/datagen.md:
##
@@ -0,0 +1,115 @@
+---
+title: DataGen
+weight: 3
+type: docs
+---
+
+
+# DataGen Connector
+
+The DataGen connector provides a `Source` implementation that allows for 
generating input data for 
+Flink pipelines.
+It is useful when developing locally or demoing without access to external 
systems such as Kafka.
+The DataGen connector is built-in, no additional dependencies are required.
+
+Usage
+-
+
+The `DataGeneratorSource` produces N data points in parallel. The source 
splits the sequence 
+into as many parallel sub-sequences as there are parallel source subtasks. It 
drives the data 
+generation process by supplying "index" values of type Long to the 
user-provided 
+{{< javadoc name="GeneratorFunction" 
file="org/apache/flink/connector/datagen/source/GeneratorFunction.html" >}}.
+
+The `GeneratorFunction` is then used for mapping the (sub-)sequences of Long 
values
+into the generated events of an arbitrary data type. For instance, the 
following code will produce the sequence of
+`["Number: 0", "Number: 2", ... , "Number: 999"]` records.

Review Comment:
   ```suggestion
   `["Number: 0", "Number: 1", ... , "Number: 999"]` records.
   ```
   It's not intuitive that this is 2.



##
docs/content/docs/connectors/datastream/datagen.md:
##
@@ -0,0 +1,115 @@
+---
+title: DataGen
+weight: 3
+type: docs
+---
+
+
+# DataGen Connector
+
+The DataGen connector provides a `Source` implementation that allows for 
generating input data for 
+Flink pipelines.
+It is useful when developing locally or demoing without access to external 
systems such as Kafka.
+The DataGen connector is built-in, no additional dependencies are required.
+
+Usage
+-
+
+The `DataGeneratorSource` produces N data points in parallel. The source 
splits the sequence 
+into as many parallel sub-sequences as there are parallel source subtasks. It 
drives the data 
+generation process by supplying "index" values of type Long to the 
user-provided 
+{{< javadoc name="GeneratorFunction" 
file="org/apache/flink/connector/datagen/source/GeneratorFunction.html" >}}.
+
+The `GeneratorFunction` is then used for mapping the (sub-)sequences of Long 
values
+into the generated events of an arbitrary data type. For instance, the 
following code will produce the sequence of
+`["Number: 0", "Number: 2", ... , "Number: 999"]` records.
+
+```java
+GeneratorFunction generatorFunction = index -> "Number: " + 
index;
+
+DataGeneratorSource source =
+new DataGeneratorSource<>(generatorFunction, 1000, Types.STRING);

Review Comment:
   Move `1000` into a separate variable for clarity; that should explain why 
the sequence ends at `999`.
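
   A rough plain-Java sketch of what the named variable buys. It uses `java.util.function.LongFunction` as a stand-in for `GeneratorFunction` and a simple loop instead of the source, so `IndexSequenceSketch`, `generate` and `numberOfRecords` are illustrative names only. Indices start at 0, so a count of 1000 ends at 999.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongFunction;

// Hypothetical single-threaded stand-in for the source; not Flink code.
final class IndexSequenceSketch {

    /** Applies the generator to the indices 0 .. count - 1, in order. */
    static <T> List<T> generate(LongFunction<T> generator, long count) {
        List<T> out = new ArrayList<>();
        for (long index = 0; index < count; index++) {
            out.add(generator.apply(index));
        }
        return out;
    }

    public static void main(String[] args) {
        // Naming the count makes the upper bound of the sequence obvious.
        long numberOfRecords = 1000;
        List<String> records = generate(index -> "Number: " + index, numberOfRecords);
        System.out.println(records.get(0));
        System.out.println(records.get(records.size() - 1));
    }
}
```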



##
docs/content/docs/connectors/datastream/datagen.md:
##
@@ -0,0 +1,115 @@
+---
+title: DataGen
+weight: 3
+type: docs
+---
+
+
+# DataGen C

[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)

2022-10-13 Thread GitBox


zentol commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r994695900


##
docs/content/docs/connectors/datastream/datagen.md:
##
@@ -0,0 +1,115 @@
+---
+title: DataGen
+weight: 3
+type: docs
+---
+
+
+# DataGen Connector
+
+The DataGen connector provides a `Source` implementation that allows for 
generating input data for 
+Flink pipelines.
+It is useful when developing locally or demoing without access to external 
systems such as Kafka.
+The DataGen connector is built-in, no additional dependencies are required.
+
+Usage
+-
+
+The `DataGeneratorSource` produces N data points in parallel. The source 
splits the sequence 
+into as many parallel sub-sequences as there are parallel source subtasks. It 
drives the data 
+generation process by supplying "index" values of type Long to the 
user-provided 
+{{< javadoc name="GeneratorFunction" 
file="org/apache/flink/connector/datagen/source/GeneratorFunction.html" >}}.
+
+The `GeneratorFunction` is then used for mapping the (sub-)sequences of Long 
values
+into the generated events of an arbitrary data type. For instance, the 
following code will produce the sequence of
+`["Number: 0", "Number: 2", ... , "Number: 999"]` records.
+
+```java
+GeneratorFunction generatorFunction = index -> "Number: " + 
index;
+
+DataGeneratorSource source =
+new DataGeneratorSource<>(generatorFunction, 1000, Types.STRING);
+
+DataStreamSource stream =
+env.fromSource(source,
+WatermarkStrategy.noWatermarks(),
+"Generator Source");
+```
+
+The order of elements depends on the parallelism. Each sub-sequence will be 
produced in order.
+Consequently, if the parallelism is limited to one, this will produce one 
sequence in order from
+`"Number: 0"` to `"Number: 999"`.
+
+`DataGeneratorSource` has built-in support for rate limiting. The following 
code will produce an
+effectively unbounded (`Long.MAX_VALUE` from a practical perspective will 
never be reached) stream of
+Long values at the overall source rate (across all source subtasks) not 
exceeding 100 events per second.
+
+```java
+GeneratorFunction generatorFunction = index -> index;
+
+DataGeneratorSource source =
+new DataGeneratorSource<>(
+ generatorFunctionStateless,
+ Long.MAX_VALUE,
+ RateLimiterStrategy.perSecond(100),
+ Types.STRING);
+```
+
+The source also allows for producing specific elements between the checkpoint 
boundaries using the 

Review Comment:
   Hmm, this also implies that not every `GeneratorFunction` from the old source 
can be migrated; anything that changes its data on checkpoint can't be replicated.


