This is an automated email from the ASF dual-hosted git repository.
afedulov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 48359a32 [FLINK-33525] Move ImpulseSource to new Source API (#950)
48359a32 is described below
commit 48359a326f417081bcfad984cbdab578ef9c906d
Author: PB <[email protected]>
AuthorDate: Mon Mar 10 22:57:03 2025 +0530
[FLINK-33525] Move ImpulseSource to new Source API (#950)
---
examples/autoscaling/pom.xml | 7 +++
.../java/autoscaling/LoadSimulationPipeline.java | 69 +++++++++++++---------
2 files changed, 49 insertions(+), 27 deletions(-)
diff --git a/examples/autoscaling/pom.xml b/examples/autoscaling/pom.xml
index 6da06a85..993b22dd 100644
--- a/examples/autoscaling/pom.xml
+++ b/examples/autoscaling/pom.xml
@@ -45,6 +45,13 @@ under the License.
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-datagen</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
diff --git a/examples/autoscaling/src/main/java/autoscaling/LoadSimulationPipeline.java b/examples/autoscaling/src/main/java/autoscaling/LoadSimulationPipeline.java
index 0ddb2302..14865e1b 100644
--- a/examples/autoscaling/src/main/java/autoscaling/LoadSimulationPipeline.java
+++ b/examples/autoscaling/src/main/java/autoscaling/LoadSimulationPipeline.java
@@ -18,12 +18,16 @@
package autoscaling;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.typeinfo.Types;
+import
org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
@@ -60,6 +64,11 @@ public class LoadSimulationPipeline {
private static final Logger LOG =
LoggerFactory.getLogger(LoadSimulationPipeline.class);
+ // Number of impulses (records) emitted per sampling interval.
+ // This value determines how many records are generated within each `samplingIntervalMs`
+ // period.
+ private static final int IMPULSES_PER_SAMPLING_INTERVAL = 10;
+
public static void main(String[] args) throws Exception {
var env = StreamExecutionEnvironment.getExecutionEnvironment();
env.disableOperatorChaining();
@@ -74,8 +83,39 @@ public class LoadSimulationPipeline {
for (String branch : maxLoadPerTask.split("\n")) {
String[] taskLoads = branch.split(";");
+ /*
+ * Creates an unbounded stream that continuously emits the constant value 42L.
+ * Flink's DataGeneratorSource with a RateLimiterStrategy is used to control the emission rate.
+ *
+ * Emission Rate Logic:
+ * - The goal is to generate a fixed number of impulses per sampling interval.
+ * - `samplingIntervalMs` defines the duration of one sampling interval in milliseconds.
+ * - We define `IMPULSES_PER_SAMPLING_INTERVAL = 10`, meaning that for every sampling interval,
+ * approximately 10 impulses should be generated (the rate limiter throttles emission toward
+ * this rate; it does not guarantee an exact count per interval).
+ *
+ * To calculate the total number of records emitted per second:
+ * 1. Determine how many sampling intervals fit within one second. Floating-point division is
+ * used so that intervals longer than one second yield a fractional value instead of 0:
+ * samplingIntervalsPerSecond = 1000.0 / samplingIntervalMs;
+ * 2. Multiply this by the number of impulses per interval to get the total rate:
+ * impulsesPerSecond = IMPULSES_PER_SAMPLING_INTERVAL * samplingIntervalsPerSecond;
+ *
+ * Example:
+ * - If `samplingIntervalMs = 500 ms` and `IMPULSES_PER_SAMPLING_INTERVAL = 10`:
+ * impulsesPerSecond = (1000.0 / 500) * 10 = 2 * 10 = 20 records per second.
+ */
DataStream<Long> stream =
- env.addSource(new
ImpulseSource(samplingIntervalMs)).name("ImpulseSource");
+ env.fromSource(
+ new DataGeneratorSource<>(
+ (GeneratorFunction<Long, Long>)
+ (index) -> 42L, // Emits constant
value 42
+ Long.MAX_VALUE, // Unbounded stream
+ RateLimiterStrategy.perSecond(
+ (1000.0 / samplingIntervalMs)
+ *
IMPULSES_PER_SAMPLING_INTERVAL), // Controls
+ // rate
+ Types.LONG),
+ WatermarkStrategy.noWatermarks(),
+ "ImpulseSource");
for (String load : taskLoads) {
double maxLoad = Double.parseDouble(load);
@@ -97,31 +137,6 @@ public class LoadSimulationPipeline {
+ ")");
}
- private static class ImpulseSource implements SourceFunction<Long> {
- private final int maxSleepTimeMs;
- volatile boolean canceled;
-
- public ImpulseSource(int samplingInterval) {
- this.maxSleepTimeMs = samplingInterval / 10;
- }
-
- @Override
- public void run(SourceContext<Long> sourceContext) throws Exception {
- while (!canceled) {
- synchronized (sourceContext.getCheckpointLock()) {
- sourceContext.collect(42L);
- }
- // Provide an impulse to keep the load simulation active
- Thread.sleep(maxSleepTimeMs);
- }
- }
-
- @Override
- public void cancel() {
- canceled = true;
- }
- }
-
private static class LoadSimulationFn extends RichFlatMapFunction<Long,
Long> {
private final double maxLoad;