NicoK commented on a change in pull request #31:
URL: https://github.com/apache/flink-training/pull/31#discussion_r695473786



##########
File path: 
common/src/test/java/org/apache/flink/training/exercises/testing/ComposedKeyedProcessFunction.java
##########
@@ -0,0 +1,77 @@
+package org.apache.flink.training.exercises.testing;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.training.exercises.common.utils.MissingSolutionException;
+import org.apache.flink.util.Collector;
+
+/**
+ * A KeyedProcessFunction that can delegate to either a KeyedProcessFunction in the exercise or in
+ * the solution. The implementation in the exercise is tested first, and if it throws
+ * MissingSolutionException, then the solution is tested instead.
+ *
+ * <p>This can be used to write test harness tests.
+ *
+ * @param <K> key type
+ * @param <IN> input type
+ * @param <OUT> output type
+ */
+public class ComposedKeyedProcessFunction<K, IN, OUT> extends KeyedProcessFunction<K, IN, OUT> {
+    private KeyedProcessFunction<K, IN, OUT> exercise;
+    private KeyedProcessFunction<K, IN, OUT> solution;
+    private boolean useExercise;
+
+    public ComposedKeyedProcessFunction(
+            KeyedProcessFunction<K, IN, OUT> exercise, KeyedProcessFunction<K, IN, OUT> solution) {

Review comment:
       Not sure how heavy those `solution` objects may be, but we could also
change this parameter to a `solutionProvider` lambda that instantiates the
solution only if the exercise fails to initialize, and otherwise does nothing.
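
A minimal sketch of the lazy-provider idea, using plain Java stand-ins rather than Flink types. `Openable`, `MissingSolutionSketchException`, and `LazyComposed` are hypothetical names invented for this illustration; the real class would presumably keep extending `KeyedProcessFunction` and take something like a `Supplier` of the solution function instead.

```java
import java.util.function.Supplier;

/** Hypothetical stand-in for MissingSolutionException; not the flink-training class. */
class MissingSolutionSketchException extends RuntimeException {}

/** Hypothetical stand-in for a function with an open() lifecycle hook. */
interface Openable {
    void open();
}

/** Delegates to the exercise and builds the solution only if the exercise is missing. */
class LazyComposed {
    private final Openable exercise;
    private final Supplier<Openable> solutionProvider;
    private Openable active;

    LazyComposed(Openable exercise, Supplier<Openable> solutionProvider) {
        this.exercise = exercise;
        this.solutionProvider = solutionProvider;
    }

    void open() {
        try {
            exercise.open();
            active = exercise;
        } catch (MissingSolutionSketchException e) {
            // The solution is instantiated here, and only on this path.
            active = solutionProvider.get();
            active.open();
        }
    }

    /** True if open() succeeded on the exercise and no fallback was needed. */
    boolean usesExercise() {
        return active == exercise;
    }
}
```

One caveat with this approach: Flink serializes user functions before shipping them to the cluster, so the provider would likely need to be serializable as well.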

##########
File path: 
common/src/test/java/org/apache/flink/training/exercises/testing/ComposedKeyedProcessFunction.java
##########
@@ -0,0 +1,77 @@
+package org.apache.flink.training.exercises.testing;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.training.exercises.common.utils.MissingSolutionException;
+import org.apache.flink.util.Collector;
+
+/**
+ * A KeyedProcessFunction that can delegate to either a KeyedProcessFunction in the exercise or in
+ * the solution. The implementation in the exercise is tested first, and if it throws
+ * MissingSolutionException, then the solution is tested instead.
+ *
+ * <p>This can be used to write test harness tests.
+ *
+ * @param <K> key type
+ * @param <IN> input type
+ * @param <OUT> output type
+ */
+public class ComposedKeyedProcessFunction<K, IN, OUT> extends KeyedProcessFunction<K, IN, OUT> {
+    private KeyedProcessFunction<K, IN, OUT> exercise;
+    private KeyedProcessFunction<K, IN, OUT> solution;

Review comment:
       ```suggestion
       private final KeyedProcessFunction<K, IN, OUT> exercise;
       private final KeyedProcessFunction<K, IN, OUT> solution;
   ```

##########
File path: 
common/src/test/java/org/apache/flink/training/exercises/testing/ComposedPipeline.java
##########
@@ -0,0 +1,37 @@
+package org.apache.flink.training.exercises.testing;
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.training.exercises.common.utils.MissingSolutionException;
+
+/**
+ * This allows the tests to be run against both the exercises and the solutions.
+ *
+ * <p>If an exercise throws MissingSolutionException, then the solution is tested.
+ */
+public class ComposedPipeline<IN, OUT> implements ExecutablePipeline<IN, OUT> {
+
+    private ExecutablePipeline<IN, OUT> exercise;
+    private ExecutablePipeline<IN, OUT> solution;

Review comment:
       ```suggestion
       private final ExecutablePipeline<IN, OUT> exercise;
       private final ExecutablePipeline<IN, OUT> solution;
   ```

##########
File path: 
long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesIntegrationTest.java
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.training.exercises.longrides;
+
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
+import org.apache.flink.training.exercises.testing.ComposedPipeline;
+import org.apache.flink.training.exercises.testing.ExecutablePipeline;
+import org.apache.flink.training.exercises.testing.ParallelTestSource;
+import org.apache.flink.training.exercises.testing.TestSink;
+import org.apache.flink.training.solutions.longrides.LongRidesSolution;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+
+public class LongRidesIntegrationTest extends LongRidesTestBase {
+
+    private static final int PARALLELISM = 2;
+
+    /** This isn't necessary, but speeds up the tests. */
+    @ClassRule
+    public static MiniClusterWithClientResource flinkCluster =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberSlotsPerTaskManager(PARALLELISM)
+                            .setNumberTaskManagers(1)
+                            .build());
+
+    @Test
+    public void shortRide() throws Exception {
+
+        TaxiRide rideStarted = startRide(1, BEGINNING);
+        TaxiRide endedOneMinLater = endRide(rideStarted, ONE_MINUTE_LATER);
+
+        ParallelTestSource<TaxiRide> source =
+                new ParallelTestSource<>(rideStarted, endedOneMinLater);
+        TestSink<Long> sink = new TestSink<Long>();
+
+        longRidesPipeline().execute(source, sink);
+        assertThat(sink.results()).isEmpty();
+    }
+
+    @Test
+    public void shortRideOutOfOrder() throws Exception {
+        TaxiRide rideStarted = startRide(1, BEGINNING);
+        TaxiRide endedOneMinLater = endRide(rideStarted, ONE_MINUTE_LATER);
+
+        ParallelTestSource<TaxiRide> source =
+                new ParallelTestSource<>(endedOneMinLater, rideStarted);
+        TestSink<Long> sink = new TestSink<Long>();
+
+        longRidesPipeline().execute(source, sink);
+        assertThat(sink.results()).isEmpty();
+    }
+
+    @Test
+    public void multipleRides() throws Exception {
+        TaxiRide longRideWithoutEnd = startRide(1, BEGINNING);
+        TaxiRide twoHourRide = startRide(2, BEGINNING);
+        TaxiRide otherLongRide = startRide(3, ONE_MINUTE_LATER);
+        TaxiRide shortRide = startRide(4, ONE_HOUR_LATER);
+        TaxiRide shortRideEnded = endRide(shortRide, TWO_HOURS_LATER);
+        TaxiRide twoHourRideEnded = endRide(twoHourRide, BEGINNING);
+        TaxiRide otherLongRideEnded = endRide(otherLongRide, THREE_HOURS_LATER);
+
+        ParallelTestSource<TaxiRide> source =
+                new ParallelTestSource<>(
+                        longRideWithoutEnd,
+                        twoHourRide,
+                        otherLongRide,
+                        shortRide,
+                        shortRideEnded,
+                        twoHourRideEnded,
+                        otherLongRideEnded);
+        TestSink<Long> sink = new TestSink<Long>();
+
+        longRidesPipeline().execute(source, sink);
+        assertThat(sink.results())
+                .containsExactlyInAnyOrder(longRideWithoutEnd.rideId, otherLongRide.rideId);
+    }
+
+    protected ComposedPipeline longRidesPipeline() {
+        ExecutablePipeline<TaxiRide, Long> exercise =
+                (source, sink) -> (new LongRidesExercise(source, sink)).execute();
+        ExecutablePipeline<TaxiRide, Long> solution =
+                (source, sink) -> (new LongRidesSolution(source, sink)).execute();
+
+        return new ComposedPipeline(exercise, solution);
+    }

Review comment:
       ```suggestion
       protected ComposedPipeline longRidesPipeline() {
           ExecutablePipeline<TaxiRide, Long> exercise =
                   (source, sink) -> (new LongRidesExercise(source, sink)).execute();
           ExecutablePipeline<TaxiRide, Long> solution =
                   (source, sink) -> (new LongRidesSolution(source, sink)).execute();
   
           return new ComposedPipeline<>(exercise, solution);
       }
   ```

##########
File path: 
long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesIntegrationTest.java
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.training.exercises.longrides;
+
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
+import org.apache.flink.training.exercises.testing.ComposedPipeline;
+import org.apache.flink.training.exercises.testing.ExecutablePipeline;
+import org.apache.flink.training.exercises.testing.ParallelTestSource;
+import org.apache.flink.training.exercises.testing.TestSink;
+import org.apache.flink.training.solutions.longrides.LongRidesSolution;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+
+public class LongRidesIntegrationTest extends LongRidesTestBase {
+
+    private static final int PARALLELISM = 2;
+
+    /** This isn't necessary, but speeds up the tests. */
+    @ClassRule
+    public static MiniClusterWithClientResource flinkCluster =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberSlotsPerTaskManager(PARALLELISM)
+                            .setNumberTaskManagers(1)
+                            .build());
+
+    @Test
+    public void shortRide() throws Exception {
+
+        TaxiRide rideStarted = startRide(1, BEGINNING);
+        TaxiRide endedOneMinLater = endRide(rideStarted, ONE_MINUTE_LATER);
+
+        ParallelTestSource<TaxiRide> source =
+                new ParallelTestSource<>(rideStarted, endedOneMinLater);
+        TestSink<Long> sink = new TestSink<Long>();
+
+        longRidesPipeline().execute(source, sink);
+        assertThat(sink.results()).isEmpty();
+    }
+
+    @Test
+    public void shortRideOutOfOrder() throws Exception {
+        TaxiRide rideStarted = startRide(1, BEGINNING);
+        TaxiRide endedOneMinLater = endRide(rideStarted, ONE_MINUTE_LATER);
+
+        ParallelTestSource<TaxiRide> source =
+                new ParallelTestSource<>(endedOneMinLater, rideStarted);
+        TestSink<Long> sink = new TestSink<Long>();

Review comment:
       ```suggestion
           TestSink<Long> sink = new TestSink<>();
   ```

##########
File path: 
long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesTestBase.java
##########
@@ -0,0 +1,37 @@
+package org.apache.flink.training.exercises.longrides;
+
+import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
+
+import java.time.Instant;
+
+public class LongRidesTestBase {
+    public static final Instant BEGINNING = Instant.parse("2020-01-01T12:00:00.00Z");
+    public static final Instant ONE_MINUTE_LATER = BEGINNING.plusSeconds(60);
+    public static final Instant ONE_HOUR_LATER = BEGINNING.plusSeconds(60 * 60);
+    public static final Instant TWO_HOURS_LATER = BEGINNING.plusSeconds(120 * 60);
+    public static final Instant THREE_HOURS_LATER = BEGINNING.plusSeconds(180 * 60);
+
+    public TaxiRide startRide(long rideId, Instant startTime) {

Review comment:
       ```suggestion
       public static TaxiRide startRide(long rideId, Instant startTime) {
   ```

##########
File path: 
long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesIntegrationTest.java
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.training.exercises.longrides;
+
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
+import org.apache.flink.training.exercises.testing.ComposedPipeline;
+import org.apache.flink.training.exercises.testing.ExecutablePipeline;
+import org.apache.flink.training.exercises.testing.ParallelTestSource;
+import org.apache.flink.training.exercises.testing.TestSink;
+import org.apache.flink.training.solutions.longrides.LongRidesSolution;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+
+public class LongRidesIntegrationTest extends LongRidesTestBase {
+
+    private static final int PARALLELISM = 2;
+
+    /** This isn't necessary, but speeds up the tests. */
+    @ClassRule
+    public static MiniClusterWithClientResource flinkCluster =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberSlotsPerTaskManager(PARALLELISM)
+                            .setNumberTaskManagers(1)
+                            .build());
+
+    @Test
+    public void shortRide() throws Exception {
+
+        TaxiRide rideStarted = startRide(1, BEGINNING);
+        TaxiRide endedOneMinLater = endRide(rideStarted, ONE_MINUTE_LATER);
+
+        ParallelTestSource<TaxiRide> source =
+                new ParallelTestSource<>(rideStarted, endedOneMinLater);
+        TestSink<Long> sink = new TestSink<Long>();
+
+        longRidesPipeline().execute(source, sink);
+        assertThat(sink.results()).isEmpty();
+    }
+
+    @Test
+    public void shortRideOutOfOrder() throws Exception {
+        TaxiRide rideStarted = startRide(1, BEGINNING);
+        TaxiRide endedOneMinLater = endRide(rideStarted, ONE_MINUTE_LATER);
+
+        ParallelTestSource<TaxiRide> source =
+                new ParallelTestSource<>(endedOneMinLater, rideStarted);
+        TestSink<Long> sink = new TestSink<Long>();
+
+        longRidesPipeline().execute(source, sink);
+        assertThat(sink.results()).isEmpty();
+    }
+
+    @Test
+    public void multipleRides() throws Exception {
+        TaxiRide longRideWithoutEnd = startRide(1, BEGINNING);
+        TaxiRide twoHourRide = startRide(2, BEGINNING);
+        TaxiRide otherLongRide = startRide(3, ONE_MINUTE_LATER);
+        TaxiRide shortRide = startRide(4, ONE_HOUR_LATER);
+        TaxiRide shortRideEnded = endRide(shortRide, TWO_HOURS_LATER);
+        TaxiRide twoHourRideEnded = endRide(twoHourRide, BEGINNING);
+        TaxiRide otherLongRideEnded = endRide(otherLongRide, THREE_HOURS_LATER);
+
+        ParallelTestSource<TaxiRide> source =
+                new ParallelTestSource<>(
+                        longRideWithoutEnd,
+                        twoHourRide,
+                        otherLongRide,
+                        shortRide,
+                        shortRideEnded,
+                        twoHourRideEnded,
+                        otherLongRideEnded);
+        TestSink<Long> sink = new TestSink<Long>();

Review comment:
       ```suggestion
           TestSink<Long> sink = new TestSink<>();
   ```

##########
File path: 
common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiRide.java
##########
@@ -181,4 +183,16 @@ public double getEuclideanDistance(double longitude, double latitude) {
                     (float) longitude, (float) latitude, this.endLon, this.endLat);
         }
     }
+
+    /** Creates a StreamRecord, using the ride and its timestamp. Used in tests. */
+    @VisibleForTesting
+    public StreamRecord asStreamRecord() {
+        return new StreamRecord(this, this.getEventTime());
+    }

Review comment:
       ```suggestion
       public StreamRecord<TaxiRide> asStreamRecord() {
           return new StreamRecord<>(this, this.getEventTime());
       }
   ```

##########
File path: 
long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesTestBase.java
##########
@@ -0,0 +1,37 @@
+package org.apache.flink.training.exercises.longrides;
+
+import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
+
+import java.time.Instant;
+
+public class LongRidesTestBase {
+    public static final Instant BEGINNING = Instant.parse("2020-01-01T12:00:00.00Z");
+    public static final Instant ONE_MINUTE_LATER = BEGINNING.plusSeconds(60);
+    public static final Instant ONE_HOUR_LATER = BEGINNING.plusSeconds(60 * 60);
+    public static final Instant TWO_HOURS_LATER = BEGINNING.plusSeconds(120 * 60);
+    public static final Instant THREE_HOURS_LATER = BEGINNING.plusSeconds(180 * 60);
+
+    public TaxiRide startRide(long rideId, Instant startTime) {
+        return testRide(rideId, true, startTime, Instant.EPOCH);
+    }
+
+    public TaxiRide endRide(TaxiRide started, Instant endTime) {
+        return testRide(started.rideId, false, started.startTime, endTime);
+    }
+
+    private TaxiRide testRide(long rideId, Boolean isStart, Instant startTime, Instant endTime) {

Review comment:
       ```suggestion
       private static TaxiRide testRide(
               long rideId, Boolean isStart, Instant startTime, Instant endTime) {
   ```

##########
File path: 
long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesTestBase.java
##########
@@ -0,0 +1,37 @@
+package org.apache.flink.training.exercises.longrides;
+
+import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
+
+import java.time.Instant;
+
+public class LongRidesTestBase {
+    public static final Instant BEGINNING = Instant.parse("2020-01-01T12:00:00.00Z");
+    public static final Instant ONE_MINUTE_LATER = BEGINNING.plusSeconds(60);
+    public static final Instant ONE_HOUR_LATER = BEGINNING.plusSeconds(60 * 60);
+    public static final Instant TWO_HOURS_LATER = BEGINNING.plusSeconds(120 * 60);
+    public static final Instant THREE_HOURS_LATER = BEGINNING.plusSeconds(180 * 60);
+
+    public TaxiRide startRide(long rideId, Instant startTime) {
+        return testRide(rideId, true, startTime, Instant.EPOCH);
+    }
+
+    public TaxiRide endRide(TaxiRide started, Instant endTime) {

Review comment:
       ```suggestion
       public static TaxiRide endRide(TaxiRide started, Instant endTime) {
   ```

##########
File path: 
common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiRide.java
##########
@@ -181,4 +183,16 @@ public double getEuclideanDistance(double longitude, double latitude) {
                     (float) longitude, (float) latitude, this.endLon, this.endLat);
         }
     }
+
+    /** Creates a StreamRecord, using the ride and its timestamp. Used in tests. */
+    @VisibleForTesting
+    public StreamRecord asStreamRecord() {
+        return new StreamRecord(this, this.getEventTime());
+    }
+
+    /** Creates a StreamRecord from this taxi ride, using its id and timestamp. Used in tests. */
+    @VisibleForTesting
+    public StreamRecord idAsStreamRecord() {
+        return new StreamRecord(this.rideId, this.getEventTime());
+    }

Review comment:
       ```suggestion
       public StreamRecord<Long> idAsStreamRecord() {
           return new StreamRecord<>(this.rideId, this.getEventTime());
       }
   ```

##########
File path: 
long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesIntegrationTest.java
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.training.exercises.longrides;
+
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
+import org.apache.flink.training.exercises.testing.ComposedPipeline;
+import org.apache.flink.training.exercises.testing.ExecutablePipeline;
+import org.apache.flink.training.exercises.testing.ParallelTestSource;
+import org.apache.flink.training.exercises.testing.TestSink;
+import org.apache.flink.training.solutions.longrides.LongRidesSolution;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+
+public class LongRidesIntegrationTest extends LongRidesTestBase {
+
+    private static final int PARALLELISM = 2;
+
+    /** This isn't necessary, but speeds up the tests. */
+    @ClassRule
+    public static MiniClusterWithClientResource flinkCluster =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberSlotsPerTaskManager(PARALLELISM)
+                            .setNumberTaskManagers(1)
+                            .build());
+
+    @Test
+    public void shortRide() throws Exception {
+
+        TaxiRide rideStarted = startRide(1, BEGINNING);
+        TaxiRide endedOneMinLater = endRide(rideStarted, ONE_MINUTE_LATER);
+
+        ParallelTestSource<TaxiRide> source =
+                new ParallelTestSource<>(rideStarted, endedOneMinLater);
+        TestSink<Long> sink = new TestSink<Long>();

Review comment:
       ```suggestion
           TestSink<Long> sink = new TestSink<>();
   ```

##########
File path: 
long-ride-alerts/src/test/scala/org/apache/flink/training/exercises/longrides/scala/LongRidesUnitTest.scala
##########
@@ -18,19 +18,21 @@
 
 package org.apache.flink.training.exercises.longrides.scala
 
-import java.util
-
 import org.apache.flink.training.exercises.common.datatypes.TaxiRide
 import org.apache.flink.training.exercises.longrides
-import org.apache.flink.training.exercises.testing.TaxiRideTestBase
-import org.apache.flink.training.solutions.longrides.scala.LongRidesSolution
+import org.apache.flink.training.exercises.testing.ComposedKeyedProcessFunction

Review comment:
       ```suggestion
   import org.apache.flink.training.exercises.testing.ComposedKeyedProcessFunction
   import org.apache.flink.training.solutions.longrides.scala.LongRidesSolution
   ```

##########
File path: 
long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesUnitTest.java
##########
@@ -0,0 +1,119 @@
+package org.apache.flink.training.exercises.longrides;
+
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
+import org.apache.flink.training.exercises.testing.ComposedKeyedProcessFunction;
+import org.apache.flink.training.solutions.longrides.LongRidesSolution;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+
+public class LongRidesUnitTest extends LongRidesTestBase {

Review comment:
       ```suggestion
   // needed for the Scala tests to use scala.Long with this Java test
   @SuppressWarnings({"rawtypes", "unchecked"})
   public class LongRidesUnitTest extends LongRidesTestBase {
   ```

##########
File path: 
common/src/test/java/org/apache/flink/training/exercises/testing/ComposedKeyedProcessFunction.java
##########
@@ -0,0 +1,77 @@
+package org.apache.flink.training.exercises.testing;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.training.exercises.common.utils.MissingSolutionException;
+import org.apache.flink.util.Collector;
+
+/**
+ * A KeyedProcessFunction that can delegate to either a KeyedProcessFunction in the exercise or in
+ * the solution. The implementation in the exercise is tested first, and if it throws
+ * MissingSolutionException, then the solution is tested instead.
+ *
+ * <p>This can be used to write test harness tests.
+ *
+ * @param <K> key type
+ * @param <IN> input type
+ * @param <OUT> output type
+ */
+public class ComposedKeyedProcessFunction<K, IN, OUT> extends KeyedProcessFunction<K, IN, OUT> {
+    private KeyedProcessFunction<K, IN, OUT> exercise;
+    private KeyedProcessFunction<K, IN, OUT> solution;
+    private boolean useExercise;
+
+    public ComposedKeyedProcessFunction(
+            KeyedProcessFunction<K, IN, OUT> exercise, KeyedProcessFunction<K, IN, OUT> solution) {
+
+        this.exercise = exercise;
+        this.solution = solution;
+        this.useExercise = true;
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+
+        try {
+            exercise.setRuntimeContext(this.getRuntimeContext());
+            exercise.open(parameters);
+        } catch (Exception e) {
+            if (MissingSolutionException.ultimateCauseIsMissingSolution(e)) {
+                this.useExercise = false;
+                solution.setRuntimeContext(this.getRuntimeContext());
+                solution.open(parameters);
+            } else {
+                throw e;
+            }
+        }
+    }
+
+    @Override
+    public void processElement(
+            IN value,
+            org.apache.flink.streaming.api.functions.KeyedProcessFunction<K, IN, OUT>.Context ctx,
+            Collector<OUT> out)

Review comment:
       ```suggestion
               IN value, KeyedProcessFunction<K, IN, OUT>.Context ctx, Collector<OUT> out)
   ```

##########
File path: 
long-ride-alerts/src/test/scala/org/apache/flink/training/exercises/longrides/scala/LongRidesIntegrationTest.scala
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.training.exercises.longrides.scala
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.training.exercises.common.datatypes.TaxiRide
+import org.apache.flink.training.exercises.longrides
+import org.apache.flink.training.exercises.testing.{ComposedPipeline, ExecutablePipeline, TestSink}
+
+/**
+  * The Scala tests extend the Java tests by overriding the longRidesPipeline() method
+  * to use the Scala implementations of the exercise and solution.
+  */
+class LongRidesIntegrationTest extends longrides.LongRidesIntegrationTest {
+  private val EXERCISE: ExecutablePipeline[TaxiRide, Long] =
+    (source: SourceFunction[TaxiRide], sink: TestSink[Long]) =>
+      (new LongRidesExercise.LongRidesJob(source, sink)).execute()
+
+  private val SOLUTION: ExecutablePipeline[TaxiRide, Long] =
+    (source: SourceFunction[TaxiRide], sink: TestSink[Long]) =>
+      (new LongRidesSolution.LongRidesJob(source, sink)).execute()

Review comment:
       ```suggestion
         new LongRidesSolution.LongRidesJob(source, sink).execute()
   ```

##########
File path: 
long-ride-alerts/src/test/scala/org/apache/flink/training/exercises/longrides/scala/LongRidesIntegrationTest.scala
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.training.exercises.longrides.scala
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.training.exercises.common.datatypes.TaxiRide
+import org.apache.flink.training.exercises.longrides
+import org.apache.flink.training.exercises.testing.{ComposedPipeline, 
ExecutablePipeline, TestSink}
+
+/**
+  * The Scala tests extend the Java tests by overriding the 
longRidesPipeline() method
+  * to use the Scala implementations of the exercise and solution.
+  */
+class LongRidesIntegrationTest extends longrides.LongRidesIntegrationTest {
+  private val EXERCISE: ExecutablePipeline[TaxiRide, Long] =
+    (source: SourceFunction[TaxiRide], sink: TestSink[Long]) =>
+      (new LongRidesExercise.LongRidesJob(source, sink)).execute()

Review comment:
       ```suggestion
         new LongRidesExercise.LongRidesJob(source, sink).execute()
   ```

##########
File path: 
long-ride-alerts/src/main/scala/org/apache/flink/training/exercises/longrides/scala/LongRidesExercise.scala
##########
@@ -18,56 +18,82 @@
 
 package org.apache.flink.training.exercises.longrides.scala
 
+import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, 
WatermarkStrategy}
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.streaming.api.functions.sink.{PrintSinkFunction, 
SinkFunction}
+import org.apache.flink.streaming.api.functions.source.SourceFunction
 import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
 import org.apache.flink.training.exercises.common.datatypes.TaxiRide
 import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator
-import org.apache.flink.training.exercises.common.utils.ExerciseBase._
-import org.apache.flink.training.exercises.common.utils.{ExerciseBase, 
MissingSolutionException}
+import 
org.apache.flink.training.exercises.common.utils.MissingSolutionException
 import org.apache.flink.util.Collector
 
+import java.time.Duration
+
 /**
-  * The "Long Ride Alerts" exercise of the Flink training in the docs.
+  * The "Long Ride Alerts" exercise.
   *
-  * The goal for this exercise is to emit START events for taxi rides that 
have not been matched
-  * by an END event during the first 2 hours of the ride.
+  * <p>The goal for this exercise is to emit the rideIds for taxi rides with a 
duration of more than
+  * two hours. You should assume that TaxiRide events can be lost, but there 
are no duplicates.
   *
+  * <p>You should eventually clear any state you create.
   */
 object LongRidesExercise {
 
-  def main(args: Array[String]) {
+  class LongRidesJob(source: SourceFunction[TaxiRide], sink: 
SinkFunction[Long]) {
+
+    /**
+      * Creates and executes the ride cleansing pipeline.
+      */
+    @throws[Exception]
+    def execute(): Unit = {
+      val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+      // start the data generator
+      val rides = env.addSource(source)
+
+      // the WatermarkStrategy specifies how to extract timestamps and 
generate watermarks
+      val watermarkStrategy = WatermarkStrategy
+        .forBoundedOutOfOrderness[TaxiRide](Duration.ofSeconds(60))
+        .withTimestampAssigner(new SerializableTimestampAssigner[TaxiRide] {
+          override def extractTimestamp(ride: TaxiRide, streamRecordTimestamp: 
Long): Long =
+            ride.getEventTime
+        })
 
-    // set up the execution environment
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setParallelism(ExerciseBase.parallelism)
+      // create the pipeline
+      rides
+        .assignTimestampsAndWatermarks(watermarkStrategy)
+        .keyBy(_.rideId)
+        .process(new AlertFunction())
+        .addSink(sink)
 
-    val rides = env.addSource(rideSourceOrTest(new TaxiRideGenerator()))
+      // execute the pipeline
+      env.execute("Long Taxi Rides")
+    }
 
-    val longRides = rides
-      .keyBy(_.rideId)
-      .process(new ImplementMeFunction())
+  }
 
-    printOrTest(longRides)
+  @throws[Exception]
+  def main(args: Array[String]) {
+    val job = new LongRidesJob(new TaxiRideGenerator, new PrintSinkFunction)
 
-    env.execute("Long Taxi Rides")
+    job.execute

Review comment:
       IntelliJ suggests adding parentheses to the call because the method has side effects (which it does)
   ```suggestion
       job.execute()
   ```

##########
File path: 
long-ride-alerts/src/solution/java/org/apache/flink/training/solutions/longrides/LongRidesSolution.java
##########
@@ -18,48 +18,86 @@
 
 package org.apache.flink.training.solutions.longrides;
 
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
 import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator;
-import org.apache.flink.training.exercises.common.utils.ExerciseBase;
 import org.apache.flink.util.Collector;
 
+import java.time.Duration;
+
 /**
- * Solution to the "Long Ride Alerts" exercise of the Flink training in the 
docs.
+ * Java solution for the "Long Ride Alerts" exercise.
+ *
+ * <p>The goal for this exercise is to emit the rideIds for taxi rides with a 
duration of more than
+ * two hours. You should assume that TaxiRide events can be lost, but there 
are no duplicates.
  *
- * <p>The goal for this exercise is to emit START events for taxi rides that 
have not been matched
- * by an END event during the first 2 hours of the ride.
+ * <p>You should eventually clear any state you create.
  */
-public class LongRidesSolution extends ExerciseBase {
+public class LongRidesSolution {
+
+    private SourceFunction<TaxiRide> source;
+    private SinkFunction<Long> sink;

Review comment:
       ```suggestion
       private final SourceFunction<TaxiRide> source;
       private final SinkFunction<Long> sink;
   ```
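
For illustration, here is a minimal sketch of why `final` fits these fields: they are assigned exactly once in the constructor, so the compiler can reject any later reassignment. The class `FinalFieldsSketch` and its `Supplier`/`Consumer` fields are hypothetical stand-ins for the job class and Flink's `SourceFunction`/`SinkFunction`; no Flink types are used.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical stand-in for a job class that is wired up with a source and a sink. */
public class FinalFieldsSketch {
    // Assigned once in the constructor; `final` lets the compiler enforce that.
    private final Supplier<Long> source;
    private final Consumer<Long> sink;

    public FinalFieldsSketch(Supplier<Long> source, Consumer<Long> sink) {
        this.source = source;
        this.sink = sink;
    }

    /** Moves a single element from the source to the sink. */
    public void execute() {
        sink.accept(source.get());
    }

    public static void main(String[] args) {
        List<Long> collected = new ArrayList<>();
        new FinalFieldsSketch(() -> 7L, collected::add).execute();
        System.out.println(collected);
    }
}
```

With `final` in place, an accidental `this.source = ...` outside the constructor becomes a compile error rather than a silent change of wiring.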

##########
File path: 
long-ride-alerts/src/solution/scala/org/apache/flink/training/solutions/longrides/scala/LongRidesSolution.scala
##########
@@ -18,81 +18,123 @@
 
 package org.apache.flink.training.solutions.longrides.scala
 
-import scala.concurrent.duration._
+import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, 
WatermarkStrategy}
 import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.streaming.api.functions.sink.{PrintSinkFunction, 
SinkFunction}
+import org.apache.flink.streaming.api.functions.source.SourceFunction
 import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
 import org.apache.flink.training.exercises.common.datatypes.TaxiRide
 import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator
-import org.apache.flink.training.exercises.common.utils.ExerciseBase
-import org.apache.flink.training.exercises.common.utils.ExerciseBase._
 import org.apache.flink.util.Collector
 
+import java.time.Duration
+import scala.concurrent.duration._
+
 /**
-  * Scala reference implementation for the "Long Ride Alerts" exercise of the 
Flink training in the docs.
+  * Scala solution for the "Long Ride Alerts" exercise.
   *
-  * The goal for this exercise is to emit START events for taxi rides that 
have not been matched
-  * by an END event during the first 2 hours of the ride.
+  * <p>The goal for this exercise is to emit the rideIds for taxi rides with a 
duration of more than
+  * two hours. You should assume that TaxiRide events can be lost, but there 
are no duplicates.
   *
+  * <p>You should eventually clear any state you create.
   */
 object LongRidesSolution {
 
-  def main(args: Array[String]) {
-
-    // set up the execution environment
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    // operate in Event-time
-    env.setParallelism(ExerciseBase.parallelism)
-
-    val rides = env.addSource(rideSourceOrTest(new TaxiRideGenerator()))
+  class LongRidesJob(source: SourceFunction[TaxiRide], sink: 
SinkFunction[Long]) {
+
+    /**
+      * Creates and executes the ride cleansing pipeline.
+      */
+    @throws[Exception]
+    def execute(): Unit = {
+      val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+      // start the data generator
+      val rides = env.addSource(source)
+
+      // the WatermarkStrategy specifies how to extract timestamps and 
generate watermarks
+      val watermarkStrategy = WatermarkStrategy
+        .forBoundedOutOfOrderness[TaxiRide](Duration.ofSeconds(60))
+        .withTimestampAssigner(new SerializableTimestampAssigner[TaxiRide] {
+          override def extractTimestamp(ride: TaxiRide, streamRecordTimestamp: 
Long): Long =
+            ride.getEventTime
+        })
+
+      // create the pipeline
+      rides
+        .assignTimestampsAndWatermarks(watermarkStrategy)
+        .keyBy(_.rideId)
+        .process(new AlertFunction())
+        .addSink(sink)
+
+      // execute the pipeline
+      env.execute("Long Taxi Rides")
+    }
 
-    val longRides = rides
-      .keyBy(_.rideId)
-      .process(new MatchFunction())
+  }
 
-    printOrTest(longRides)
+  @throws[Exception]
+  def main(args: Array[String]) {
+    val job = new LongRidesJob(new TaxiRideGenerator, new PrintSinkFunction)
 
-    env.execute("Long Taxi Rides")
+    job.execute

Review comment:
       ```suggestion
       job.execute()
   ```

##########
File path: 
long-ride-alerts/src/solution/scala/org/apache/flink/training/solutions/longrides/scala/LongRidesSolution.scala
##########
@@ -18,81 +18,123 @@
 
 package org.apache.flink.training.solutions.longrides.scala
 
-import scala.concurrent.duration._
+import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, 
WatermarkStrategy}
 import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.streaming.api.functions.sink.{PrintSinkFunction, 
SinkFunction}
+import org.apache.flink.streaming.api.functions.source.SourceFunction
 import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
 import org.apache.flink.training.exercises.common.datatypes.TaxiRide
 import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator
-import org.apache.flink.training.exercises.common.utils.ExerciseBase
-import org.apache.flink.training.exercises.common.utils.ExerciseBase._
 import org.apache.flink.util.Collector
 
+import java.time.Duration
+import scala.concurrent.duration._
+
 /**
-  * Scala reference implementation for the "Long Ride Alerts" exercise of the 
Flink training in the docs.
+  * Scala solution for the "Long Ride Alerts" exercise.
   *
-  * The goal for this exercise is to emit START events for taxi rides that 
have not been matched
-  * by an END event during the first 2 hours of the ride.
+  * <p>The goal for this exercise is to emit the rideIds for taxi rides with a 
duration of more than
+  * two hours. You should assume that TaxiRide events can be lost, but there 
are no duplicates.
   *
+  * <p>You should eventually clear any state you create.
   */
 object LongRidesSolution {
 
-  def main(args: Array[String]) {
-
-    // set up the execution environment
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    // operate in Event-time
-    env.setParallelism(ExerciseBase.parallelism)
-
-    val rides = env.addSource(rideSourceOrTest(new TaxiRideGenerator()))
+  class LongRidesJob(source: SourceFunction[TaxiRide], sink: 
SinkFunction[Long]) {
+
+    /**
+      * Creates and executes the ride cleansing pipeline.
+      */
+    @throws[Exception]
+    def execute(): Unit = {
+      val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+      // start the data generator
+      val rides = env.addSource(source)
+
+      // the WatermarkStrategy specifies how to extract timestamps and 
generate watermarks
+      val watermarkStrategy = WatermarkStrategy
+        .forBoundedOutOfOrderness[TaxiRide](Duration.ofSeconds(60))
+        .withTimestampAssigner(new SerializableTimestampAssigner[TaxiRide] {
+          override def extractTimestamp(ride: TaxiRide, streamRecordTimestamp: 
Long): Long =
+            ride.getEventTime
+        })
+
+      // create the pipeline
+      rides
+        .assignTimestampsAndWatermarks(watermarkStrategy)
+        .keyBy(_.rideId)
+        .process(new AlertFunction())
+        .addSink(sink)
+
+      // execute the pipeline
+      env.execute("Long Taxi Rides")
+    }
 
-    val longRides = rides
-      .keyBy(_.rideId)
-      .process(new MatchFunction())
+  }
 
-    printOrTest(longRides)
+  @throws[Exception]
+  def main(args: Array[String]) {

Review comment:
       ```suggestion
     def main(args: Array[String]): Unit = {
   ```

##########
File path: 
long-ride-alerts/src/main/java/org/apache/flink/training/exercises/longrides/LongRidesExercise.java
##########
@@ -18,62 +18,95 @@
 
 package org.apache.flink.training.exercises.longrides;
 
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.TimerService;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
 import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator;
-import org.apache.flink.training.exercises.common.utils.ExerciseBase;
 import 
org.apache.flink.training.exercises.common.utils.MissingSolutionException;
 import org.apache.flink.util.Collector;
 
+import java.time.Duration;
+
 /**
- * The "Long Ride Alerts" exercise of the Flink training in the docs.
+ * The "Long Ride Alerts" exercise.
+ *
+ * <p>The goal for this exercise is to emit the rideIds for taxi rides with a 
duration of more than
+ * two hours. You should assume that TaxiRide events can be lost, but there 
are no duplicates.
  *
- * <p>The goal for this exercise is to emit START events for taxi rides that 
have not been matched
- * by an END event during the first 2 hours of the ride.
+ * <p>You should eventually clear any state you create.
  */
-public class LongRidesExercise extends ExerciseBase {
+public class LongRidesExercise {
+    private SourceFunction<TaxiRide> source;
+    private SinkFunction<Long> sink;

Review comment:
       ```suggestion
       private final SourceFunction<TaxiRide> source;
       private final SinkFunction<Long> sink;
   ```

##########
File path: 
long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesUnitTest.java
##########
@@ -0,0 +1,119 @@
+package org.apache.flink.training.exercises.longrides;
+
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
+import 
org.apache.flink.training.exercises.testing.ComposedKeyedProcessFunction;
+import org.apache.flink.training.solutions.longrides.LongRidesSolution;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+
+public class LongRidesUnitTest extends LongRidesTestBase {
+
+    private KeyedOneInputStreamOperatorTestHarness<Long, TaxiRide, Long> 
harness;
+
+    private KeyedProcessFunction<Long, TaxiRide, Long> javaExercise =
+            new LongRidesExercise.AlertFunction();
+
+    private KeyedProcessFunction<Long, TaxiRide, Long> javaSolution =
+            new LongRidesSolution.AlertFunction();
+
+    protected ComposedKeyedProcessFunction composedAlertFunction() {
+        return new ComposedKeyedProcessFunction(javaExercise, javaSolution);
+    }

Review comment:
       FYI: Scala tests prevent us from using a proper generic return type here
   ```suggestion
       protected ComposedKeyedProcessFunction composedAlertFunction() {
           return new ComposedKeyedProcessFunction<>(javaExercise, javaSolution);
       }
   ```
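
As a rough sketch of what the diamond buys even when the return type has to stay raw: the constructor arguments are still type-checked against each other, and only the declared return type loses its type arguments. `DiamondSketch` and its nested `Composed` class are hypothetical stand-ins for `ComposedKeyedProcessFunction`, built on `java.util.function.Function` instead of Flink types.

```java
import java.util.function.Function;

public class DiamondSketch {
    /** Hypothetical stand-in for a function that delegates to an exercise or a solution. */
    public static class Composed<IN, OUT> {
        private final Function<IN, OUT> exercise;
        private final Function<IN, OUT> solution;

        public Composed(Function<IN, OUT> exercise, Function<IN, OUT> solution) {
            this.exercise = exercise;
            this.solution = solution;
        }

        public OUT apply(IN value, boolean useExercise) {
            return useExercise ? exercise.apply(value) : solution.apply(value);
        }
    }

    // The return type is raw, as in the reviewed method. The diamond still makes the
    // compiler infer Composed<String, Integer> and check both arguments against it.
    @SuppressWarnings("rawtypes")
    public static Composed composed() {
        Function<String, Integer> exercise = String::length;
        Function<String, Integer> solution = s -> s.length();
        return new Composed<>(exercise, solution);
    }

    public static void main(String[] args) {
        // The caller has to re-establish the type arguments with an unchecked conversion.
        @SuppressWarnings("unchecked")
        Composed<String, Integer> typed = composed();
        System.out.println(typed.apply("flink", true));
    }
}
```

Without the diamond, `new Composed(exercise, solution)` would also accept two delegates with mismatched type arguments, which is the kind of mistake the suggestion guards against.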

##########
File path: 
long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesUnitTest.java
##########
@@ -0,0 +1,119 @@
+package org.apache.flink.training.exercises.longrides;
+
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
+import 
org.apache.flink.training.exercises.testing.ComposedKeyedProcessFunction;
+import org.apache.flink.training.solutions.longrides.LongRidesSolution;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+
+public class LongRidesUnitTest extends LongRidesTestBase {
+
+    private KeyedOneInputStreamOperatorTestHarness<Long, TaxiRide, Long> 
harness;
+
+    private KeyedProcessFunction<Long, TaxiRide, Long> javaExercise =

Review comment:
       ```suggestion
       private final KeyedProcessFunction<Long, TaxiRide, Long> javaExercise =
   ```

##########
File path: 
common/src/test/java/org/apache/flink/training/exercises/testing/ComposedKeyedProcessFunction.java
##########
@@ -0,0 +1,77 @@
+package org.apache.flink.training.exercises.testing;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import 
org.apache.flink.training.exercises.common.utils.MissingSolutionException;
+import org.apache.flink.util.Collector;
+
+/**
+ * A KeyedProcessFunction that can delegate to either a KeyedProcessFunction 
in the exercise or in
+ * the solution. The implementation in the exercise is tested first, and if it 
throws
+ * MissingSolutionException, then the solution is tested instead.
+ *
+ * <p>This can be used to write test harness tests.
+ *
+ * @param <K> key type
+ * @param <IN> input type
+ * @param <OUT> output type
+ */
+public class ComposedKeyedProcessFunction<K, IN, OUT> extends 
KeyedProcessFunction<K, IN, OUT> {
+    private KeyedProcessFunction<K, IN, OUT> exercise;
+    private KeyedProcessFunction<K, IN, OUT> solution;
+    private boolean useExercise;
+
+    public ComposedKeyedProcessFunction(
+            KeyedProcessFunction<K, IN, OUT> exercise, KeyedProcessFunction<K, 
IN, OUT> solution) {
+
+        this.exercise = exercise;
+        this.solution = solution;
+        this.useExercise = true;
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+
+        try {
+            exercise.setRuntimeContext(this.getRuntimeContext());
+            exercise.open(parameters);
+        } catch (Exception e) {
+            if (MissingSolutionException.ultimateCauseIsMissingSolution(e)) {
+                this.useExercise = false;
+                solution.setRuntimeContext(this.getRuntimeContext());
+                solution.open(parameters);
+            } else {
+                throw e;
+            }
+        }
+    }
+
+    @Override
+    public void processElement(
+            IN value,
+            org.apache.flink.streaming.api.functions.KeyedProcessFunction<K, 
IN, OUT>.Context ctx,
+            Collector<OUT> out)
+            throws Exception {
+
+        if (useExercise) {
+            exercise.processElement(value, ctx, out);
+        } else {
+            solution.processElement(value, ctx, out);
+        }
+    }
+
+    @Override
+    public void onTimer(
+            long timestamp,
+            org.apache.flink.streaming.api.functions.KeyedProcessFunction<K, 
IN, OUT>.OnTimerContext
+                    ctx,
+            Collector<OUT> out)

Review comment:
       ```suggestion
               long timestamp, KeyedProcessFunction<K, IN, OUT>.OnTimerContext ctx, Collector<OUT> out)
   ```

##########
File path: 
long-ride-alerts/src/test/scala/org/apache/flink/training/exercises/longrides/scala/LongRidesIntegrationTest.scala
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.training.exercises.longrides.scala
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.training.exercises.common.datatypes.TaxiRide
+import org.apache.flink.training.exercises.longrides
+import org.apache.flink.training.exercises.testing.{ComposedPipeline, 
ExecutablePipeline, TestSink}

Review comment:
       Without this import, the test doesn't compile.
   ```suggestion
   import org.apache.flink.training.exercises.testing.{ComposedPipeline, ExecutablePipeline, TestSink}
   import org.apache.flink.training.solutions.longrides.scala.LongRidesSolution
   ```

##########
File path: 
long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesIntegrationTest.java
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.training.exercises.longrides;
+
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
+import org.apache.flink.training.exercises.testing.ComposedPipeline;
+import org.apache.flink.training.exercises.testing.ExecutablePipeline;
+import org.apache.flink.training.exercises.testing.ParallelTestSource;
+import org.apache.flink.training.exercises.testing.TestSink;
+import org.apache.flink.training.solutions.longrides.LongRidesSolution;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+
+public class LongRidesIntegrationTest extends LongRidesTestBase {

Review comment:
       ```suggestion
   // needed for the Scala tests to use scala.Long with this Java test
   @SuppressWarnings({"rawtypes", "unchecked"})
   public class LongRidesIntegrationTest extends LongRidesTestBase {
   ```

##########
File path: 
long-ride-alerts/src/main/scala/org/apache/flink/training/exercises/longrides/scala/LongRidesExercise.scala
##########
@@ -18,56 +18,82 @@
 
 package org.apache.flink.training.exercises.longrides.scala
 
+import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, 
WatermarkStrategy}
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.streaming.api.functions.sink.{PrintSinkFunction, 
SinkFunction}
+import org.apache.flink.streaming.api.functions.source.SourceFunction
 import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
 import org.apache.flink.training.exercises.common.datatypes.TaxiRide
 import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator
-import org.apache.flink.training.exercises.common.utils.ExerciseBase._
-import org.apache.flink.training.exercises.common.utils.{ExerciseBase, 
MissingSolutionException}
+import 
org.apache.flink.training.exercises.common.utils.MissingSolutionException
 import org.apache.flink.util.Collector
 
+import java.time.Duration
+
 /**
-  * The "Long Ride Alerts" exercise of the Flink training in the docs.
+  * The "Long Ride Alerts" exercise.
   *
-  * The goal for this exercise is to emit START events for taxi rides that 
have not been matched
-  * by an END event during the first 2 hours of the ride.
+  * <p>The goal for this exercise is to emit the rideIds for taxi rides with a 
duration of more than
+  * two hours. You should assume that TaxiRide events can be lost, but there 
are no duplicates.
   *
+  * <p>You should eventually clear any state you create.
   */
 object LongRidesExercise {
 
-  def main(args: Array[String]) {
+  class LongRidesJob(source: SourceFunction[TaxiRide], sink: 
SinkFunction[Long]) {
+
+    /**
+      * Creates and executes the ride cleansing pipeline.
+      */
+    @throws[Exception]
+    def execute(): Unit = {
+      val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+      // start the data generator
+      val rides = env.addSource(source)
+
+      // the WatermarkStrategy specifies how to extract timestamps and 
generate watermarks
+      val watermarkStrategy = WatermarkStrategy
+        .forBoundedOutOfOrderness[TaxiRide](Duration.ofSeconds(60))
+        .withTimestampAssigner(new SerializableTimestampAssigner[TaxiRide] {
+          override def extractTimestamp(ride: TaxiRide, streamRecordTimestamp: 
Long): Long =
+            ride.getEventTime
+        })
 
-    // set up the execution environment
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setParallelism(ExerciseBase.parallelism)
+      // create the pipeline
+      rides
+        .assignTimestampsAndWatermarks(watermarkStrategy)
+        .keyBy(_.rideId)
+        .process(new AlertFunction())
+        .addSink(sink)
 
-    val rides = env.addSource(rideSourceOrTest(new TaxiRideGenerator()))
+      // execute the pipeline
+      env.execute("Long Taxi Rides")
+    }
 
-    val longRides = rides
-      .keyBy(_.rideId)
-      .process(new ImplementMeFunction())
+  }
 
-    printOrTest(longRides)
+  @throws[Exception]
+  def main(args: Array[String]) {

Review comment:
       ```suggestion
     def main(args: Array[String]): Unit = {
   ```

##########
File path: 
common/src/test/java/org/apache/flink/training/exercises/testing/ParallelTestSource.java
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.training.exercises.testing;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+
+public class ParallelTestSource<T> extends RichParallelSourceFunction<T>
+        implements ResultTypeQueryable<T> {
+    private T[] testStream;
+    private TypeInformation typeInfo;
+
+    public ParallelTestSource(T... events) {
+        this.typeInfo = TypeExtractor.createTypeInfo(events[0].getClass());

Review comment:
       ```suggestion
       private final T[] testStream;
       private final TypeInformation<T> typeInfo;
   
       @SuppressWarnings("unchecked")
       @SafeVarargs
       public ParallelTestSource(T... events) {
           this.typeInfo = (TypeInformation<T>) TypeExtractor.createTypeInfo(events[0].getClass());
   ```

##########
File path: 
long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesUnitTest.java
##########
@@ -0,0 +1,119 @@
+package org.apache.flink.training.exercises.longrides;
+
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
+import 
org.apache.flink.training.exercises.testing.ComposedKeyedProcessFunction;
+import org.apache.flink.training.solutions.longrides.LongRidesSolution;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+
+public class LongRidesUnitTest extends LongRidesTestBase {
+
+    private KeyedOneInputStreamOperatorTestHarness<Long, TaxiRide, Long> 
harness;
+
+    private KeyedProcessFunction<Long, TaxiRide, Long> javaExercise =
+            new LongRidesExercise.AlertFunction();
+
+    private KeyedProcessFunction<Long, TaxiRide, Long> javaSolution =
+            new LongRidesSolution.AlertFunction();
+
+    protected ComposedKeyedProcessFunction composedAlertFunction() {
+        return new ComposedKeyedProcessFunction(javaExercise, javaSolution);
+    }
+
+    @Before
+    public void setupTestHarness() throws Exception {
+        this.harness = setupHarness(composedAlertFunction());
+    }
+
+    @Test
+    public void shouldAlertOnLongEndEvent() throws Exception {
+
+        TaxiRide rideStartedButNotSent = startRide(1, BEGINNING);
+        TaxiRide endedThreeHoursLater = endRide(rideStartedButNotSent, 
THREE_HOURS_LATER);
+
+        harness.processElement(endedThreeHoursLater.asStreamRecord());
+        
assertThat(harness.getOutput()).containsExactly(endedThreeHoursLater.idAsStreamRecord());
+    }
+
+    @Test
+    public void shouldNotAlertOnShortEndEvent() throws Exception {
+
+        TaxiRide rideStartedButNotSent = startRide(1, BEGINNING);
+        TaxiRide endedOneMinuteLater = endRide(rideStartedButNotSent, 
ONE_MINUTE_LATER);
+
+        harness.processElement(endedOneMinuteLater.asStreamRecord());
+        assertThat(harness.getOutput()).isEmpty();
+    }
+
+    @Test
+    public void shouldNotAlertWithoutWatermarkOrEndEvent() throws Exception {
+
+        TaxiRide rideStarted = startRide(1, BEGINNING);
+        TaxiRide otherRideStartsThreeHoursLater = startRide(2, 
THREE_HOURS_LATER);
+
+        harness.processElement(rideStarted.asStreamRecord());
+        
harness.processElement(otherRideStartsThreeHoursLater.asStreamRecord());
+        assertThat(harness.getOutput()).isEmpty();
+
+        Watermark mark2HoursLater =
+                new Watermark(BEGINNING.plusSeconds(2 * 60 * 
60).toEpochMilli());
+        harness.processWatermark(mark2HoursLater);
+
+        StreamRecord<Long> rideIdAtTimeOfWatermark =
+                new StreamRecord<>(rideStarted.rideId, 
mark2HoursLater.getTimestamp());
+        
assertThat(harness.getOutput()).containsExactly(rideIdAtTimeOfWatermark, 
mark2HoursLater);
+    }
+
+    @Test
+    public void shouldAlertOnWatermark() throws Exception {
+
+        TaxiRide startOfLongRide = startRide(1, BEGINNING);
+        harness.processElement(startOfLongRide.asStreamRecord());
+
+        // Can't be done properly without some managed keyed state
+        assertThat(harness.numKeyedStateEntries()).isGreaterThan(0);
+
+        // At this point there should be no output
+        ConcurrentLinkedQueue<Object> initialOutput = harness.getOutput();
+        assertThat(initialOutput).isEmpty();
+
+        Watermark mark2HoursLater =
+                new Watermark(BEGINNING.plusSeconds(2 * 60 * 
60).toEpochMilli());
+        harness.processWatermark(mark2HoursLater);
+
+        // Check that the result is correct
+        StreamRecord<Long> rideIdAtTimeOfWatermark =
+                new StreamRecord<>(startOfLongRide.rideId, 
mark2HoursLater.getTimestamp());
+        
assertThat(harness.getOutput()).containsExactly(rideIdAtTimeOfWatermark, 
mark2HoursLater);
+
+        // Check that no state or timers are left behind
+        assertThat(harness.numKeyedStateEntries()).isZero();
+        assertThat(harness.numEventTimeTimers()).isZero();
+    }
+
+    private KeyedOneInputStreamOperatorTestHarness<Long, TaxiRide, Long> 
setupHarness(
+            KeyedProcessFunction<Long, TaxiRide, Long> function) throws 
Exception {
+
+        KeyedProcessOperator<Long, TaxiRide, Long> operator = new 
KeyedProcessOperator<>(function);
+
+        KeyedOneInputStreamOperatorTestHarness<Long, TaxiRide, Long> 
testHarness =
+                new KeyedOneInputStreamOperatorTestHarness<>(
+                        operator, (TaxiRide r) -> r.rideId, Types.LONG);

Review comment:
       ```suggestion
                   new KeyedOneInputStreamOperatorTestHarness<>(operator, r -> r.rideId, Types.LONG);
   ```

##########
File path: 
long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesUnitTest.java
##########
@@ -0,0 +1,119 @@
+package org.apache.flink.training.exercises.longrides;
+
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
+import 
org.apache.flink.training.exercises.testing.ComposedKeyedProcessFunction;
+import org.apache.flink.training.solutions.longrides.LongRidesSolution;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+
+public class LongRidesUnitTest extends LongRidesTestBase {
+
+    private KeyedOneInputStreamOperatorTestHarness<Long, TaxiRide, Long> 
harness;
+
+    private KeyedProcessFunction<Long, TaxiRide, Long> javaExercise =
+            new LongRidesExercise.AlertFunction();
+
+    private KeyedProcessFunction<Long, TaxiRide, Long> javaSolution =

Review comment:
       ```suggestion
       private final KeyedProcessFunction<Long, TaxiRide, Long> javaSolution =
   ```

##########
File path: 
common/src/test/java/org/apache/flink/training/exercises/testing/ComposedKeyedProcessFunction.java
##########
@@ -0,0 +1,77 @@
+package org.apache.flink.training.exercises.testing;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import 
org.apache.flink.training.exercises.common.utils.MissingSolutionException;
+import org.apache.flink.util.Collector;
+
+/**
+ * A KeyedProcessFunction that can delegate to either a KeyedProcessFunction 
in the exercise or in
+ * the solution. The implementation in the exercise is tested first, and if it 
throws
+ * MissingSolutionException, then the solution is tested instead.
+ *
+ * <p>This can be used to write test harness tests.
+ *
+ * @param <K> key type
+ * @param <IN> input type
+ * @param <OUT> output type
+ */
+public class ComposedKeyedProcessFunction<K, IN, OUT> extends 
KeyedProcessFunction<K, IN, OUT> {
+    private KeyedProcessFunction<K, IN, OUT> exercise;
+    private KeyedProcessFunction<K, IN, OUT> solution;
+    private boolean useExercise;
+
+    public ComposedKeyedProcessFunction(
+            KeyedProcessFunction<K, IN, OUT> exercise, KeyedProcessFunction<K, 
IN, OUT> solution) {

Review comment:
       (We can also ignore this suggestion.)
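
If the `solutionProvider` idea is picked up after all, it could look roughly like the sketch below. This is plain Java without Flink types, under the assumption that "fails to initialize" means the exercise throws during its open step. `LazyFallback`, `MissingSolution`, and `open` are hypothetical names for illustration, with `MissingSolution` standing in for `MissingSolutionException`. A real version would likely also need the provider to be serializable.

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical sketch; none of these names exist in the PR.
final class LazyFallback<T> {

    /** Stand-in for MissingSolutionException. */
    static final class MissingSolution extends RuntimeException {}

    private final T exercise;
    private final Supplier<T> solutionProvider;

    LazyFallback(T exercise, Supplier<T> solutionProvider) {
        this.exercise = exercise;
        this.solutionProvider = solutionProvider;
    }

    /**
     * Initializes the exercise; only if that throws MissingSolution is the
     * solution created (via the provider) and initialized instead.
     */
    T open(Consumer<T> initializer) {
        try {
            initializer.accept(exercise);
            return exercise;
        } catch (MissingSolution e) {
            T solution = solutionProvider.get();
            initializer.accept(solution);
            return solution;
        }
    }
}
```

With this shape, a test that passes `LongRidesSolution.AlertFunction::new` as the provider would never construct the solution when the exercise is implemented.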




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

