This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-training.git


The following commit(s) were added to refs/heads/master by this push:
     new b7bc718  [FLINK-17277] address IntelliJ warnings
b7bc718 is described below

commit b7bc718bc6d178d474ebdc22e2dc2a27910d693e
Author: Nico Kruber <n...@ververica.com>
AuthorDate: Sat Apr 18 22:41:51 2020 +0200

    [FLINK-17277] address IntelliJ warnings
---
 .../examples/ridecount/RideCountExample.java       |  4 +--
 .../exercises/common/datatypes/TaxiFare.java       | 23 ++++++-------
 .../exercises/common/datatypes/TaxiRide.java       | 39 +++++++++++-----------
 .../exercises/common/sources/TaxiFareSource.java   | 23 ++++++-------
 .../exercises/common/sources/TaxiRideSource.java   | 23 ++++++-------
 .../training/exercises/common/utils/GeoUtils.java  |  3 +-
 .../exercises/testing/TaxiRideTestBase.java        | 25 +++++++++++---
 .../training/exercises/testing/TestSource.java     | 24 +++++--------
 config/checkstyle/checkstyle.xml                   |  4 +--
 config/checkstyle/suppressions.xml                 |  4 +--
 hourly-tips/DISCUSSION.md                          |  2 +-
 .../solutions/hourlytips/HourlyTipsSolution.java   |  6 ++--
 .../hourlytips/scala/HourlyTipsSolution.scala      |  2 +-
 .../exercises/hourlytips/HourlyTipsTest.java       | 14 ++++----
 .../hourlytips/scala/HourlyTipsTest.scala          |  2 +-
 .../exercises/longrides/LongRidesTest.java         | 28 ++++++++--------
 .../ridecleansing/RideCleansingSolution.java       |  3 +-
 .../exercises/ridecleansing/RideCleansingTest.java |  4 +--
 .../ridesandfares/RidesAndFaresSolution.java       |  4 +--
 .../exercises/ridesandfares/RidesAndFaresTest.java | 12 +++----
 .../ridesandfares/scala/RidesAndFaresTest.scala    |  2 +-
 21 files changed, 124 insertions(+), 127 deletions(-)

diff --git 
a/common/src/main/java/org/apache/flink/training/examples/ridecount/RideCountExample.java
 
