This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-training.git


The following commit(s) were added to refs/heads/master by this push:
     new bfcd25a  [FLINK-24118] Allow TaxiFareGenerator to produce bounded streams (#40)
bfcd25a is described below

commit bfcd25a9d52e71f018720ea4865090b5bab5b135
Author: David Anderson <da...@alpinegizmo.com>
AuthorDate: Fri Sep 3 08:36:41 2021 -0600

    [FLINK-24118] Allow TaxiFareGenerator to produce bounded streams (#40)
---
 .../exercises/common/sources/TaxiFareGenerator.java   | 19 ++++++++++++++++++-
 .../exercises/common/utils/DataGenerator.java         |  4 ++--
 .../training/exercises/hourlytips/HourlyTipsTest.java |  8 ++++----
 3 files changed, 24 insertions(+), 7 deletions(-)

diff --git a/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiFareGenerator.java b/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiFareGenerator.java
index 0866e0d..58fbe68 100644
--- a/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiFareGenerator.java
+++ b/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiFareGenerator.java
@@ -20,6 +20,10 @@ package org.apache.flink.training.exercises.common.sources;
 
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.training.exercises.common.datatypes.TaxiFare;
+import org.apache.flink.training.exercises.common.utils.DataGenerator;
+
+import java.time.Duration;
+import java.time.Instant;
 
 /**
  * This SourceFunction generates a data stream of TaxiFare records.
@@ -29,6 +33,14 @@ import org.apache.flink.training.exercises.common.datatypes.TaxiFare;
 public class TaxiFareGenerator implements SourceFunction<TaxiFare> {
 
     private volatile boolean running = true;
+    private Instant limitingTimestamp = Instant.MAX;
+
+    /** Create a bounded TaxiFareGenerator that runs only for the specified duration. */
+    public static TaxiFareGenerator runFor(Duration duration) {
+        TaxiFareGenerator generator = new TaxiFareGenerator();
+        generator.limitingTimestamp = DataGenerator.BEGINNING.plus(duration);
+        return generator;
+    }
 
     @Override
     public void run(SourceContext<TaxiFare> ctx) throws Exception {
@@ -37,8 +49,13 @@ public class TaxiFareGenerator implements SourceFunction<TaxiFare> {
 
         while (running) {
             TaxiFare fare = new TaxiFare(id);
-            id += 1;
 
+            // don't emit events that exceed the specified limit
+            if (fare.startTime.compareTo(limitingTimestamp) >= 0) {
+                break;
+            }
+
+            ++id;
             ctx.collect(fare);
 
             // match our event production rate to that of the TaxiRideGenerator
diff --git a/common/src/main/java/org/apache/flink/training/exercises/common/utils/DataGenerator.java b/common/src/main/java/org/apache/flink/training/exercises/common/utils/DataGenerator.java
index 10c4c2b..078c43a 100644
--- a/common/src/main/java/org/apache/flink/training/exercises/common/utils/DataGenerator.java
+++ b/common/src/main/java/org/apache/flink/training/exercises/common/utils/DataGenerator.java
@@ -32,7 +32,7 @@ public class DataGenerator {
 
     private static final int SECONDS_BETWEEN_RIDES = 20;
     private static final int NUMBER_OF_DRIVERS = 200;
-    private static final Instant beginTime = Instant.parse("2020-01-01T12:00:00.00Z");
+    public static final Instant BEGINNING = Instant.parse("2020-01-01T12:00:00.00Z");
 
     private transient long rideId;
 
@@ -43,7 +43,7 @@ public class DataGenerator {
 
     /** Deterministically generates and returns the startTime for this ride. */
     public Instant startTime() {
-        return beginTime.plusSeconds(SECONDS_BETWEEN_RIDES * rideId);
+        return BEGINNING.plusSeconds(SECONDS_BETWEEN_RIDES * rideId);
     }
 
     /** Deterministically generates and returns the endTime for this ride. */
diff --git a/hourly-tips/src/test/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsTest.java b/hourly-tips/src/test/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsTest.java
index 8167eba..1e379c3 100644
--- a/hourly-tips/src/test/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsTest.java
+++ b/hourly-tips/src/test/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.training.exercises.common.datatypes.TaxiFare;
+import org.apache.flink.training.exercises.common.utils.DataGenerator;
 import org.apache.flink.training.exercises.testing.ComposedPipeline;
 import org.apache.flink.training.exercises.testing.ExecutablePipeline;
 import org.apache.flink.training.exercises.testing.ParallelTestSource;
@@ -33,6 +34,7 @@ import org.apache.flink.training.solutions.hourlytips.HourlyTipsSolution;
 import org.junit.ClassRule;
 import org.junit.Test;
 
+import java.time.Duration;
 import java.time.Instant;
 import java.util.List;
 
@@ -105,10 +107,8 @@ public class HourlyTipsTest {
         assertThat(results(source)).containsExactlyInAnyOrder(hour1, hour2);
     }
 
-    private static final Instant BEGINNING = Instant.parse("2020-01-01T12:00:00.00Z");
-
-    private Instant t(int minutes) {
-        return BEGINNING.plusSeconds(60L * minutes);
+    public Instant t(int minutes) {
+        return DataGenerator.BEGINNING.plus(Duration.ofMinutes(minutes));
     }
 
     private TaxiFare testFare(long driverId, Instant startTime, float tip) {

Reply via email to