[FLINK-5652] [asyncIO] Cancel timers when completing a StreamRecordQueueEntry
Whenever a StreamRecordQueueEntry has been completed, we no longer need its registered timeout. Therefore, we have to cancel the corresponding ScheduledFuture so that the system knows that it can remove the associated TriggerTask. This is important because the TriggerTask holds a reference to the StreamRecordQueueEntry; as long as such a task remains registered, it prevents the StreamRecordQueueEntry from being garbage collected. This closes #3264. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/215776b8 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/215776b8 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/215776b8 Branch: refs/heads/master Commit: 215776b81a52cd380e8ccabd65da612f77da25e6 Parents: 43d2fd2 Author: Till Rohrmann <trohrm...@apache.org> Authored: Fri Feb 3 16:02:55 2017 +0100 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Sun Feb 5 21:57:01 2017 +0100 ---------------------------------------------------------------------- .../api/operators/async/AsyncWaitOperator.java | 13 ++- .../operators/async/AsyncWaitOperatorTest.java | 86 ++++++++++++++++++++ 2 files changed, 98 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/215776b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java index 6793620..a70d825 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java @@ 
-22,6 +22,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.concurrent.AcceptFunction; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.streaming.api.datastream.AsyncDataStream; @@ -50,6 +51,7 @@ import org.apache.flink.util.Preconditions; import java.util.Collection; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -203,7 +205,7 @@ public class AsyncWaitOperator<IN, OUT> // register a timeout for this AsyncStreamRecordBufferEntry long timeoutTimestamp = timeout + getProcessingTimeService().getCurrentProcessingTime(); - getProcessingTimeService().registerTimer( + final ScheduledFuture<?> timerFuture = getProcessingTimeService().registerTimer( timeoutTimestamp, new ProcessingTimeCallback() { @Override @@ -212,6 +214,15 @@ public class AsyncWaitOperator<IN, OUT> new TimeoutException("Async function call has timed out.")); } }); + + // Cancel the timer once we've completed the stream record buffer entry. 
This will remove + // the register trigger task + streamRecordBufferEntry.onComplete(new AcceptFunction<StreamElementQueueEntry<Collection<OUT>>>() { + @Override + public void accept(StreamElementQueueEntry<Collection<OUT>> value) { + timerFuture.cancel(true); + } + }, executor); } addAsyncBufferEntry(streamRecordBufferEntry); http://git-wip-us.apache.org/repos/asf/flink/blob/215776b8/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java index 4558e06..c2b0803 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java @@ -49,10 +49,13 @@ import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.async.queue.StreamElementQueue; import org.apache.flink.streaming.api.operators.async.queue.StreamElementQueueEntry; +import org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask; import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment; import org.apache.flink.streaming.runtime.tasks.StreamTask; 
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; @@ -76,12 +79,15 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; @@ -801,4 +807,84 @@ public class AsyncWaitOperatorTest extends TestLogger { super.close(); } } + + /** + * FLINK-5652 + * Tests that registered timers are properly canceled upon completion of a + * {@link StreamRecordQueueEntry} in order to avoid resource leaks because TriggerTasks hold + * a reference on the StreamRecordQueueEntry. 
+ */ + @Test + public void testTimeoutCleanup() throws Exception { + final Object lock = new Object(); + + final long timeout = 100000L; + final long timestamp = 1L; + + Environment environment = mock(Environment.class); + when(environment.getMetricGroup()).thenReturn(new UnregisteredTaskMetricsGroup()); + when(environment.getTaskManagerInfo()).thenReturn(new TestingTaskManagerRuntimeInfo()); + when(environment.getUserClassLoader()).thenReturn(getClass().getClassLoader()); + when(environment.getTaskInfo()).thenReturn(new TaskInfo( + "testTask", + 1, + 0, + 1, + 0)); + + ScheduledFuture<?> scheduledFuture = mock(ScheduledFuture.class); + + ProcessingTimeService processingTimeService = mock(ProcessingTimeService.class); + when(processingTimeService.getCurrentProcessingTime()).thenReturn(timestamp); + doReturn(scheduledFuture).when(processingTimeService).registerTimer(anyLong(), any(ProcessingTimeCallback.class)); + + StreamTask<?, ?> containingTask = mock(StreamTask.class); + when(containingTask.getEnvironment()).thenReturn(environment); + when(containingTask.getCheckpointLock()).thenReturn(lock); + when(containingTask.getProcessingTimeService()).thenReturn(processingTimeService); + + StreamConfig streamConfig = mock(StreamConfig.class); + doReturn(IntSerializer.INSTANCE).when(streamConfig).getTypeSerializerIn1(any(ClassLoader.class)); + + Output<StreamRecord<Integer>> output = mock(Output.class); + + AsyncWaitOperator<Integer, Integer> operator = new AsyncWaitOperator<>( + new AsyncFunction<Integer, Integer>() { + private static final long serialVersionUID = -3718276118074877073L; + + @Override + public void asyncInvoke(Integer input, AsyncCollector<Integer> collector) throws Exception { + collector.collect(Collections.singletonList(input)); + } + }, + timeout, + 1, + AsyncDataStream.OutputMode.UNORDERED); + + operator.setup( + containingTask, + streamConfig, + output); + + operator.open(); + + final StreamRecord<Integer> streamRecord = new StreamRecord<>(42, 
timestamp); + + synchronized (lock) { + // processing an element will register a timeout + operator.processElement(streamRecord); + } + + synchronized (lock) { + // closing the operator waits until all inputs have been processed + operator.close(); + } + + // check that we actually outputted the result of the single input + verify(output).collect(eq(streamRecord)); + verify(processingTimeService).registerTimer(eq(processingTimeService.getCurrentProcessingTime() + timeout), any(ProcessingTimeCallback.class)); + + // check that we have cancelled our registered timeout + verify(scheduledFuture).cancel(eq(true)); + } }