[FLINK-5652] [asyncIO] Cancel timers when completing a StreamRecordQueueEntry

Once a StreamRecordQueueEntry has been completed, its registered timeout is no
longer needed. Therefore, we have to cancel the corresponding ScheduledFuture so
that the system knows it can remove the associated TriggerTask. This is important
because the TriggerTask holds a reference to the StreamRecordQueueEntry; until the
task is removed, it prevents the StreamRecordQueueEntry from being garbage
collected.

This closes #3264.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/215776b8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/215776b8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/215776b8

Branch: refs/heads/master
Commit: 215776b81a52cd380e8ccabd65da612f77da25e6
Parents: 43d2fd2
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Fri Feb 3 16:02:55 2017 +0100
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Sun Feb 5 21:57:01 2017 +0100

----------------------------------------------------------------------
 .../api/operators/async/AsyncWaitOperator.java  | 13 ++-
 .../operators/async/AsyncWaitOperatorTest.java  | 86 ++++++++++++++++++++
 2 files changed, 98 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/215776b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
index 6793620..a70d825 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.concurrent.AcceptFunction;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.datastream.AsyncDataStream;
@@ -50,6 +51,7 @@ import org.apache.flink.util.Preconditions;
 import java.util.Collection;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -203,7 +205,7 @@ public class AsyncWaitOperator<IN, OUT>
                        // register a timeout for this 
AsyncStreamRecordBufferEntry
                        long timeoutTimestamp = timeout + 
getProcessingTimeService().getCurrentProcessingTime();
 
-                       getProcessingTimeService().registerTimer(
+                       final ScheduledFuture<?> timerFuture = 
getProcessingTimeService().registerTimer(
                                timeoutTimestamp,
                                new ProcessingTimeCallback() {
                                        @Override
@@ -212,6 +214,15 @@ public class AsyncWaitOperator<IN, OUT>
                                                        new 
TimeoutException("Async function call has timed out."));
                                        }
                                });
+
+                       // Cancel the timer once we've completed the stream 
record buffer entry. This will remove
+                       // the registered trigger task
+                       streamRecordBufferEntry.onComplete(new 
AcceptFunction<StreamElementQueueEntry<Collection<OUT>>>() {
+                               @Override
+                               public void 
accept(StreamElementQueueEntry<Collection<OUT>> value) {
+                                       timerFuture.cancel(true);
+                               }
+                       }, executor);
                }
 
                addAsyncBufferEntry(streamRecordBufferEntry);

http://git-wip-us.apache.org/repos/asf/flink/blob/215776b8/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
index 4558e06..c2b0803 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
@@ -49,10 +49,13 @@ import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.async.queue.StreamElementQueue;
 import 
org.apache.flink.streaming.api.operators.async.queue.StreamElementQueueEntry;
+import 
org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
@@ -76,12 +79,15 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
@@ -801,4 +807,84 @@ public class AsyncWaitOperatorTest extends TestLogger {
                        super.close();
                }
        }
+
+       /**
+        * FLINK-5652
+        * Tests that registered timers are properly canceled upon completion 
of a
+        * {@link StreamRecordQueueEntry} in order to avoid resource leaks 
because TriggerTasks hold
+        * a reference to the StreamRecordQueueEntry.
+        */
+       @Test
+       public void testTimeoutCleanup() throws Exception {
+               final Object lock = new Object();
+
+               final long timeout = 100000L;
+               final long timestamp = 1L;
+
+               Environment environment = mock(Environment.class);
+               when(environment.getMetricGroup()).thenReturn(new 
UnregisteredTaskMetricsGroup());
+               when(environment.getTaskManagerInfo()).thenReturn(new 
TestingTaskManagerRuntimeInfo());
+               
when(environment.getUserClassLoader()).thenReturn(getClass().getClassLoader());
+               when(environment.getTaskInfo()).thenReturn(new TaskInfo(
+                       "testTask",
+                       1,
+                       0,
+                       1,
+                       0));
+
+               ScheduledFuture<?> scheduledFuture = 
mock(ScheduledFuture.class);
+
+               ProcessingTimeService processingTimeService = 
mock(ProcessingTimeService.class);
+               
when(processingTimeService.getCurrentProcessingTime()).thenReturn(timestamp);
+               
doReturn(scheduledFuture).when(processingTimeService).registerTimer(anyLong(), 
any(ProcessingTimeCallback.class));
+
+               StreamTask<?, ?> containingTask = mock(StreamTask.class);
+               when(containingTask.getEnvironment()).thenReturn(environment);
+               when(containingTask.getCheckpointLock()).thenReturn(lock);
+               
when(containingTask.getProcessingTimeService()).thenReturn(processingTimeService);
+
+               StreamConfig streamConfig = mock(StreamConfig.class);
+               
doReturn(IntSerializer.INSTANCE).when(streamConfig).getTypeSerializerIn1(any(ClassLoader.class));
+
+               Output<StreamRecord<Integer>> output = mock(Output.class);
+
+               AsyncWaitOperator<Integer, Integer> operator = new 
AsyncWaitOperator<>(
+                       new AsyncFunction<Integer, Integer>() {
+                               private static final long serialVersionUID = 
-3718276118074877073L;
+
+                               @Override
+                               public void asyncInvoke(Integer input, 
AsyncCollector<Integer> collector) throws Exception {
+                                       
collector.collect(Collections.singletonList(input));
+                               }
+                       },
+                       timeout,
+                       1,
+                       AsyncDataStream.OutputMode.UNORDERED);
+
+               operator.setup(
+                       containingTask,
+                       streamConfig,
+                       output);
+
+               operator.open();
+
+               final StreamRecord<Integer> streamRecord = new 
StreamRecord<>(42, timestamp);
+
+               synchronized (lock) {
+                       // processing an element will register a timeout
+                       operator.processElement(streamRecord);
+               }
+
+               synchronized (lock) {
+                       // closing the operator waits until all inputs have 
been processed
+                       operator.close();
+               }
+
+               // check that we actually emitted the result of the single 
input
+               verify(output).collect(eq(streamRecord));
+               
verify(processingTimeService).registerTimer(eq(processingTimeService.getCurrentProcessingTime()
 + timeout), any(ProcessingTimeCallback.class));
+
+               // check that we have cancelled our registered timeout
+               verify(scheduledFuture).cancel(eq(true));
+       }
 }

Reply via email to