This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.0 by this push:
new 2cf0829 KAFKA-6323: punctuate with WALL_CLOCK_TIME triggered
immediately (#4301)
2cf0829 is described below
commit 2cf0829c7c21eaf1c844f4deddbbdd8b1411341a
Author: fredfp <[email protected]>
AuthorDate: Wed Jan 31 10:18:55 2018 +0800
KAFKA-6323: punctuate with WALL_CLOCK_TIME triggered immediately (#4301)
This PR avoids unnecessary punctuation calls if punctuations are missed due
to large time advances. It also aligns punctuation schedules to the epoch.
Author: Frederic Arno
Reviewers: Michal Borowiecki <[email protected]>, Guozhang Wang
<[email protected]>, Damian Guy <[email protected]>, Matthias J. Sax
<[email protected]>
---
.../kafka/streams/processor/ProcessorContext.java | 15 +-
.../processor/internals/PunctuationSchedule.java | 34 +++--
.../streams/processor/internals/StreamTask.java | 26 +++-
.../processor/internals/PunctuationQueueTest.java | 60 +++++++-
.../processor/internals/StreamTaskTest.java | 165 +++++++++++++++++++--
5 files changed, 269 insertions(+), 31 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index 385d641..f9969d4 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -102,11 +102,22 @@ public interface ProcessorContext {
* <ul>
* <li>{@link PunctuationType#STREAM_TIME} - uses "stream time", which
is advanced by the processing of messages
* in accordance with the timestamp as extracted by the {@link
TimestampExtractor} in use.
+ * The first punctuation will be triggered by the first record that is
processed.
* <b>NOTE:</b> Only advanced if messages arrive</li>
* <li>{@link PunctuationType#WALL_CLOCK_TIME} - uses system time (the
wall-clock time),
* which is advanced at the polling interval ({@link
org.apache.kafka.streams.StreamsConfig#POLL_MS_CONFIG})
- * independent of whether new messages arrive. <b>NOTE:</b> This is best
effort only as its granularity is limited
- * by how long an iteration of the processing loop takes to complete</li>
+ * independent of whether new messages arrive.
+ * The first punctuation will be triggered after the interval has elapsed.
+ * <b>NOTE:</b> This is best effort only as its granularity is limited
by how long an iteration of the
+ * processing loop takes to complete</li>
+ * </ul>
+ *
+ * <b>Skipping punctuations:</b> Punctuations will not be triggered more
than once at any given timestamp.
+ * This means that "missed" punctuation will be skipped.
+ * It's possible to "miss" a punctuation if:
+ * <ul>
+ * <li>with {@link PunctuationType#STREAM_TIME}, when stream time
advances more than interval</li>
+ * <li>with {@link PunctuationType#WALL_CLOCK_TIME}, on GC pause, too
short interval, ...</li>
* </ul>
*
* @param interval the time interval between punctuations
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
index cf50005..9c0ec88 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
@@ -27,12 +27,19 @@ public class PunctuationSchedule extends
Stamped<ProcessorNode> {
// this Cancellable will be re-pointed at the successor schedule in next()
private final RepointableCancellable cancellable;
- PunctuationSchedule(ProcessorNode node, long interval, Punctuator
punctuator) {
- this(node, 0L, interval, punctuator, new RepointableCancellable());
+ PunctuationSchedule(final ProcessorNode node,
+ final long time,
+ final long interval,
+ final Punctuator punctuator) {
+ this(node, time, interval, punctuator, new RepointableCancellable());
cancellable.setSchedule(this);
}
- private PunctuationSchedule(ProcessorNode node, long time, long interval,
Punctuator punctuator, RepointableCancellable cancellable) {
+ private PunctuationSchedule(final ProcessorNode node,
+ final long time,
+ final long interval,
+ final Punctuator punctuator,
+ final RepointableCancellable cancellable) {
super(node, time);
this.interval = interval;
this.punctuator = punctuator;
@@ -59,14 +66,19 @@ public class PunctuationSchedule extends
Stamped<ProcessorNode> {
return isCancelled;
}
- public PunctuationSchedule next(long currTimestamp) {
- PunctuationSchedule nextSchedule;
- // we need to special handle the case when it is firstly triggered
(i.e. the timestamp
- // is equal to the interval) by reschedule based on the currTimestamp
- if (timestamp == 0L)
- nextSchedule = new PunctuationSchedule(value, currTimestamp +
interval, interval, punctuator, cancellable);
- else
- nextSchedule = new PunctuationSchedule(value, timestamp +
interval, interval, punctuator, cancellable);
+ public PunctuationSchedule next(final long currTimestamp) {
+ long nextPunctuationTime = timestamp + interval;
+ if (currTimestamp >= nextPunctuationTime) {
+ // we missed one or more punctuations
+ // avoid scheduling a new punctuation immediately, this can
happen:
+ // - when using STREAM_TIME punctuation and there was a gap i.e.,
no data was
+ // received for at least 2*interval
+ // - when using WALL_CLOCK_TIME and there was a gap i.e.,
punctuation was delayed for at least 2*interval (GC pause, overload, ...)
+ final long intervalsMissed = (currTimestamp - timestamp) /
interval;
+ nextPunctuationTime = timestamp + (intervalsMissed + 1) * interval;
+ }
+
+ final PunctuationSchedule nextSchedule = new
PunctuationSchedule(value, nextPunctuationTime, interval, punctuator,
cancellable);
cancellable.setSchedule(nextSchedule);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 8766e64..70ece60 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -569,16 +569,40 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator
* @throws IllegalStateException if the current node is not null
*/
public Cancellable schedule(final long interval, final PunctuationType
type, final Punctuator punctuator) {
+ switch (type) {
+ case STREAM_TIME:
+ // align punctuation to 0L, punctuate as soon as we have data
+ return schedule(0L, interval, type, punctuator);
+ case WALL_CLOCK_TIME:
+ // align punctuation to now, punctuate after interval has
elapsed
+ return schedule(time.milliseconds() + interval, interval,
type, punctuator);
+ default:
+ throw new IllegalArgumentException("Unrecognized
PunctuationType: " + type);
+ }
+ }
+
+ /**
+ * Schedules a punctuation for the processor
+ *
+ * @param startTime time of the first punctuation
+ * @param interval the interval in milliseconds
+ * @param type the punctuation type
+ * @throws IllegalStateException if the current node is null
+ */
+ Cancellable schedule(final long startTime, final long interval, final
PunctuationType type, final Punctuator punctuator) {
if (processorContext.currentNode() == null) {
throw new IllegalStateException(String.format("%sCurrent node is
null", logPrefix));
}
- final PunctuationSchedule schedule = new
PunctuationSchedule(processorContext.currentNode(), interval, punctuator);
+ final PunctuationSchedule schedule = new
PunctuationSchedule(processorContext.currentNode(), startTime, interval,
punctuator);
switch (type) {
case STREAM_TIME:
+ // STREAM_TIME punctuation is data driven, will first
punctuate as soon as stream-time is known and >= time,
+ // stream-time is known when we have received at least one
record from each input topic
return streamTimePunctuationQueue.schedule(schedule);
case WALL_CLOCK_TIME:
+ // WALL_CLOCK_TIME is driven by the wall clock time, will
first punctuate when now >= time
return systemTimePunctuationQueue.schedule(schedule);
default:
throw new IllegalArgumentException("Unrecognized
PunctuationType: " + type);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
index 1570c9b..09c7a0a 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
@@ -40,7 +40,7 @@ public class PunctuationQueueTest {
}
};
- final PunctuationSchedule sched = new PunctuationSchedule(node, 100L,
punctuator);
+ final PunctuationSchedule sched = new PunctuationSchedule(node, 0L,
100L, punctuator);
final long now = sched.timestamp - 100L;
queue.schedule(sched);
@@ -66,6 +66,64 @@ public class PunctuationQueueTest {
queue.mayPunctuate(now + 200L, PunctuationType.STREAM_TIME,
processorNodePunctuator);
assertEquals(2, processor.punctuatedAt.size());
+
+ queue.mayPunctuate(now + 1001L, PunctuationType.STREAM_TIME,
processorNodePunctuator);
+ assertEquals(3, processor.punctuatedAt.size());
+
+ queue.mayPunctuate(now + 1002L, PunctuationType.STREAM_TIME,
processorNodePunctuator);
+ assertEquals(3, processor.punctuatedAt.size());
+
+ queue.mayPunctuate(now + 1100L, PunctuationType.STREAM_TIME,
processorNodePunctuator);
+ assertEquals(4, processor.punctuatedAt.size());
+ }
+
+ @Test
+ public void testPunctuationIntervalCustomAlignment() {
+ final TestProcessor processor = new TestProcessor();
+ final ProcessorNode<String, String> node = new ProcessorNode<>("test",
processor, null);
+ final PunctuationQueue queue = new PunctuationQueue();
+ final Punctuator punctuator = new Punctuator() {
+ @Override
+ public void punctuate(long timestamp) {
+ node.processor().punctuate(timestamp);
+ }
+ };
+
+ final PunctuationSchedule sched = new PunctuationSchedule(node, 50L,
100L, punctuator);
+ final long now = sched.timestamp - 50L;
+
+ queue.schedule(sched);
+
+ ProcessorNodePunctuator processorNodePunctuator = new
ProcessorNodePunctuator() {
+ @Override
+ public void punctuate(ProcessorNode node, long time,
PunctuationType type, Punctuator punctuator) {
+ punctuator.punctuate(time);
+ }
+ };
+
+ queue.mayPunctuate(now, PunctuationType.STREAM_TIME,
processorNodePunctuator);
+ assertEquals(0, processor.punctuatedAt.size());
+
+ queue.mayPunctuate(now + 49L, PunctuationType.STREAM_TIME,
processorNodePunctuator);
+ assertEquals(0, processor.punctuatedAt.size());
+
+ queue.mayPunctuate(now + 50L, PunctuationType.STREAM_TIME,
processorNodePunctuator);
+ assertEquals(1, processor.punctuatedAt.size());
+
+ queue.mayPunctuate(now + 149L, PunctuationType.STREAM_TIME,
processorNodePunctuator);
+ assertEquals(1, processor.punctuatedAt.size());
+
+ queue.mayPunctuate(now + 150L, PunctuationType.STREAM_TIME,
processorNodePunctuator);
+ assertEquals(2, processor.punctuatedAt.size());
+
+ queue.mayPunctuate(now + 1051L, PunctuationType.STREAM_TIME,
processorNodePunctuator);
+ assertEquals(3, processor.punctuatedAt.size());
+
+ queue.mayPunctuate(now + 1052L, PunctuationType.STREAM_TIME,
processorNodePunctuator);
+ assertEquals(3, processor.punctuatedAt.size());
+
+ queue.mayPunctuate(now + 1150L, PunctuationType.STREAM_TIME,
processorNodePunctuator);
+ assertEquals(4, processor.punctuatedAt.size());
}
private static class TestProcessor extends AbstractProcessor<String,
String> {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 4bbd0d6..2b3865a 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -309,63 +309,169 @@ public class StreamTaskTest {
@Test
public void testMaybePunctuateStreamTime() {
task.addRecords(partition1, records(
+ new ConsumerRecord<>(partition1.topic(),
partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey,
recordValue),
new ConsumerRecord<>(partition1.topic(),
partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey,
recordValue),
- new ConsumerRecord<>(partition1.topic(),
partition1.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey,
recordValue),
- new ConsumerRecord<>(partition1.topic(),
partition1.partition(), 40, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey,
recordValue)
+ new ConsumerRecord<>(partition1.topic(),
partition1.partition(), 32, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey,
recordValue),
+ new ConsumerRecord<>(partition1.topic(),
partition1.partition(), 40, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey,
recordValue),
+ new ConsumerRecord<>(partition1.topic(),
partition1.partition(), 60, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey,
recordValue)
));
task.addRecords(partition2, records(
new ConsumerRecord<>(partition2.topic(),
partition2.partition(), 25, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey,
recordValue),
new ConsumerRecord<>(partition2.topic(),
partition2.partition(), 35, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey,
recordValue),
- new ConsumerRecord<>(partition2.topic(),
partition2.partition(), 45, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey,
recordValue)
+ new ConsumerRecord<>(partition2.topic(),
partition2.partition(), 45, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey,
recordValue),
+ new ConsumerRecord<>(partition2.topic(),
partition2.partition(), 61, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey,
recordValue)
));
assertTrue(task.maybePunctuateStreamTime());
assertTrue(task.process());
- assertEquals(5, task.numBuffered());
+ assertEquals(8, task.numBuffered());
assertEquals(1, source1.numReceived);
assertEquals(0, source2.numReceived);
+ assertTrue(task.maybePunctuateStreamTime());
+
+ assertTrue(task.process());
+ assertEquals(7, task.numBuffered());
+ assertEquals(2, source1.numReceived);
+ assertEquals(0, source2.numReceived);
+
assertFalse(task.maybePunctuateStreamTime());
assertTrue(task.process());
- assertEquals(4, task.numBuffered());
- assertEquals(1, source1.numReceived);
+ assertEquals(6, task.numBuffered());
+ assertEquals(2, source1.numReceived);
assertEquals(1, source2.numReceived);
assertTrue(task.maybePunctuateStreamTime());
assertTrue(task.process());
+ assertEquals(5, task.numBuffered());
+ assertEquals(3, source1.numReceived);
+ assertEquals(1, source2.numReceived);
+
+ assertFalse(task.maybePunctuateStreamTime());
+
+ assertTrue(task.process());
+ assertEquals(4, task.numBuffered());
+ assertEquals(3, source1.numReceived);
+ assertEquals(2, source2.numReceived);
+
+ assertTrue(task.maybePunctuateStreamTime());
+
+ assertTrue(task.process());
assertEquals(3, task.numBuffered());
+ assertEquals(4, source1.numReceived);
+ assertEquals(2, source2.numReceived);
+
+ assertFalse(task.maybePunctuateStreamTime());
+
+ assertTrue(task.process());
+ assertEquals(2, task.numBuffered());
+ assertEquals(4, source1.numReceived);
+ assertEquals(3, source2.numReceived);
+
+ assertTrue(task.maybePunctuateStreamTime());
+
+ assertTrue(task.process());
+ assertEquals(1, task.numBuffered());
+ assertEquals(5, source1.numReceived);
+ assertEquals(3, source2.numReceived);
+
+ assertFalse(task.maybePunctuateStreamTime());
+
+ assertTrue(task.process());
+ assertEquals(0, task.numBuffered());
+ assertEquals(5, source1.numReceived);
+ assertEquals(4, source2.numReceived);
+
+ assertFalse(task.process());
+ assertFalse(task.maybePunctuateStreamTime());
+
+
processorStreamTime.supplier.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME,
0L, 20L, 32L, 40L, 60L);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldPunctuateOnceStreamTimeAfterGap() {
+ task.addRecords(partition1, records(
+ new ConsumerRecord<>(partition1.topic(),
partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey,
recordValue),
+ new ConsumerRecord<>(partition1.topic(),
partition1.partition(), 142, 0L, TimestampType.CREATE_TIME, 0L, 0, 0,
recordKey, recordValue),
+ new ConsumerRecord<>(partition1.topic(),
partition1.partition(), 155, 0L, TimestampType.CREATE_TIME, 0L, 0, 0,
recordKey, recordValue),
+ new ConsumerRecord<>(partition1.topic(),
partition1.partition(), 160, 0L, TimestampType.CREATE_TIME, 0L, 0, 0,
recordKey, recordValue)
+ ));
+
+ task.addRecords(partition2, records(
+ new ConsumerRecord<>(partition2.topic(),
partition2.partition(), 25, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey,
recordValue),
+ new ConsumerRecord<>(partition2.topic(),
partition2.partition(), 145, 0L, TimestampType.CREATE_TIME, 0L, 0, 0,
recordKey, recordValue),
+ new ConsumerRecord<>(partition2.topic(),
partition2.partition(), 159, 0L, TimestampType.CREATE_TIME, 0L, 0, 0,
recordKey, recordValue),
+ new ConsumerRecord<>(partition2.topic(),
partition2.partition(), 161, 0L, TimestampType.CREATE_TIME, 0L, 0, 0,
recordKey, recordValue)
+ ));
+
+ assertTrue(task.maybePunctuateStreamTime()); // punctuate at 20
+
+ assertTrue(task.process());
+ assertEquals(7, task.numBuffered());
+ assertEquals(1, source1.numReceived);
+ assertEquals(0, source2.numReceived);
+
+ assertFalse(task.maybePunctuateStreamTime());
+
+ assertTrue(task.process());
+ assertEquals(6, task.numBuffered());
+ assertEquals(1, source1.numReceived);
+ assertEquals(1, source2.numReceived);
+
+ assertTrue(task.maybePunctuateStreamTime()); // punctuate at 142
+
+ // only one punctuation after 100ms gap
+ assertFalse(task.maybePunctuateStreamTime());
+
+ assertTrue(task.process());
+ assertEquals(5, task.numBuffered());
assertEquals(2, source1.numReceived);
assertEquals(1, source2.numReceived);
assertFalse(task.maybePunctuateStreamTime());
assertTrue(task.process());
- assertEquals(2, task.numBuffered());
+ assertEquals(4, task.numBuffered());
assertEquals(2, source1.numReceived);
assertEquals(2, source2.numReceived);
- assertTrue(task.maybePunctuateStreamTime());
+ assertTrue(task.maybePunctuateStreamTime()); // punctuate at 155
assertTrue(task.process());
- assertEquals(1, task.numBuffered());
+ assertEquals(3, task.numBuffered());
assertEquals(3, source1.numReceived);
assertEquals(2, source2.numReceived);
assertFalse(task.maybePunctuateStreamTime());
assertTrue(task.process());
- assertEquals(0, task.numBuffered());
+ assertEquals(2, task.numBuffered());
assertEquals(3, source1.numReceived);
assertEquals(3, source2.numReceived);
+ assertTrue(task.maybePunctuateStreamTime()); // punctuate at 160,
still aligned on the initial punctuation
+
+ assertTrue(task.process());
+ assertEquals(1, task.numBuffered());
+ assertEquals(4, source1.numReceived);
+ assertEquals(3, source2.numReceived);
+
+ assertFalse(task.maybePunctuateStreamTime());
+
+ assertTrue(task.process());
+ assertEquals(0, task.numBuffered());
+ assertEquals(4, source1.numReceived);
+ assertEquals(4, source2.numReceived);
+
assertFalse(task.process());
assertFalse(task.maybePunctuateStreamTime());
-
processorStreamTime.supplier.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME,
20L, 30L, 40L);
+
processorStreamTime.supplier.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME,
20L, 142L, 155L, 160L);
}
@SuppressWarnings("unchecked")
@@ -405,18 +511,45 @@ public class StreamTaskTest {
assertTrue(task.maybePunctuateSystemTime());
time.sleep(10);
assertTrue(task.maybePunctuateSystemTime());
- time.sleep(10);
+ time.sleep(9);
+ assertFalse(task.maybePunctuateSystemTime());
+ time.sleep(1);
assertTrue(task.maybePunctuateSystemTime());
-
processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME,
now + 10, now + 20, now + 30);
+ time.sleep(20);
+ assertTrue(task.maybePunctuateSystemTime());
+ assertFalse(task.maybePunctuateSystemTime());
+
processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME,
now + 10, now + 20, now + 30, now + 50);
}
@Test
public void shouldNotPunctuateSystemTimeWhenIntervalNotElapsed() {
- long now = time.milliseconds();
- assertTrue(task.maybePunctuateSystemTime()); // first time we always
punctuate
+ assertFalse(task.maybePunctuateSystemTime());
time.sleep(9);
assertFalse(task.maybePunctuateSystemTime());
-
processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME,
now);
+
processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME);
+ }
+
+ @Test
+ public void shouldPunctuateOnceSystemTimeAfterGap() {
+ long now = time.milliseconds();
+ time.sleep(100);
+ assertTrue(task.maybePunctuateSystemTime());
+ assertFalse(task.maybePunctuateSystemTime());
+ time.sleep(10);
+ assertTrue(task.maybePunctuateSystemTime());
+ time.sleep(12);
+ assertTrue(task.maybePunctuateSystemTime());
+ time.sleep(7);
+ assertFalse(task.maybePunctuateSystemTime());
+ time.sleep(1); // punctuate at now + 130
+ assertTrue(task.maybePunctuateSystemTime());
+ time.sleep(105); // punctuate at now + 235
+ assertTrue(task.maybePunctuateSystemTime());
+ assertFalse(task.maybePunctuateSystemTime());
+ time.sleep(5); // punctuate at now + 240, still aligned on the initial
punctuation
+ assertTrue(task.maybePunctuateSystemTime());
+ assertFalse(task.maybePunctuateSystemTime());
+
processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME,
now + 100, now + 110, now + 122, now + 130, now + 235, now + 240);
}
@Test
--
To stop receiving notification emails like this one, please contact
[email protected].