LGTM, thanks.

Just to understand the picture, what happens if an asynchronous RPC call
(replicateFn) fails for some reason?


On Thu, Jan 23, 2014 at 11:01 PM, Klaus Aehlig <[email protected]> wrote:

> Provide a function that walks through a list of job ids and
> archives them if appropriate. Abort that process if a given
> timeout is reached.
>
> Signed-off-by: Klaus Aehlig <[email protected]>
> ---
>  src/Ganeti/JQueue.hs | 81 ++++++++++++++++++++++++++++++++++++++++++++++++++--
>  1 file changed, 79 insertions(+), 2 deletions(-)
>
> diff --git a/src/Ganeti/JQueue.hs b/src/Ganeti/JQueue.hs
> index c706f1c..a885d01 100644
> --- a/src/Ganeti/JQueue.hs
> +++ b/src/Ganeti/JQueue.hs
> @@ -61,10 +61,12 @@ module Ganeti.JQueue
>      , isQueueOpen
>      , startJobs
>      , cancelJob
> +    , archiveJobs
>      ) where
>
>  import Control.Applicative (liftA2, (<|>))
>  import Control.Arrow (first, second)
> +import Control.Concurrent (forkIO)
>  import Control.Concurrent.MVar
>  import Control.Exception
>  import Control.Monad
> @@ -83,16 +85,17 @@ import qualified Text.JSON
>  import Text.JSON.Types
>
>  import Ganeti.BasicTypes
> +import qualified Ganeti.Config as Config
>  import qualified Ganeti.Constants as C
>  import Ganeti.Errors (ErrorResult)
>  import Ganeti.JSON
>  import Ganeti.Logging
>  import Ganeti.Luxi
> -import Ganeti.Objects (Node)
> +import Ganeti.Objects (ConfigData, Node)
>  import Ganeti.OpCodes
>  import Ganeti.Path
>  import Ganeti.Rpc (executeRpcCall, ERpcError, logRpcErrors,
> -                   RpcCallJobqueueUpdate(..))
> +                   RpcCallJobqueueUpdate(..), RpcCallJobqueueRename(..))
>  import Ganeti.THH
>  import Ganeti.Types
>  import Ganeti.Utils
> @@ -496,3 +499,77 @@ cancelJob jid = do
>    socketpath <- defaultMasterSocket
>    client <- getLuxiClient socketpath
>    callMethod (CancelJob jid) client
> +
> +-- | Try, at most until the given endtime, to archive some of the given
> +-- jobs, if they are older than the specified cut-off time; also replicate
> +-- archival of the additional jobs. Return the pair of the number of jobs
> +-- archived, and the number of jobs remaining in the queue, assuming the
> +-- given numbers about the jobs not considered.
> +archiveSomeJobsUntil :: ([JobId] -> IO ()) -- ^ replication function
> +                        -> FilePath -- ^ queue root directory
> +                        -> ClockTime -- ^ Endtime
> +                        -> Timestamp -- ^ cut-off time for archiving jobs
> +                        -> Int -- ^ number of jobs already archived
> +                        -> [JobId] -- ^ Additional jobs to replicate
> +                        -> [JobId] -- ^ List of job-ids still to consider
> +                        -> IO (Int, Int)
> +archiveSomeJobsUntil replicateFn _ _ _ arch torepl [] = do
> +  unless (null torepl) . (>> return ())
> +   . forkIO $ replicateFn torepl
> +  return (arch, 0)
> +
> +archiveSomeJobsUntil replicateFn qDir endt cutt arch torepl (jid:jids) = do
> +  let archiveMore = archiveSomeJobsUntil replicateFn qDir endt cutt
> +      continue = archiveMore arch torepl jids
> +      jidname = show $ fromJobId jid
> +  time <- getClockTime
> +  if time >= endt
> +    then do
> +      _ <- forkIO $ replicateFn torepl
> +      return (arch, length (jid:jids))
> +    else do
> +      logDebug $ "Inspecting job " ++ jidname ++ " for archival"
> +      loadResult <- loadJobFromDisk qDir False jid
> +      case loadResult of
> +        Bad _ -> continue
> +        Ok (job, _) ->
> +          if jobArchivable cutt job
> +            then do
> +              let live = liveJobFile qDir jid
> +                  archive = archivedJobFile qDir jid
> +              renameResult <- safeRenameFile live archive
> +              case renameResult of
> +                Bad s -> do
> +                  logWarning $ "Renaming " ++ live ++ " to " ++ archive
> +                                 ++ " failed unexpectedly: " ++ s
> +                  continue
> +                Ok () -> do
> +                  let torepl' = jid:torepl
> +                  if length torepl' >= 10
> +                    then do
> +                      _ <- forkIO $ replicateFn torepl'
> +                      archiveMore (arch + 1) [] jids
> +                    else archiveMore (arch + 1) torepl' jids
> +            else continue
> +
> +-- | Archive jobs older than the given time, but do not exceed the timeout for
> +-- carrying out this task.
> +archiveJobs :: ConfigData -- ^ cluster configuration
> +               -> Int  -- ^ time the job has to be in the past in order
> +                       -- to be archived
> +               -> Int -- ^ timeout
> +               -> [JobId] -- ^ jobs to consider
> +               -> IO (Int, Int)
> +archiveJobs cfg age timeout jids = do
> +  now <- getClockTime
> +  qDir <- queueDir
> +  let endtime = addToClockTime (noTimeDiff { tdSec = timeout }) now
> +      cuttime = if age < 0 then noTimestamp
> +                           else advanceTimestamp (- age) (fromClockTime now)
> +      mcs = Config.getMasterCandidates cfg
> +      replicateFn jobs = do
> +        let olds = map (liveJobFile qDir) jobs
> +            news = map (archivedJobFile qDir) jobs
> +        _ <- executeRpcCall mcs . RpcCallJobqueueRename $ zip olds news
> +        return ()
> +  archiveSomeJobsUntil replicateFn qDir endtime cuttime 0 [] jids
> --
> 1.8.5.3
>
>

Reply via email to