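LGTM, thanks. Just so I understand the whole picture: what happens if one of the asynchronous RPC calls (replicateFn) fails for some reason? As far as I can see, it runs in a thread started with forkIO and the result of executeRpcCall is discarded there.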
On Thu, Jan 23, 2014 at 11:01 PM, Klaus Aehlig <[email protected]> wrote: > Provide a function that walks through a list of job ids and > archives them if appropriate. Abort that process if a given > timeout is reached. > > Signed-off-by: Klaus Aehlig <[email protected]> > --- > src/Ganeti/JQueue.hs | 81 > ++++++++++++++++++++++++++++++++++++++++++++++++++-- > 1 file changed, 79 insertions(+), 2 deletions(-) > > diff --git a/src/Ganeti/JQueue.hs b/src/Ganeti/JQueue.hs > index c706f1c..a885d01 100644 > --- a/src/Ganeti/JQueue.hs > +++ b/src/Ganeti/JQueue.hs > @@ -61,10 +61,12 @@ module Ganeti.JQueue > , isQueueOpen > , startJobs > , cancelJob > + , archiveJobs > ) where > > import Control.Applicative (liftA2, (<|>)) > import Control.Arrow (first, second) > +import Control.Concurrent (forkIO) > import Control.Concurrent.MVar > import Control.Exception > import Control.Monad > @@ -83,16 +85,17 @@ import qualified Text.JSON > import Text.JSON.Types > > import Ganeti.BasicTypes > +import qualified Ganeti.Config as Config > import qualified Ganeti.Constants as C > import Ganeti.Errors (ErrorResult) > import Ganeti.JSON > import Ganeti.Logging > import Ganeti.Luxi > -import Ganeti.Objects (Node) > +import Ganeti.Objects (ConfigData, Node) > import Ganeti.OpCodes > import Ganeti.Path > import Ganeti.Rpc (executeRpcCall, ERpcError, logRpcErrors, > - RpcCallJobqueueUpdate(..)) > + RpcCallJobqueueUpdate(..), RpcCallJobqueueRename(..)) > import Ganeti.THH > import Ganeti.Types > import Ganeti.Utils > @@ -496,3 +499,77 @@ cancelJob jid = do > socketpath <- defaultMasterSocket > client <- getLuxiClient socketpath > callMethod (CancelJob jid) client > + > +-- | Try, at most until the given endtime, to archive some of the given > +-- jobs, if they are older than the specified cut-off time; also replicate > +-- archival of the additional jobs. 
Return the pair of the number of jobs > +-- archived, and the number of jobs remaining int he queue, asuming the > +-- given numbers about the not considered jobs. > +archiveSomeJobsUntil :: ([JobId] -> IO ()) -- ^ replication function > + -> FilePath -- ^ queue root directory > + -> ClockTime -- ^ Endtime > + -> Timestamp -- ^ cut-off time for archiving jobs > + -> Int -- ^ number of jobs alread archived > + -> [JobId] -- ^ Additional jobs to replicate > + -> [JobId] -- ^ List of job-ids still to consider > + -> IO (Int, Int) > +archiveSomeJobsUntil replicateFn _ _ _ arch torepl [] = do > + unless (null torepl) . (>> return ()) > + . forkIO $ replicateFn torepl > + return (arch, 0) > + > +archiveSomeJobsUntil replicateFn qDir endt cutt arch torepl (jid:jids) = > do > + let archiveMore = archiveSomeJobsUntil replicateFn qDir endt cutt > + continue = archiveMore arch torepl jids > + jidname = show $ fromJobId jid > + time <- getClockTime > + if time >= endt > + then do > + _ <- forkIO $ replicateFn torepl > + return (arch, length (jid:jids)) > + else do > + logDebug $ "Inspecting job " ++ jidname ++ " for archival" > + loadResult <- loadJobFromDisk qDir False jid > + case loadResult of > + Bad _ -> continue > + Ok (job, _) -> > + if jobArchivable cutt job > + then do > + let live = liveJobFile qDir jid > + archive = archivedJobFile qDir jid > + renameResult <- safeRenameFile live archive > + case renameResult of > + Bad s -> do > + logWarning $ "Renaming " ++ live ++ " to " ++ archive > + ++ " failed unexpectedly: " ++ s > + continue > + Ok () -> do > + let torepl' = jid:torepl > + if length torepl' >= 10 > + then do > + _ <- forkIO $ replicateFn torepl' > + archiveMore (arch + 1) [] jids > + else archiveMore (arch + 1) torepl' jids > + else continue > + > +-- | Archive jobs older than the given time, but do not exceed the > timeout for > +-- carrying out this task. 
> +archiveJobs :: ConfigData -- ^ cluster configuration > + -> Int -- ^ time the job has to be in the past in order > + -- to be archived > + -> Int -- ^ timeout > + -> [JobId] -- ^ jobs to consider > + -> IO (Int, Int) > +archiveJobs cfg age timeout jids = do > + now <- getClockTime > + qDir <- queueDir > + let endtime = addToClockTime (noTimeDiff { tdSec = timeout }) now > + cuttime = if age < 0 then noTimestamp > + else advanceTimestamp (- age) (fromClockTime > now) > + mcs = Config.getMasterCandidates cfg > + replicateFn jobs = do > + let olds = map (liveJobFile qDir) jobs > + news = map (archivedJobFile qDir) jobs > + _ <- executeRpcCall mcs . RpcCallJobqueueRename $ zip olds news > + return () > + archiveSomeJobsUntil replicateFn qDir endtime cuttime 0 [] jids > -- > 1.8.5.3 > >
