SAMZA-506; gracefully shut down container on SIGTERM
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/b82d4587 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/b82d4587 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/b82d4587 Branch: refs/heads/samza-sql Commit: b82d458718e43a4a7a580f5a8e65ee7969b0e0f0 Parents: ff40b12 Author: Ewen Cheslack-Postava <[email protected]> Authored: Wed Mar 11 09:06:53 2015 -0700 Committer: Chris Riccomini <[email protected]> Committed: Wed Mar 11 09:06:53 2015 -0700 ---------------------------------------------------------------------- .../org/apache/samza/container/RunLoop.scala | 31 +++++++++++++++--- .../apache/samza/container/TestRunLoop.scala | 34 ++++++++++++++++++++ 2 files changed, 60 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/b82d4587/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala index 499f5c6..4098235 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala @@ -45,7 +45,7 @@ class RunLoop( private var lastCommitMs = 0L private var taskShutdownRequests: Set[TaskName] = Set() private var taskCommitRequests: Set[TaskName] = Set() - private var shutdownNow = false + @volatile private var shutdownNow = false // Messages come from the chooser with no connection to the TaskInstance they're bound for. // Keep a mapping of SystemStreamPartition to TaskInstance to efficiently route them. 
@@ -56,15 +56,36 @@ class RunLoop( taskInstances.values.map { getSystemStreamPartitionToTaskInstance }.flatten.toMap } + val shutdownHook = new Thread() { + override def run() = { + info("Triggering shutdown in response to shutdown hook") + shutdownNow = true + } + } + + protected def addShutdownHook() { + Runtime.getRuntime().addShutdownHook(shutdownHook) + } + + protected def removeShutdownHook() { + Runtime.getRuntime().removeShutdownHook(shutdownHook) + } + /** * Starts the run loop. Blocks until either the tasks request shutdown, or an * unhandled exception is thrown. */ def run { - while (!shutdownNow) { - process - window - commit + try { + addShutdownHook() + + while (!shutdownNow) { + process + window + commit + } + } finally { + removeShutdownHook() } } http://git-wip-us.apache.org/repos/asf/samza/blob/b82d4587/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala index ea48853..fb1ebdd 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala @@ -211,4 +211,38 @@ class TestRunLoop extends AssertionsForJUnit with MockitoSugar with ShouldMatche testMetrics.processMs.getSnapshot.getSize should equal(2) testMetrics.commitMs.getSnapshot.getSize should equal(2) } + + @Test + def testShutdownHook: Unit = { + // The shutdown hook can't be directly tested so we verify that a) both add and remove + // are called and b) invoking the shutdown hook actually kills the run loop. 
+ val consumers = mock[SystemConsumers] + when(consumers.choose).thenReturn(envelope0) + val testMetrics = new SamzaContainerMetrics + var addCalled = false + var removeCalled = false + val runLoop = new RunLoop( + taskInstances = getMockTaskInstances, + consumerMultiplexer = consumers, + metrics = testMetrics) { + override def addShutdownHook() { + addCalled = true + } + override def removeShutdownHook() { + removeCalled = true + } + } + + val runThread = new Thread(runLoop) + runThread.start() + + runLoop.shutdownHook.start() + runLoop.shutdownHook.join(1000) + runThread.join(1000) + + assert(addCalled) + assert(removeCalled) + assert(!runLoop.shutdownHook.isAlive) + assert(!runThread.isAlive) + } }
