[ https://issues.apache.org/jira/browse/FLINK-4496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15510266#comment-15510266 ]
ASF GitHub Bot commented on FLINK-4496:
---------------------------------------
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/2434#discussion_r79858930
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java ---
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
+import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(ResultPartitionWriter.class)
+@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
+public class TimeProviderTest {
+
+ @Test
+ public void testDefaultTimeProvider() throws InterruptedException {
+ final Object lock = new Object();
+ TimeServiceProvider timeServiceProvider = DefaultTimeServiceProvider
+ .createForTesting(Executors.newSingleThreadScheduledExecutor(), lock);
+
+ final List<Long> timestamps = new ArrayList<>();
+
+ long start = System.currentTimeMillis();
+ long interval = 50L;
+
+ long noOfTimers = 5;
+ for (int i = 0; i < noOfTimers; i++) {
+ double nextTimer = start + i * interval;
+
+ timeServiceProvider.registerTimer((long) nextTimer, new Triggerable() {
+ @Override
+ public void trigger(long timestamp) throws Exception {
+ timestamps.add(timestamp);
+ }
+ });
+
+ // add also out-of-order tasks to verify that eventually
+ // they will be executed in the correct order.
+
+ if (i > 0) {
+ timeServiceProvider.registerTimer((long) (nextTimer - 10), new Triggerable() {
+ @Override
+ public void trigger(long timestamp) throws Exception {
+ timestamps.add(timestamp);
+ }
+ });
+ }
+ }
+
+ Thread.sleep(1000);
--- End diff --
Having a `Thread.sleep()` here is probably problematic when running on
Travis: on a slow or loaded machine, not all timers may fire within one
second, which would make the test flaky. It also always adds one second to
the runtime of the test.
I think you can verify the firing order directly in the `trigger()` methods.
Keep an atomic variable outside the scope of the timers, and check and update
it inside each `trigger()` call. In the test itself, use a `OneShotLatch` to
wait for success: once the trigger methods detect that all timers have fired
in the correct order, they signal this with `OneShotLatch.trigger()`.
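A minimal sketch of that approach, using only the JDK so that it runs on its
own. It rests on several assumptions: `CountDownLatch` stands in for Flink's
`OneShotLatch` (`countDown()` plays the role of `trigger()`), a
`ScheduledExecutorService` stands in for the `TimeServiceProvider`, and the
class and method names (`TimerOrderSketch`, `timersFireInOrder`) are
hypothetical and not part of the pull request.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

// Hypothetical stand-alone sketch; not the actual TimeProviderTest.
public class TimerOrderSketch {

    /**
     * Registers the same pattern of timers as the test under review and
     * returns true only if all of them fire in non-decreasing timestamp
     * order before the timeout expires.
     */
    public static boolean timersFireInOrder(int noOfTimers, long interval, long timeoutMillis) {
        // Assumption: a JDK scheduler stands in for the TimeServiceProvider.
        final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

        // One regular timer per iteration plus one early timer for every i > 0.
        final int expectedFirings = 2 * noOfTimers - 1;
        final AtomicLong lastTimestamp = new AtomicLong(Long.MIN_VALUE);
        final AtomicInteger firings = new AtomicInteger();
        final AtomicBoolean outOfOrder = new AtomicBoolean(false);
        // Assumption: CountDownLatch stands in for OneShotLatch.
        final CountDownLatch finished = new CountDownLatch(1);

        // Shared body of every trigger(): check and update the atomic state.
        final LongConsumer trigger = timestamp -> {
            long previous = lastTimestamp.getAndSet(timestamp);
            if (timestamp < previous) {
                outOfOrder.set(true);
                finished.countDown(); // fail fast instead of waiting for the timeout
            } else if (firings.incrementAndGet() == expectedFirings) {
                finished.countDown(); // success: every timer fired in order
            }
        };

        try {
            final long start = System.currentTimeMillis();
            for (int i = 0; i < noOfTimers; i++) {
                final long nextTimer = start + i * interval;
                executor.schedule(() -> trigger.accept(nextTimer),
                        nextTimer - start, TimeUnit.MILLISECONDS);

                if (i > 0) {
                    // Registered later but due earlier, as in the original test.
                    final long earlyTimer = nextTimer - 10;
                    executor.schedule(() -> trigger.accept(earlyTimer),
                            earlyTimer - start, TimeUnit.MILLISECONDS);
                }
            }
            // Returns as soon as the latch is released; the timeout is only an upper bound.
            return finished.await(timeoutMillis, TimeUnit.MILLISECONDS) && !outOfOrder.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }
}
```

With this shape the test finishes as soon as the last timer has fired, and a
generous timeout guards against hangs without adding a fixed delay to every
run.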
> Refactor the TimeServiceProvider to take a Triggerable instead of a Runnable.
> -----------------------------------------------------------------------------
>
> Key: FLINK-4496
> URL: https://issues.apache.org/jira/browse/FLINK-4496
> Project: Flink
> Issue Type: Sub-task
> Reporter: Kostas Kloudas
> Assignee: Kostas Kloudas
>