When a job executor logs messages one at a time, it updates the job file repeatedly, and luxi's inotify watcher reloads the job and updates the qRunning list in qstate on every update. Unfortunately this is done lazily, so partly parsed copies of every version of the job file written since the job started executing accumulate in memory. This space leak persists until the job finishes running.
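The thunk build-up here is the classic lazy-IORef leak: each lazy update merely stacks another unevaluated function application on top of the old value, and nothing is reclaimed until something demands the result. A minimal standalone sketch (not Ganeti code; the Int counter is a hypothetical stand-in for the queue state) contrasting the lazy and strict update operations:

```haskell
import Data.IORef (atomicModifyIORef, atomicModifyIORef', newIORef, readIORef)

main :: IO ()
main = do
  ref <- newIORef (0 :: Int)
  -- atomicModifyIORef is lazy: each call installs a thunk, so repeated
  -- updates build an ever-growing chain of pending (+1) applications
  -- that is only collapsed when someone finally reads the ref.
  mapM_ (\_ -> atomicModifyIORef ref (\x -> (x + 1, ()))) [1 .. 1000 :: Int]
  -- atomicModifyIORef' forces the new value to WHNF on every update,
  -- so no chain of thunks can accumulate between updates.
  mapM_ (\_ -> atomicModifyIORef' ref (\x -> (x + 1, ()))) [1 .. 1000 :: Int]
  v <- readIORef ref
  print v
```

With many updates and a large value (such as a parsed job file) behind the thunks, the lazy variant retains every intermediate version, which is exactly the growth profile described above.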
This causes problems for cluster verify on clusters with errors, which can produce many messages, each of which updates the job file. That case was partly addressed by commit d929e5b566bc, which logs some errors in batches. However, even the simplest job updates its job file more than once, so the leak always has some impact.

Sprinkle enough strictness to force the full JSON parse to occur when the queue is updated. For a (slightly contrived) test that sends 150 log messages, this reduces the peak (profiling) heap usage of loadJobFromDisk from 900MB to 26MB and readJsonWithDesc from 81MB to 13MB, and makes the heap profile approximately flat rather than growing linearly until the job finishes.

Signed-off-by: Brian Foley <[email protected]>
---
 src/Ganeti/JQScheduler.hs       | 16 +++++++++++++---
 src/Ganeti/JQScheduler/Types.hs |  9 +++++----
 2 files changed, 18 insertions(+), 7 deletions(-)

diff --git a/src/Ganeti/JQScheduler.hs b/src/Ganeti/JQScheduler.hs
index 66c3fd1..3ba6273 100644
--- a/src/Ganeti/JQScheduler.hs
+++ b/src/Ganeti/JQScheduler.hs
@@ -109,13 +109,23 @@ emptyJQStatus config = do
   return JQStatus { jqJobs = jqJ, jqConfig = config, jqLivelock = livelock
                   , jqForkLock = forkLock }
 
+-- When updating the job lists, force the elements to WHNF, otherwise it is
+-- easy to leak the resources held onto by the lazily parsed job file.
+-- This can happen, eg, if updateJob is called, but the resulting QueuedJob
+-- isn't used by the scheduler, for example when the inotify watcher or
+-- the polling loop re-reads a job with a new message appended to it.
+
 -- | Apply a function on the running jobs.
 onRunningJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue
-onRunningJobs = over qRunningL
+onRunningJobs f q@Queue{qRunning=qr} =
+  let qr' = (foldr seq () qr) `seq` f qr -- force list els to WHNF
+  in q{qRunning=qr'}
 
 -- | Apply a function on the queued jobs.
 onQueuedJobs :: ([JobWithStat] -> [JobWithStat]) -> Queue -> Queue
-onQueuedJobs = over qEnqueuedL
+onQueuedJobs f q@Queue{qEnqueued=qe} =
+  let qe' = (foldr seq () qe) `seq` f qe -- force list els to WHNF
+  in q{qEnqueued=qe'}
 
 -- | Obtain a JobWithStat from a QueuedJob.
 unreadJob :: QueuedJob -> JobWithStat
@@ -150,7 +160,7 @@ getRQL = liftM (length . qRunning) . readIORef . jqJobs
 
 -- | Wrapper function to atomically update the jobs in the queue status.
 modifyJobs :: JQStatus -> (Queue -> Queue) -> IO ()
-modifyJobs qstat f = atomicModifyIORef (jqJobs qstat) (flip (,) () . f)
+modifyJobs qstat f = atomicModifyIORef' (jqJobs qstat) (flip (,) () . f)
 
 -- | Reread a job from disk, if the file has changed.
 readJobStatus :: JobWithStat -> IO (Maybe JobWithStat)
diff --git a/src/Ganeti/JQScheduler/Types.hs b/src/Ganeti/JQScheduler/Types.hs
index af24366..2bca743 100644
--- a/src/Ganeti/JQScheduler/Types.hs
+++ b/src/Ganeti/JQScheduler/Types.hs
@@ -1,4 +1,5 @@
 {-# LANGUAGE TemplateHaskell #-}
+{-# LANGUAGE BangPatterns #-}
 
 {-| Types for the JQScheduler.
 -}
@@ -43,7 +44,7 @@ import Ganeti.Utils
 
 data JobWithStat = JobWithStat { jINotify :: Maybe INotify
                                , jStat :: FStat
-                               , jJob :: QueuedJob
+                               , jJob :: !QueuedJob
                                } deriving (Eq, Show)
 
 $(makeCustomLenses' ''JobWithStat ['jJob])
@@ -54,9 +55,9 @@ nullJobWithStat :: QueuedJob -> JobWithStat
 nullJobWithStat = JobWithStat Nothing nullFStat
 
-data Queue = Queue { qEnqueued :: [JobWithStat]
-                   , qRunning :: [JobWithStat]
-                   , qManipulated :: [JobWithStat] -- ^ running jobs that are
+data Queue = Queue { qEnqueued :: ![JobWithStat]
+                   , qRunning :: ![JobWithStat]
+                   , qManipulated :: ![JobWithStat] -- ^ running jobs that are
                                                    -- being manipulated by
                                                    -- some thread
                    } deriving (Eq, Show)
 
-- 
2.8.0.rc3.226.g39d4020
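The patch combines two standard strictness idioms: a strictness annotation (`!`) on a record field, which forces that field to WHNF whenever the constructor is applied, and `foldr seq ()`, which walks a list forcing every element to WHNF. The sketch below shows both in isolation; it is standalone illustrative code, not Ganeti code, and `Job` is a hypothetical stand-in for `QueuedJob`:

```haskell
import Control.Exception (evaluate)

-- Stand-in for QueuedJob. The (!) strictness annotation means that
-- constructing a Job forces its payload to WHNF, so no unevaluated
-- parse result can hide behind the field.
data Job = Job { jobId :: !Int }

-- The patch's list idiom: foldr seq () xs builds
-- x1 `seq` (x2 `seq` ... ()), so demanding the () result forces every
-- element of the list to WHNF (and the spine along the way).
forceElems :: [a] -> ()
forceElems = foldr seq ()

main :: IO ()
main = do
  let jobs = map Job [1 .. 5]
  -- Force the elements before passing the list on, mirroring what
  -- onRunningJobs/onQueuedJobs now do before the queue is stored.
  () <- evaluate (forceElems jobs)
  print (map jobId jobs)
```

WHNF is enough here because forcing each `JobWithStat` triggers its strict `jJob` field, and (with the added strictness) that in turn ensures the parsed job is fully materialised rather than retained as a thunk over the old file contents.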
