Repository: flink
Updated Branches:
  refs/heads/release-1.2 28eea24e4 -> 36c7de1ae


[FLINK-5652] [asyncIO] Cancel timers when completing a StreamRecordQueueEntry

Whenever a StreamRecordQueueEntry has been completed we no longer need the 
registered timeout.
Therefore, we have to cancel the corresponding ScheduledFuture so that the 
system knows that
it can remove the associated TriggerTask. This is important since the 
TriggerTask contains a
reference on the StreamRecordQueueEntry. Consequently, such a task will prevent 
the
StreamRecordQueueEntry from being garbage collected.

This closes #3264.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/36c7de1a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/36c7de1a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/36c7de1a

Branch: refs/heads/release-1.2
Commit: 36c7de1aef7b349b9d66c9d92398f50ebec9d186
Parents: 28eea24
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Fri Feb 3 16:02:55 2017 +0100
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Sun Feb 5 21:59:18 2017 +0100

----------------------------------------------------------------------
 .../api/operators/async/AsyncWaitOperator.java  | 13 ++-
 .../operators/async/AsyncWaitOperatorTest.java  | 86 ++++++++++++++++++++
 2 files changed, 98 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/36c7de1a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
index 6793620..a70d825 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.concurrent.AcceptFunction;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.datastream.AsyncDataStream;
@@ -50,6 +51,7 @@ import org.apache.flink.util.Preconditions;
 import java.util.Collection;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -203,7 +205,7 @@ public class AsyncWaitOperator<IN, OUT>
                        // register a timeout for this 
AsyncStreamRecordBufferEntry
                        long timeoutTimestamp = timeout + 
getProcessingTimeService().getCurrentProcessingTime();
 
-                       getProcessingTimeService().registerTimer(
+                       final ScheduledFuture<?> timerFuture = 
getProcessingTimeService().registerTimer(
                                timeoutTimestamp,
                                new ProcessingTimeCallback() {
                                        @Override
@@ -212,6 +214,15 @@ public class AsyncWaitOperator<IN, OUT>
                                                        new 
TimeoutException("Async function call has timed out."));
                                        }
                                });
+
+                       // Cancel the timer once we've completed the stream 
record buffer entry. This will remove
+                       // the register trigger task
+                       streamRecordBufferEntry.onComplete(new 
AcceptFunction<StreamElementQueueEntry<Collection<OUT>>>() {
+                               @Override
+                               public void 
accept(StreamElementQueueEntry<Collection<OUT>> value) {
+                                       timerFuture.cancel(true);
+                               }
+                       }, executor);
                }
 
                addAsyncBufferEntry(streamRecordBufferEntry);

http://git-wip-us.apache.org/repos/asf/flink/blob/36c7de1a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
index 34a4c56..15715da 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
@@ -48,10 +48,13 @@ import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.async.queue.StreamElementQueue;
 import 
org.apache.flink.streaming.api.operators.async.queue.StreamElementQueueEntry;
+import 
org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
@@ -75,12 +78,15 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
@@ -805,4 +811,84 @@ public class AsyncWaitOperatorTest extends TestLogger {
                        super.close();
                }
        }
+
+       /**
+        * FLINK-5652
+        * Tests that registered timers are properly canceled upon completion 
of a
+        * {@link StreamRecordQueueEntry} in order to avoid resource leaks 
because TriggerTasks hold
+        * a reference on the StreamRecordQueueEntry.
+        */
+       @Test
+       public void testTimeoutCleanup() throws Exception {
+               final Object lock = new Object();
+
+               final long timeout = 100000L;
+               final long timestamp = 1L;
+
+               Environment environment = mock(Environment.class);
+               when(environment.getMetricGroup()).thenReturn(new 
UnregisteredTaskMetricsGroup());
+               when(environment.getTaskManagerInfo()).thenReturn(new 
TestingTaskManagerRuntimeInfo());
+               
when(environment.getUserClassLoader()).thenReturn(getClass().getClassLoader());
+               when(environment.getTaskInfo()).thenReturn(new TaskInfo(
+                       "testTask",
+                       1,
+                       0,
+                       1,
+                       0));
+
+               ScheduledFuture<?> scheduledFuture = 
mock(ScheduledFuture.class);
+
+               ProcessingTimeService processingTimeService = 
mock(ProcessingTimeService.class);
+               
when(processingTimeService.getCurrentProcessingTime()).thenReturn(timestamp);
+               
doReturn(scheduledFuture).when(processingTimeService).registerTimer(anyLong(), 
any(ProcessingTimeCallback.class));
+
+               StreamTask<?, ?> containingTask = mock(StreamTask.class);
+               when(containingTask.getEnvironment()).thenReturn(environment);
+               when(containingTask.getCheckpointLock()).thenReturn(lock);
+               
when(containingTask.getProcessingTimeService()).thenReturn(processingTimeService);
+
+               StreamConfig streamConfig = mock(StreamConfig.class);
+               
doReturn(IntSerializer.INSTANCE).when(streamConfig).getTypeSerializerIn1(any(ClassLoader.class));
+
+               Output<StreamRecord<Integer>> output = mock(Output.class);
+
+               AsyncWaitOperator<Integer, Integer> operator = new 
AsyncWaitOperator<>(
+                       new AsyncFunction<Integer, Integer>() {
+                               private static final long serialVersionUID = 
-3718276118074877073L;
+
+                               @Override
+                               public void asyncInvoke(Integer input, 
AsyncCollector<Integer> collector) throws Exception {
+                                       
collector.collect(Collections.singletonList(input));
+                               }
+                       },
+                       timeout,
+                       1,
+                       AsyncDataStream.OutputMode.UNORDERED);
+
+               operator.setup(
+                       containingTask,
+                       streamConfig,
+                       output);
+
+               operator.open();
+
+               final StreamRecord<Integer> streamRecord = new 
StreamRecord<>(42, timestamp);
+
+               synchronized (lock) {
+                       // processing an element will register a timeout
+                       operator.processElement(streamRecord);
+               }
+
+               synchronized (lock) {
+                       // closing the operator waits until all inputs have 
been processed
+                       operator.close();
+               }
+
+               // check that we actually outputted the result of the single 
input
+               verify(output).collect(eq(streamRecord));
+               
verify(processingTimeService).registerTimer(eq(processingTimeService.getCurrentProcessingTime()
 + timeout), any(ProcessingTimeCallback.class));
+
+               // check that we have cancelled our registered timeout
+               verify(scheduledFuture).cancel(eq(true));
+       }
 }

Reply via email to