This is an automated email from the ASF dual-hosted git repository. nkruber pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-training.git
The following commit(s) were added to refs/heads/master by this push: new bfcd25a [FLINK-24118] Allow TaxiFareGenerator to produce bounded streams (#40) bfcd25a is described below commit bfcd25a9d52e71f018720ea4865090b5bab5b135 Author: David Anderson <da...@alpinegizmo.com> AuthorDate: Fri Sep 3 08:36:41 2021 -0600 [FLINK-24118] Allow TaxiFareGenerator to produce bounded streams (#40) --- .../exercises/common/sources/TaxiFareGenerator.java | 19 ++++++++++++++++++- .../exercises/common/utils/DataGenerator.java | 4 ++-- .../training/exercises/hourlytips/HourlyTipsTest.java | 8 ++++---- 3 files changed, 24 insertions(+), 7 deletions(-) diff --git a/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiFareGenerator.java b/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiFareGenerator.java index 0866e0d..58fbe68 100644 --- a/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiFareGenerator.java +++ b/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiFareGenerator.java @@ -20,6 +20,10 @@ package org.apache.flink.training.exercises.common.sources; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.training.exercises.common.datatypes.TaxiFare; +import org.apache.flink.training.exercises.common.utils.DataGenerator; + +import java.time.Duration; +import java.time.Instant; /** * This SourceFunction generates a data stream of TaxiFare records. @@ -29,6 +33,14 @@ import org.apache.flink.training.exercises.common.datatypes.TaxiFare; public class TaxiFareGenerator implements SourceFunction<TaxiFare> { private volatile boolean running = true; + private Instant limitingTimestamp = Instant.MAX; + + /** Create a bounded TaxiFareGenerator that runs only for the specified duration. */ + public static TaxiFareGenerator runFor(Duration duration) { + TaxiFareGenerator generator = new TaxiFareGenerator(); + generator.limitingTimestamp = DataGenerator.BEGINNING.plus(duration); + return generator; + } @Override public void run(SourceContext<TaxiFare> ctx) throws Exception { @@ -37,8 +49,13 @@ public class TaxiFareGenerator implements SourceFunction<TaxiFare> { while (running) { TaxiFare fare = new TaxiFare(id); - id += 1; + // don't emit events that exceed the specified limit + if (fare.startTime.compareTo(limitingTimestamp) >= 0) { + break; + } + + ++id; ctx.collect(fare); // match our event production rate to that of the TaxiRideGenerator diff --git a/common/src/main/java/org/apache/flink/training/exercises/common/utils/DataGenerator.java b/common/src/main/java/org/apache/flink/training/exercises/common/utils/DataGenerator.java index 10c4c2b..078c43a 100644 --- a/common/src/main/java/org/apache/flink/training/exercises/common/utils/DataGenerator.java +++ b/common/src/main/java/org/apache/flink/training/exercises/common/utils/DataGenerator.java @@ -32,7 +32,7 @@ public class DataGenerator { private static final int SECONDS_BETWEEN_RIDES = 20; private static final int NUMBER_OF_DRIVERS = 200; - private static final Instant beginTime = Instant.parse("2020-01-01T12:00:00.00Z"); + public static final Instant BEGINNING = Instant.parse("2020-01-01T12:00:00.00Z"); private transient long rideId; @@ -43,7 +43,7 @@ public class DataGenerator { /** Deterministically generates and returns the startTime for this ride. */ public Instant startTime() { - return beginTime.plusSeconds(SECONDS_BETWEEN_RIDES * rideId); + return BEGINNING.plusSeconds(SECONDS_BETWEEN_RIDES * rideId); } /** Deterministically generates and returns the endTime for this ride. */ diff --git a/hourly-tips/src/test/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsTest.java b/hourly-tips/src/test/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsTest.java index 8167eba..1e379c3 100644 --- a/hourly-tips/src/test/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsTest.java +++ b/hourly-tips/src/test/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsTest.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.training.exercises.common.datatypes.TaxiFare; +import org.apache.flink.training.exercises.common.utils.DataGenerator; import org.apache.flink.training.exercises.testing.ComposedPipeline; import org.apache.flink.training.exercises.testing.ExecutablePipeline; import org.apache.flink.training.exercises.testing.ParallelTestSource; @@ -33,6 +34,7 @@ import org.apache.flink.training.solutions.hourlytips.HourlyTipsSolution; import org.junit.ClassRule; import org.junit.Test; +import java.time.Duration; import java.time.Instant; import java.util.List; @@ -105,10 +107,8 @@ public class HourlyTipsTest { assertThat(results(source)).containsExactlyInAnyOrder(hour1, hour2); } - private static final Instant BEGINNING = Instant.parse("2020-01-01T12:00:00.00Z"); - - private Instant t(int minutes) { - return BEGINNING.plusSeconds(60L * minutes); + public Instant t(int minutes) { + return DataGenerator.BEGINNING.plus(Duration.ofMinutes(minutes)); } private TaxiFare testFare(long driverId, Instant startTime, float tip) {