b/common/src/main/java/org/apache/flink/training/examples/ridecount/RideCountExample.java
index fe17f93..ac7424a 100644
--- 
a/common/src/main/java/org/apache/flink/training/examples/ridecount/RideCountExample.java
+++ 
b/common/src/main/java/org/apache/flink/training/examples/ridecount/RideCountExample.java
@@ -67,8 +67,8 @@ public class RideCountExample {
                // map each ride to a tuple of (driverId, 1)
                DataStream<Tuple2<Long, Long>> tuples = rides.map(new 
MapFunction<TaxiRide, Tuple2<Long, Long>>() {
                                        @Override
-                                       public Tuple2<Long, Long> map(TaxiRide 
ride) throws Exception {
-                                               return new Tuple2<Long, 
Long>(ride.driverId, 1L);
+                                       public Tuple2<Long, Long> map(TaxiRide 
ride) {
+                                               return Tuple2.of(ride.driverId, 
1L);
                                        }
                });
 
diff --git 
a/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiFare.java
 
b/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiFare.java
index 393638e..f1da3e6 100644
--- 
a/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiFare.java
+++ 
b/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiFare.java
@@ -34,7 +34,7 @@ import java.util.Locale;
  */
 public class TaxiFare implements Serializable {
 
-       private static transient DateTimeFormatter timeFormatter =
+       private static final DateTimeFormatter TIME_FORMATTER =
                        DateTimeFormat.forPattern("yyyy-MM-dd 
HH:mm:ss").withLocale(Locale.US).withZoneUTC();
 
        /**
@@ -69,17 +69,14 @@ public class TaxiFare implements Serializable {
 
        @Override
        public String toString() {
-               StringBuilder sb = new StringBuilder();
-               sb.append(rideId).append(",");
-               sb.append(taxiId).append(",");
-               sb.append(driverId).append(",");
-               sb.append(startTime.toString(timeFormatter)).append(",");
-               sb.append(paymentType).append(",");
-               sb.append(tip).append(",");
-               sb.append(tolls).append(",");
-               sb.append(totalFare);
-
-               return sb.toString();
+               return rideId + "," +
+                               taxiId + "," +
+                               driverId + "," +
+                               startTime.toString(TIME_FORMATTER) + "," +
+                               paymentType + "," +
+                               tip + "," +
+                               tolls + "," +
+                               totalFare;
        }
 
        /**
@@ -98,7 +95,7 @@ public class TaxiFare implements Serializable {
                        ride.rideId = Long.parseLong(tokens[0]);
                        ride.taxiId = Long.parseLong(tokens[1]);
                        ride.driverId = Long.parseLong(tokens[2]);
-                       ride.startTime = DateTime.parse(tokens[3], 
timeFormatter);
+                       ride.startTime = DateTime.parse(tokens[3], 
TIME_FORMATTER);
                        ride.paymentType = tokens[4];
                        ride.tip = tokens[5].length() > 0 ? 
Float.parseFloat(tokens[5]) : 0.0f;
                        ride.tolls = tokens[6].length() > 0 ? 
Float.parseFloat(tokens[6]) : 0.0f;
diff --git 
a/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiRide.java
 
b/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiRide.java
index 7c78dec..a923a00 100644
--- 
a/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiRide.java
+++ 
b/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiRide.java
@@ -24,6 +24,8 @@ import org.joda.time.DateTime;
 import org.joda.time.format.DateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
 
+import javax.annotation.Nullable;
+
 import java.io.Serializable;
 import java.util.Locale;
 
@@ -45,7 +47,7 @@ import java.util.Locale;
  */
 public class TaxiRide implements Comparable<TaxiRide>, Serializable {
 
-       private static transient DateTimeFormatter timeFormatter =
+       private static final DateTimeFormatter TIME_FORMATTER =
                        DateTimeFormat.forPattern("yyyy-MM-dd 
HH:mm:ss").withLocale(Locale.US).withZoneUTC();
 
        /**
@@ -89,20 +91,17 @@ public class TaxiRide implements Comparable<TaxiRide>, 
Serializable {
 
        @Override
        public String toString() {
-               StringBuilder sb = new StringBuilder();
-               sb.append(rideId).append(",");
-               sb.append(isStart ? "START" : "END").append(",");
-               sb.append(startTime.toString(timeFormatter)).append(",");
-               sb.append(endTime.toString(timeFormatter)).append(",");
-               sb.append(startLon).append(",");
-               sb.append(startLat).append(",");
-               sb.append(endLon).append(",");
-               sb.append(endLat).append(",");
-               sb.append(passengerCnt).append(",");
-               sb.append(taxiId).append(",");
-               sb.append(driverId);
-
-               return sb.toString();
+               return rideId + "," +
+                               (isStart ? "START" : "END") + "," +
+                               startTime.toString(TIME_FORMATTER) + "," +
+                               endTime.toString(TIME_FORMATTER) + "," +
+                               startLon + "," +
+                               startLat + "," +
+                               endLon + "," +
+                               endLat + "," +
+                               passengerCnt + "," +
+                               taxiId + "," +
+                               driverId;
        }
 
        /**
@@ -123,13 +122,13 @@ public class TaxiRide implements Comparable<TaxiRide>, 
Serializable {
                        switch (tokens[1]) {
                                case "START":
                                        ride.isStart = true;
-                                       ride.startTime = 
DateTime.parse(tokens[2], timeFormatter);
-                                       ride.endTime = 
DateTime.parse(tokens[3], timeFormatter);
+                                       ride.startTime = 
DateTime.parse(tokens[2], TIME_FORMATTER);
+                                       ride.endTime = 
DateTime.parse(tokens[3], TIME_FORMATTER);
                                        break;
                                case "END":
                                        ride.isStart = false;
-                                       ride.endTime = 
DateTime.parse(tokens[2], timeFormatter);
-                                       ride.startTime = 
DateTime.parse(tokens[3], timeFormatter);
+                                       ride.endTime = 
DateTime.parse(tokens[2], TIME_FORMATTER);
+                                       ride.startTime = 
DateTime.parse(tokens[3], TIME_FORMATTER);
                                        break;
                                default:
                                        throw new RuntimeException("Invalid 
record: " + line);
@@ -158,7 +157,7 @@ public class TaxiRide implements Comparable<TaxiRide>, 
Serializable {
         *     <li>putting START events before END events if they have the same 
timestamp</li>
         * </ul>
         */
-       public int compareTo(TaxiRide other) {
+       public int compareTo(@Nullable TaxiRide other) {
                if (other == null) {
                        return 1;
                }
diff --git 
a/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiFareSource.java
 
b/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiFareSource.java
index c0f3c88..a02eff6 100644
--- 
a/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiFareSource.java
+++ 
b/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiFareSource.java
@@ -28,6 +28,7 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
 import java.util.Calendar;
 import java.util.Comparator;
 import java.util.PriorityQueue;
@@ -103,7 +104,7 @@ public class TaxiFareSource implements 
SourceFunction<TaxiFare> {
                }
                this.dataFilePath = dataFilePath;
                this.maxDelayMsecs = maxEventDelaySecs * 1000;
-               this.watermarkDelayMSecs = maxDelayMsecs < 10000 ? 10000 : 
maxDelayMsecs;
+               this.watermarkDelayMSecs = Math.max(maxDelayMsecs, 10000);
                this.servingSpeed = servingSpeedFactor;
        }
 
@@ -111,7 +112,7 @@ public class TaxiFareSource implements 
SourceFunction<TaxiFare> {
        public void run(SourceContext<TaxiFare> sourceContext) throws Exception 
{
 
                gzipStream = new GZIPInputStream(new 
FileInputStream(dataFilePath));
-               reader = new BufferedReader(new InputStreamReader(gzipStream, 
"UTF-8"));
+               reader = new BufferedReader(new InputStreamReader(gzipStream, 
StandardCharsets.UTF_8));
 
                generateUnorderedStream(sourceContext);
 
@@ -130,12 +131,7 @@ public class TaxiFareSource implements 
SourceFunction<TaxiFare> {
                Random rand = new Random(7452);
                PriorityQueue<Tuple2<Long, Object>> emitSchedule = new 
PriorityQueue<>(
                                32,
-                               new Comparator<Tuple2<Long, Object>>() {
-                                       @Override
-                                       public int compare(Tuple2<Long, Object> 
o1, Tuple2<Long, Object> o2) {
-                                               return o1.f0.compareTo(o2.f0);
-                                       }
-                               });
+                               Comparator.comparing(o -> o.f0));
 
                // read first ride and insert it into emit schedule
                String line;
@@ -148,11 +144,11 @@ public class TaxiFareSource implements 
SourceFunction<TaxiFare> {
                        // get delayed time
                        long delayedEventTime = dataStartTime + 
getNormalDelayMsecs(rand);
 
-                       emitSchedule.add(new Tuple2<Long, 
Object>(delayedEventTime, fare));
+                       emitSchedule.add(Tuple2.of(delayedEventTime, fare));
                        // schedule next watermark
                        long watermarkTime = dataStartTime + 
watermarkDelayMSecs;
                        Watermark nextWatermark = new Watermark(watermarkTime - 
maxDelayMsecs - 1);
-                       emitSchedule.add(new Tuple2<Long, 
Object>(watermarkTime, nextWatermark));
+                       emitSchedule.add(Tuple2.of(watermarkTime, 
nextWatermark));
 
                } else {
                        return;
@@ -176,7 +172,7 @@ public class TaxiFareSource implements 
SourceFunction<TaxiFare> {
                                        ) {
                                // insert event into emit schedule
                                long delayedEventTime = rideEventTime + 
getNormalDelayMsecs(rand);
-                               emitSchedule.add(new Tuple2<Long, 
Object>(delayedEventTime, fare));
+                               emitSchedule.add(Tuple2.of(delayedEventTime, 
fare));
 
                                // read next ride
                                if (reader.ready() && (line = 
reader.readLine()) != null) {
@@ -190,13 +186,14 @@ public class TaxiFareSource implements 
SourceFunction<TaxiFare> {
                        }
 
                        // emit schedule is updated, emit next element in 
schedule
-                       Tuple2<Long, Object> head = emitSchedule.poll();
+                       Tuple2<Long, Object> head = emitSchedule.remove();
                        long delayedEventTime = head.f0;
 
                        long now = Calendar.getInstance().getTimeInMillis();
                        long servingTime = toServingTime(servingStartTime, 
dataStartTime, delayedEventTime);
                        long waitTime = servingTime - now;
 
+                       //noinspection BusyWait
                        Thread.sleep((waitTime > 0) ? waitTime : 0);
 
                        if (head.f1 instanceof TaxiFare) {
@@ -211,7 +208,7 @@ public class TaxiFareSource implements 
SourceFunction<TaxiFare> {
                                // schedule next watermark
                                long watermarkTime = delayedEventTime + 
watermarkDelayMSecs;
                                Watermark nextWatermark = new 
Watermark(watermarkTime - maxDelayMsecs - 1);
-                               emitSchedule.add(new Tuple2<Long, 
Object>(watermarkTime, nextWatermark));
+                               emitSchedule.add(Tuple2.of(watermarkTime, 
nextWatermark));
                        }
                }
        }
diff --git 
a/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiRideSource.java
 
b/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiRideSource.java
index 6bfb970..b19a310 100644
--- 
a/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiRideSource.java
+++ 
b/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiRideSource.java
@@ -28,6 +28,7 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
 import java.util.Calendar;
 import java.util.Comparator;
 import java.util.PriorityQueue;
@@ -103,7 +104,7 @@ public class TaxiRideSource implements 
SourceFunction<TaxiRide> {
                }
                this.dataFilePath = dataFilePath;
                this.maxDelayMsecs = maxEventDelaySecs * 1000;
-               this.watermarkDelayMSecs = maxDelayMsecs < 10000 ? 10000 : 
maxDelayMsecs;
+               this.watermarkDelayMSecs = Math.max(maxDelayMsecs, 10000);
                this.servingSpeed = servingSpeedFactor;
        }
 
@@ -111,7 +112,7 @@ public class TaxiRideSource implements 
SourceFunction<TaxiRide> {
        public void run(SourceContext<TaxiRide> sourceContext) throws Exception 
{
 
                gzipStream = new GZIPInputStream(new 
FileInputStream(dataFilePath));
-               reader = new BufferedReader(new InputStreamReader(gzipStream, 
"UTF-8"));
+               reader = new BufferedReader(new InputStreamReader(gzipStream, 
StandardCharsets.UTF_8));
 
                generateUnorderedStream(sourceContext);
 
@@ -130,12 +131,7 @@ public class TaxiRideSource implements 
SourceFunction<TaxiRide> {
                Random rand = new Random(7452);
                PriorityQueue<Tuple2<Long, Object>> emitSchedule = new 
PriorityQueue<>(
                                32,
-                               new Comparator<Tuple2<Long, Object>>() {
-                                       @Override
-                                       public int compare(Tuple2<Long, Object> 
o1, Tuple2<Long, Object> o2) {
-                                               return o1.f0.compareTo(o2.f0);
-                                       }
-                               });
+                               Comparator.comparing(o -> o.f0));
 
                // read first ride and insert it into emit schedule
                String line;
@@ -148,11 +144,11 @@ public class TaxiRideSource implements 
SourceFunction<TaxiRide> {
                        // get delayed time
                        long delayedEventTime = dataStartTime + 
getNormalDelayMsecs(rand);
 
-                       emitSchedule.add(new Tuple2<Long, 
Object>(delayedEventTime, ride));
+                       emitSchedule.add(Tuple2.of(delayedEventTime, ride));
                        // schedule next watermark
                        long watermarkTime = dataStartTime + 
watermarkDelayMSecs;
                        Watermark nextWatermark = new Watermark(watermarkTime - 
maxDelayMsecs - 1);
-                       emitSchedule.add(new Tuple2<Long, 
Object>(watermarkTime, nextWatermark));
+                       emitSchedule.add(Tuple2.of(watermarkTime, 
nextWatermark));
 
                } else {
                        return;
@@ -176,7 +172,7 @@ public class TaxiRideSource implements 
SourceFunction<TaxiRide> {
                                        ) {
                                // insert event into emit schedule
                                long delayedEventTime = rideEventTime + 
getNormalDelayMsecs(rand);
-                               emitSchedule.add(new Tuple2<Long, 
Object>(delayedEventTime, ride));
+                               emitSchedule.add(Tuple2.of(delayedEventTime, 
ride));
 
                                // read next ride
                                if (reader.ready() && (line = 
reader.readLine()) != null) {
@@ -190,13 +186,14 @@ public class TaxiRideSource implements 
SourceFunction<TaxiRide> {
                        }
 
                        // emit schedule is updated, emit next element in 
schedule
-                       Tuple2<Long, Object> head = emitSchedule.poll();
+                       Tuple2<Long, Object> head = emitSchedule.remove();
                        long delayedEventTime = head.f0;
 
                        long now = Calendar.getInstance().getTimeInMillis();
                        long servingTime = toServingTime(servingStartTime, 
dataStartTime, delayedEventTime);
                        long waitTime = servingTime - now;
 
+                       //noinspection BusyWait
                        Thread.sleep((waitTime > 0) ? waitTime : 0);
 
                        if (head.f1 instanceof TaxiRide) {
@@ -211,7 +208,7 @@ public class TaxiRideSource implements 
SourceFunction<TaxiRide> {
                                // schedule next watermark
                                long watermarkTime = delayedEventTime + 
watermarkDelayMSecs;
                                Watermark nextWatermark = new 
Watermark(watermarkTime - maxDelayMsecs - 1);
-                               emitSchedule.add(new Tuple2<Long, 
Object>(watermarkTime, nextWatermark));
+                               emitSchedule.add(Tuple2.of(watermarkTime, 
nextWatermark));
                        }
                }
        }
diff --git 
a/common/src/main/java/org/apache/flink/training/exercises/common/utils/GeoUtils.java
 
b/common/src/main/java/org/apache/flink/training/exercises/common/utils/GeoUtils.java
index 4594248..e638ab3 100644
--- 
a/common/src/main/java/org/apache/flink/training/exercises/common/utils/GeoUtils.java
+++ 
b/common/src/main/java/org/apache/flink/training/exercises/common/utils/GeoUtils.java
@@ -245,9 +245,8 @@ public class GeoUtils {
 
                double x = destLat - startLat;
                double y = (destLon - startLon) * Math.cos(startLat);
-               int degrees = (int) Math.toDegrees(Math.atan2(x, y)) + 179;
 
-               return degrees;
+               return (int) Math.toDegrees(Math.atan2(x, y)) + 179;
        }
 
 }
diff --git 
a/common/src/test/java/org/apache/flink/training/exercises/testing/TaxiRideTestBase.java
 
b/common/src/test/java/org/apache/flink/training/exercises/testing/TaxiRideTestBase.java
index 6c59de1..f4f5596 100644
--- 
a/common/src/test/java/org/apache/flink/training/exercises/testing/TaxiRideTestBase.java
+++ 
b/common/src/test/java/org/apache/flink/training/exercises/testing/TaxiRideTestBase.java
@@ -30,34 +30,49 @@ import java.util.ArrayList;
 import java.util.List;
 
 public abstract class TaxiRideTestBase<OUT> {
-       public static class TestRideSource extends TestSource implements 
ResultTypeQueryable<TaxiRide> {
+       public static class TestRideSource extends TestSource<TaxiRide> 
implements ResultTypeQueryable<TaxiRide> {
                public TestRideSource(Object ... eventsOrWatermarks) {
                        this.testStream = eventsOrWatermarks;
                }
 
                @Override
+               long getTimestamp(TaxiRide ride) {
+                       return ride.getEventTime();
+               }
+
+               @Override
                public TypeInformation<TaxiRide> getProducedType() {
                        return TypeInformation.of(TaxiRide.class);
                }
        }
 
-       public static class TestFareSource extends TestSource implements 
ResultTypeQueryable<TaxiFare> {
+       public static class TestFareSource extends TestSource<TaxiFare> 
implements ResultTypeQueryable<TaxiFare> {
                public TestFareSource(Object ... eventsOrWatermarks) {
                        this.testStream = eventsOrWatermarks;
                }
 
                @Override
+               long getTimestamp(TaxiFare fare) {
+                       return fare.getEventTime();
+               }
+
+               @Override
                public TypeInformation<TaxiFare> getProducedType() {
                        return TypeInformation.of(TaxiFare.class);
                }
        }
 
-       public static class TestStringSource extends TestSource implements 
ResultTypeQueryable<String> {
+       public static class TestStringSource extends TestSource<String> 
implements ResultTypeQueryable<String> {
                public TestStringSource(Object ... eventsOrWatermarks) {
                        this.testStream = eventsOrWatermarks;
                }
 
                @Override
+               long getTimestamp(String s) {
+                       return 0L;
+               }
+
+               @Override
                public TypeInformation<String> getProducedType() {
                        return TypeInformation.of(String.class);
                }
@@ -69,13 +84,13 @@ public abstract class TaxiRideTestBase<OUT> {
                public static final List VALUES = new ArrayList<>();
 
                @Override
-               public void invoke(OUT value, Context context) throws Exception 
{
+               public void invoke(OUT value, Context context) {
                        VALUES.add(value);
                }
        }
 
        public interface Testable {
-               public abstract void main() throws Exception;
+               void main() throws Exception;
        }
 
        protected List<OUT> runApp(TestRideSource source, TestSink<OUT> sink, 
Testable exercise, Testable solution) throws Exception {
diff --git 
a/common/src/test/java/org/apache/flink/training/exercises/testing/TestSource.java
 
b/common/src/test/java/org/apache/flink/training/exercises/testing/TestSource.java
index 57d72ab..84ba53e 100644
--- 
a/common/src/test/java/org/apache/flink/training/exercises/testing/TestSource.java
+++ 
b/common/src/test/java/org/apache/flink/training/exercises/testing/TestSource.java
@@ -20,35 +20,29 @@ package org.apache.flink.training.exercises.testing;
 
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.training.exercises.common.datatypes.TaxiFare;
-import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
 
-public abstract class TestSource implements SourceFunction {
+public abstract class TestSource<T> implements SourceFunction<T> {
        private volatile boolean running = true;
+       // T or watermark (Long)
        protected Object[] testStream;
 
        @Override
-       public void run(SourceContext ctx) throws Exception {
+       public void run(SourceContext<T> ctx) {
                for (int i = 0; (i < testStream.length) && running; i++) {
-                       if (testStream[i] instanceof TaxiRide) {
-                               TaxiRide ride = (TaxiRide) testStream[i];
-                               ctx.collectWithTimestamp(ride, 
ride.getEventTime());
-                       } else if (testStream[i] instanceof TaxiFare) {
-                               TaxiFare fare = (TaxiFare) testStream[i];
-                               ctx.collectWithTimestamp(fare, 
fare.getEventTime());
-                       } else if (testStream[i] instanceof String) {
-                               String s = (String) testStream[i];
-                               ctx.collectWithTimestamp(s, 0);
-                       } else if (testStream[i] instanceof Long) {
+                       if (testStream[i] instanceof Long) {
                                Long ts = (Long) testStream[i];
                                ctx.emitWatermark(new Watermark(ts));
                        } else {
-                               throw new 
RuntimeException(testStream[i].toString());
+                               //noinspection unchecked
+                               T element = (T) testStream[i];
+                               ctx.collectWithTimestamp(element, 
getTimestamp(element));
                        }
                }
                // test sources are finite, so they have a Long.MAX_VALUE 
watermark when they finishes
        }
 
+       abstract long getTimestamp(T element);
+
        @Override
        public void cancel() {
                running = false;
diff --git a/config/checkstyle/checkstyle.xml b/config/checkstyle/checkstyle.xml
index 34b0e76..e656fda 100644
--- a/config/checkstyle/checkstyle.xml
+++ b/config/checkstyle/checkstyle.xml
@@ -18,8 +18,8 @@ specific language governing permissions and limitations
 under the License.
 -->
 <!DOCTYPE module PUBLIC
-          "-//Puppy Crawl//DTD Check Configuration 1.3//EN"
-          "http://www.puppycrawl.com/dtds/configuration_1_3.dtd";>
+        "-//Checkstyle//DTD Checkstyle Configuration 1.3//EN"
+        "https://checkstyle.org/dtds/configuration_1_3.dtd";>
 
 <!--
 This is a checkstyle configuration file. For descriptions of
diff --git a/config/checkstyle/suppressions.xml 
b/config/checkstyle/suppressions.xml
index 0b05a64..01e36b5 100644
--- a/config/checkstyle/suppressions.xml
+++ b/config/checkstyle/suppressions.xml
@@ -19,8 +19,8 @@ under the License.
 -->
 
 <!DOCTYPE suppressions PUBLIC
-               "-//Puppy Crawl//DTD Suppressions 1.1//EN"
-               "http://www.puppycrawl.com/dtds/suppressions_1_1.dtd";>
+               "-//Checkstyle//DTD SuppressionFilter Configuration 1.2//EN"
+               "https://checkstyle.org/dtds/suppressions_1_2.dtd";>
 
 <suppressions>
        <suppress
diff --git a/hourly-tips/DISCUSSION.md b/hourly-tips/DISCUSSION.md
index 2e91b2d..ab653d3 100644
--- a/hourly-tips/DISCUSSION.md
+++ b/hourly-tips/DISCUSSION.md
@@ -41,7 +41,7 @@ public static class AddTips extends ProcessWindowFunction<
                for (TaxiFare f : fares) {
                        sumOfTips += f.tip;
                }
-               out.collect(new Tuple3<>(context.window().getEnd(), key, 
sumOfTips));
+               out.collect(Tuple3.of(context.window().getEnd(), key, 
sumOfTips));
        }
 }
 ```
diff --git 
a/hourly-tips/src/solution/java/org/apache/flink/training/solutions/hourlytips/HourlyTipsSolution.java
 
b/hourly-tips/src/solution/java/org/apache/flink/training/solutions/hourlytips/HourlyTipsSolution.java
index 4e2bb32..b640a19 100644
--- 
a/hourly-tips/src/solution/java/org/apache/flink/training/solutions/hourlytips/HourlyTipsSolution.java
+++ 
b/hourly-tips/src/solution/java/org/apache/flink/training/solutions/hourlytips/HourlyTipsSolution.java
@@ -96,12 +96,12 @@ public class HourlyTipsSolution extends ExerciseBase {
        public static class AddTips extends ProcessWindowFunction<
                        TaxiFare, Tuple3<Long, Long, Float>, Long, TimeWindow> {
                @Override
-               public void process(Long key, Context context, 
Iterable<TaxiFare> fares, Collector<Tuple3<Long, Long, Float>> out) throws 
Exception {
-                       Float sumOfTips = 0F;
+               public void process(Long key, Context context, 
Iterable<TaxiFare> fares, Collector<Tuple3<Long, Long, Float>> out) {
+                       float sumOfTips = 0F;
                        for (TaxiFare f : fares) {
                                sumOfTips += f.tip;
                        }
-                       out.collect(new Tuple3<>(context.window().getEnd(), 
key, sumOfTips));
+                       out.collect(Tuple3.of(context.window().getEnd(), key, 
sumOfTips));
                }
        }
 }
diff --git 
a/hourly-tips/src/solution/scala/org/apache/flink/training/solutions/hourlytips/scala/HourlyTipsSolution.scala
 
b/hourly-tips/src/solution/scala/org/apache/flink/training/solutions/hourlytips/scala/HourlyTipsSolution.scala
index 3ad6243..ab3de6c 100644
--- 
a/hourly-tips/src/solution/scala/org/apache/flink/training/solutions/hourlytips/scala/HourlyTipsSolution.scala
+++ 
b/hourly-tips/src/solution/scala/org/apache/flink/training/solutions/hourlytips/scala/HourlyTipsSolution.scala
@@ -82,7 +82,7 @@ object HourlyTipsSolution {
   class WrapWithWindowInfo() extends ProcessWindowFunction[(Long, Float), 
(Long, Long, Float), Long, TimeWindow] {
     override def process(key: Long, context: Context, elements: 
Iterable[(Long, Float)], out: Collector[(Long, Long, Float)]): Unit = {
       val sumOfTips = elements.iterator.next()._2
-      out.collect((context.window.getEnd(), key, sumOfTips))
+      out.collect((context.window.getEnd, key, sumOfTips))
     }
   }
 
diff --git 
a/hourly-tips/src/test/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsTest.java
 
b/hourly-tips/src/test/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsTest.java
index 565a8d3..eef1bdf 100644
--- 
a/hourly-tips/src/test/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsTest.java
+++ 
b/hourly-tips/src/test/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsTest.java
@@ -35,7 +35,7 @@ import static org.junit.Assert.assertEquals;
 
 public class HourlyTipsTest extends TaxiRideTestBase<Tuple3<Long, Long, 
Float>> {
 
-       static Testable javaExercise = () -> HourlyTipsExercise.main(new 
String[]{});
+       static final Testable JAVA_EXERCISE = () -> HourlyTipsExercise.main(new 
String[]{});
 
        @Test
        public void testOneDriverOneTip() throws Exception {
@@ -45,7 +45,7 @@ public class HourlyTipsTest extends 
TaxiRideTestBase<Tuple3<Long, Long, Float>>
                                one
                );
 
-               Tuple3<Long, Long, Float> max = new Tuple3<Long, Long, 
Float>(t(60), 1L, 1.0F);
+               Tuple3<Long, Long, Float> max = Tuple3.of(t(60), 1L, 1.0F);
 
                List<Tuple3<Long, Long, Float>> expected = 
Collections.singletonList(max);
 
@@ -64,8 +64,8 @@ public class HourlyTipsTest extends 
TaxiRideTestBase<Tuple3<Long, Long, Float>>
                                tenIn2
                );
 
-               Tuple3<Long, Long, Float> hour1 = new Tuple3<Long, Long, 
Float>(t(60), 1L, 6.0F);
-               Tuple3<Long, Long, Float> hour2 = new Tuple3<Long, Long, 
Float>(t(120), 1L, 10.0F);
+               Tuple3<Long, Long, Float> hour1 = Tuple3.of(t(60), 1L, 6.0F);
+               Tuple3<Long, Long, Float> hour2 = Tuple3.of(t(120), 1L, 10.0F);
 
                List<Tuple3<Long, Long, Float>> expected = Arrays.asList(hour1, 
hour2);
 
@@ -86,8 +86,8 @@ public class HourlyTipsTest extends 
TaxiRideTestBase<Tuple3<Long, Long, Float>>
                                twentyFor2In2
                );
 
-               Tuple3<Long, Long, Float> hour1 = new Tuple3<Long, Long, 
Float>(t(60), 1L, 6.0F);
-               Tuple3<Long, Long, Float> hour2 = new Tuple3<Long, Long, 
Float>(t(120), 2L, 20.0F);
+               Tuple3<Long, Long, Float> hour1 = Tuple3.of(t(60), 1L, 6.0F);
+               Tuple3<Long, Long, Float> hour2 = Tuple3.of(t(120), 2L, 20.0F);
 
                List<Tuple3<Long, Long, Float>> expected = Arrays.asList(hour1, 
hour2);
 
@@ -104,7 +104,7 @@ public class HourlyTipsTest extends 
TaxiRideTestBase<Tuple3<Long, Long, Float>>
 
        protected List<Tuple3<Long, Long, Float>> results(TestFareSource 
source) throws Exception {
                Testable javaSolution = () -> HourlyTipsSolution.main(new 
String[]{});
-               return runApp(source, new TestSink<>(), javaExercise, 
javaSolution);
+               return runApp(source, new TestSink<>(), JAVA_EXERCISE, 
javaSolution);
        }
 
 }
diff --git 
a/hourly-tips/src/test/scala/org/apache/flink/training/exercises/hourlytips/scala/HourlyTipsTest.scala
 
b/hourly-tips/src/test/scala/org/apache/flink/training/exercises/hourlytips/scala/HourlyTipsTest.scala
index fbcedf8..aacaab5 100644
--- 
a/hourly-tips/src/test/scala/org/apache/flink/training/exercises/hourlytips/scala/HourlyTipsTest.scala
+++ 
b/hourly-tips/src/test/scala/org/apache/flink/training/exercises/hourlytips/scala/HourlyTipsTest.scala
@@ -32,7 +32,7 @@ class HourlyTipsTest extends hourlytips.HourlyTipsTest {
   override protected def results(source: TaxiRideTestBase.TestFareSource): 
util.List[tuple.Tuple3[java.lang.Long, java.lang.Long, java.lang.Float]] = {
     val scalaSolution: TaxiRideTestBase.Testable = () => 
HourlyTipsSolution.main(Array.empty[String])
     val tuples: util.List[_] = runApp(source, new 
TaxiRideTestBase.TestSink[tuple.Tuple3[java.lang.Long, java.lang.Long, 
java.lang.Float]], scalaExercise, scalaSolution)
-    javaTuples(tuples.asInstanceOf[util.List[Tuple3[Long, Long, Float]]])
+    javaTuples(tuples.asInstanceOf[util.List[(Long, Long, Float)]])
   }
 
   private def javaTuples(a: util.List[(Long, Long, Float)]): 
util.ArrayList[tuple.Tuple3[java.lang.Long, java.lang.Long, java.lang.Float]] = 
{
diff --git 
a/long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesTest.java
 
b/long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesTest.java
index 9bd68ae..ba3e6b6 100644
--- 
a/long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesTest.java
+++ 
b/long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesTest.java
@@ -32,14 +32,14 @@ import static org.junit.Assert.assertEquals;
 
 public class LongRidesTest extends TaxiRideTestBase<TaxiRide> {
 
-       static Testable javaExercise = () -> LongRidesExercise.main(new 
String[]{});
+       static final Testable JAVA_EXERCISE = () -> LongRidesExercise.main(new 
String[]{});
 
-       private DateTime beginning = new DateTime(2000, 1, 1, 0, 0);
+       private static final DateTime BEGINNING = new DateTime(2000, 1, 1, 0, 
0);
 
        @Test
        public void shortRide() throws Exception {
-               DateTime oneMinLater = beginning.plusMinutes(1);
-               TaxiRide rideStarted = startRide(1, beginning);
+               DateTime oneMinLater = BEGINNING.plusMinutes(1);
+               TaxiRide rideStarted = startRide(1, BEGINNING);
                TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater);
                Long markOneMinLater = oneMinLater.getMillis();
 
@@ -49,8 +49,8 @@ public class LongRidesTest extends TaxiRideTestBase<TaxiRide> 
{
 
        @Test
        public void outOfOrder() throws Exception {
-               DateTime oneMinLater = beginning.plusMinutes(1);
-               TaxiRide rideStarted = startRide(1, beginning);
+               DateTime oneMinLater = BEGINNING.plusMinutes(1);
+               TaxiRide rideStarted = startRide(1, BEGINNING);
                TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater);
                Long markOneMinLater = oneMinLater.getMillis();
 
@@ -60,8 +60,8 @@ public class LongRidesTest extends TaxiRideTestBase<TaxiRide> 
{
 
        @Test
        public void noStartShort() throws Exception {
-               DateTime oneMinLater = beginning.plusMinutes(1);
-               TaxiRide rideStarted = startRide(1, beginning);
+               DateTime oneMinLater = BEGINNING.plusMinutes(1);
+               TaxiRide rideStarted = startRide(1, BEGINNING);
                TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater);
                Long markOneMinLater = oneMinLater.getMillis();
 
@@ -71,8 +71,8 @@ public class LongRidesTest extends TaxiRideTestBase<TaxiRide> 
{
 
        @Test
        public void noEnd() throws Exception {
-               TaxiRide rideStarted = startRide(1, beginning);
-               Long markThreeHoursLater = beginning.plusHours(3).getMillis();
+               TaxiRide rideStarted = startRide(1, BEGINNING);
+               Long markThreeHoursLater = BEGINNING.plusHours(3).getMillis();
 
                TestRideSource source = new TestRideSource(rideStarted, 
markThreeHoursLater);
                assertEquals(Collections.singletonList(rideStarted), 
results(source));
@@ -80,9 +80,9 @@ public class LongRidesTest extends TaxiRideTestBase<TaxiRide> 
{
 
        @Test
        public void longRide() throws Exception {
-               TaxiRide rideStarted = startRide(1, beginning);
-               Long mark2HoursLater = beginning.plusMinutes(120).getMillis();
-               TaxiRide rideEnded3HoursLater = endRide(rideStarted, 
beginning.plusHours(3));
+               TaxiRide rideStarted = startRide(1, BEGINNING);
+               Long mark2HoursLater = BEGINNING.plusMinutes(120).getMillis();
+               TaxiRide rideEnded3HoursLater = endRide(rideStarted, 
BEGINNING.plusHours(3));
 
                TestRideSource source = new TestRideSource(rideStarted, 
mark2HoursLater, rideEnded3HoursLater);
                assertEquals(Collections.singletonList(rideStarted), 
results(source));
@@ -102,7 +102,7 @@ public class LongRidesTest extends 
TaxiRideTestBase<TaxiRide> {
 
        protected List<TaxiRide> results(TestRideSource source) throws 
Exception {
                Testable javaSolution = () -> LongRidesSolution.main(new 
String[]{});
-               return runApp(source, new TestSink<>(), javaExercise, 
javaSolution);
+               return runApp(source, new TestSink<>(), JAVA_EXERCISE, 
javaSolution);
        }
 
 }
diff --git 
a/ride-cleansing/src/solution/java/org/apache/flink/training/solutions/ridecleansing/RideCleansingSolution.java
 
b/ride-cleansing/src/solution/java/org/apache/flink/training/solutions/ridecleansing/RideCleansingSolution.java
index cf3dca5..e16ea83 100644
--- 
a/ride-cleansing/src/solution/java/org/apache/flink/training/solutions/ridecleansing/RideCleansingSolution.java
+++ 
b/ride-cleansing/src/solution/java/org/apache/flink/training/solutions/ridecleansing/RideCleansingSolution.java
@@ -74,8 +74,7 @@ public class RideCleansingSolution extends ExerciseBase {
 
        public static class NYCFilter implements FilterFunction<TaxiRide> {
                @Override
-               public boolean filter(TaxiRide taxiRide) throws Exception {
-
+               public boolean filter(TaxiRide taxiRide) {
                        return GeoUtils.isInNYC(taxiRide.startLon, 
taxiRide.startLat) &&
                                        GeoUtils.isInNYC(taxiRide.endLon, 
taxiRide.endLat);
                }
diff --git 
a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingTest.java
 
b/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingTest.java
index b0e151f..e4554d4 100644
--- 
a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingTest.java
+++ 
b/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingTest.java
@@ -32,7 +32,7 @@ import static org.junit.Assert.assertEquals;
 
 public class RideCleansingTest extends TaxiRideTestBase<TaxiRide> {
 
-       static Testable javaExercise = () -> RideCleansingExercise.main(new 
String[]{});
+       static final Testable JAVA_EXERCISE = () -> 
RideCleansingExercise.main(new String[]{});
 
        @Test
        public void testInNYC() throws Exception {
@@ -61,7 +61,7 @@ public class RideCleansingTest extends 
TaxiRideTestBase<TaxiRide> {
 
        protected List<?> results(TestRideSource source) throws Exception {
                Testable javaSolution = () -> RideCleansingSolution.main(new 
String[]{});
-               return runApp(source, new TestSink<>(), javaExercise, 
javaSolution);
+               return runApp(source, new TestSink<>(), JAVA_EXERCISE, 
javaSolution);
        }
 
 }
diff --git 
a/rides-and-fares/src/solution/java/org/apache/flink/training/solutions/ridesandfares/RidesAndFaresSolution.java
 
b/rides-and-fares/src/solution/java/org/apache/flink/training/solutions/ridesandfares/RidesAndFaresSolution.java
index 961aa40..0254540 100644
--- 
a/rides-and-fares/src/solution/java/org/apache/flink/training/solutions/ridesandfares/RidesAndFaresSolution.java
+++ 
b/rides-and-fares/src/solution/java/org/apache/flink/training/solutions/ridesandfares/RidesAndFaresSolution.java
@@ -114,7 +114,7 @@ public class RidesAndFaresSolution extends ExerciseBase {
                        TaxiFare fare = fareState.value();
                        if (fare != null) {
                                fareState.clear();
-                               out.collect(new Tuple2(ride, fare));
+                               out.collect(Tuple2.of(ride, fare));
                        } else {
                                rideState.update(ride);
                        }
@@ -125,7 +125,7 @@ public class RidesAndFaresSolution extends ExerciseBase {
                        TaxiRide ride = rideState.value();
                        if (ride != null) {
                                rideState.clear();
-                               out.collect(new Tuple2(ride, fare));
+                               out.collect(Tuple2.of(ride, fare));
                        } else {
                                fareState.update(fare);
                        }
diff --git 
a/rides-and-fares/src/test/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresTest.java
 
b/rides-and-fares/src/test/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresTest.java
index e1d270a..c2b1fb1 100644
--- 
a/rides-and-fares/src/test/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresTest.java
+++ 
b/rides-and-fares/src/test/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresTest.java
@@ -35,7 +35,7 @@ import static org.junit.Assert.assertThat;
 
 public class RidesAndFaresTest extends TaxiRideTestBase<Tuple2<TaxiRide, 
TaxiFare>> {
 
-       static Testable javaExercise = () -> RidesAndFaresExercise.main(new 
String[]{});
+       static final Testable JAVA_EXERCISE = () -> 
RidesAndFaresExercise.main(new String[]{});
 
 
        final TaxiRide ride1 = testRide(1);
@@ -49,8 +49,8 @@ public class RidesAndFaresTest extends 
TaxiRideTestBase<Tuple2<TaxiRide, TaxiFar
                TestFareSource fares = new TestFareSource(fare1, fare2);
 
                List<Tuple2<TaxiRide, TaxiFare>> expected = Arrays.asList(
-                               new Tuple2<>(ride1, fare1),
-                               new Tuple2<>(ride2, fare2));
+                               Tuple2.of(ride1, fare1),
+                               Tuple2.of(ride2, fare2));
 
                assertThat("Join results don't match", results(rides, fares), 
containsInAnyOrder(expected.toArray()));
        }
@@ -61,8 +61,8 @@ public class RidesAndFaresTest extends 
TaxiRideTestBase<Tuple2<TaxiRide, TaxiFar
                TestFareSource fares = new TestFareSource(fare2, fare1);
 
                List<Tuple2<TaxiRide, TaxiFare>> expected = Arrays.asList(
-                               new Tuple2<>(ride1, fare1),
-                               new Tuple2<>(ride2, fare2));
+                               Tuple2.of(ride1, fare1),
+                               Tuple2.of(ride2, fare2));
 
                assertThat("Join results don't match", results(rides, fares), 
containsInAnyOrder(expected.toArray()));
        }
@@ -78,7 +78,7 @@ public class RidesAndFaresTest extends 
TaxiRideTestBase<Tuple2<TaxiRide, TaxiFar
 
        protected List<?> results(TestRideSource rides, TestFareSource fares) 
throws Exception {
                Testable javaSolution = () -> RidesAndFaresSolution.main(new 
String[]{});
-               return runApp(rides, fares, new TestSink<>(), javaExercise, 
javaSolution);
+               return runApp(rides, fares, new TestSink<>(), JAVA_EXERCISE, 
javaSolution);
        }
 
 }
diff --git 
a/rides-and-fares/src/test/scala/org/apache/flink/training/exercises/ridesandfares/scala/RidesAndFaresTest.scala
 
b/rides-and-fares/src/test/scala/org/apache/flink/training/exercises/ridesandfares/scala/RidesAndFaresTest.scala
index c162122..03f7e6a 100644
--- 
a/rides-and-fares/src/test/scala/org/apache/flink/training/exercises/ridesandfares/scala/RidesAndFaresTest.scala
+++ 
b/rides-and-fares/src/test/scala/org/apache/flink/training/exercises/ridesandfares/scala/RidesAndFaresTest.scala
@@ -33,7 +33,7 @@ class RidesAndFaresTest extends 
ridesandfares.RidesAndFaresTest{
   override protected def results(rides: TaxiRideTestBase.TestRideSource, 
fares: TaxiRideTestBase.TestFareSource): util.List[tuple.Tuple2[TaxiRide, 
TaxiFare]] = {
     val scalaSolution: TaxiRideTestBase.Testable = () => 
RidesAndFaresSolution.main(Array.empty[String])
     val tuples: util.List[_] = runApp(rides, fares, new 
TaxiRideTestBase.TestSink[tuple.Tuple2[TaxiRide, TaxiFare]], scalaExercise, 
scalaSolution)
-    javaTuples(tuples.asInstanceOf[util.List[Tuple2[TaxiRide, TaxiFare]]])
+    javaTuples(tuples.asInstanceOf[util.List[(TaxiRide, TaxiFare)]])
   }
 
   private def javaTuples(a: util.List[(TaxiRide, TaxiFare)]): 
util.ArrayList[tuple.Tuple2[TaxiRide, TaxiFare]] = {

Reply via email to