On Thu, Nov 7, 2013 at 1:20 PM, Klaus Aehlig <[email protected]> wrote: > Add utility functions to allocate new job ids by increasing > the value stored in the serial file. As this function is > used in a multi-threaded program, synchronize access over > an internal lock. > > Signed-off-by: Klaus Aehlig <[email protected]> > --- > src/Ganeti/JQueue.hs | 42 ++++++++++++++++++++++++++++++++++++++++++ > 1 file changed, 42 insertions(+) > > diff --git a/src/Ganeti/JQueue.hs b/src/Ganeti/JQueue.hs > index 61731d5..6f99e26 100644 > --- a/src/Ganeti/JQueue.hs > +++ b/src/Ganeti/JQueue.hs > @@ -46,8 +46,11 @@ module Ganeti.JQueue > , loadJobFromDisk > , noSuchJob > , readSerialFromDisk > + , allocateJobIds > + , allocateJobId > ) where > > +import Control.Concurrent.MVar > import Control.Exception > import Control.Monad > import Data.List > @@ -66,8 +69,10 @@ import Ganeti.BasicTypes > import qualified Ganeti.Constants as C > import Ganeti.JSON > import Ganeti.Logging > +import Ganeti.Objects (Node) > import Ganeti.OpCodes > import Ganeti.Path > +import Ganeti.Rpc (executeRpcCall, RpcCallJobqueueUpdate(..)) > import Ganeti.THH > import Ganeti.Types > import Ganeti.Utils > @@ -338,3 +343,40 @@ readSerialFromDisk = do > filename <- jobQueueSerialFile > tryAndLogIOError (readFile filename) "Failed to read serial file" > (makeJobIdS . rStripSpace) > + > +-- | Allocate new job ids. > +-- To avoid races while accessing the serial file, the threads synchronize > +-- over a lock, as usual provided by an MVar. > +allocateJobIds :: [Node] -> MVar () -> Int -> IO (Result [JobId]) > +allocateJobIds mastercandidates lock n = > + if n <= 0 > + then return . Bad $ "Can only acllocate positive number of job ids"
s/acllocate/allocate/ > + else do > + takeMVar lock > + rjobid <- readSerialFromDisk > + case rjobid of > + Bad s -> do > + putMVar lock () > + return . Bad $ s > + Ok jid -> do > + let current = fromJobId jid > + serial_content = show (current + n) ++ "\n" > + serial <- jobQueueSerialFile > + write_result <- try $ atomicWriteFile serial serial_content > + :: IO (Either IOError ()) > + putMVar lock () > + case write_result of > + Left e -> do > + let msg = "Failed to write serial file: " ++ show e > + logError msg > + return . Bad $ msg > + Right () -> do > + _ <- executeRpcCall mastercandidates > + $ RpcCallJobqueueUpdate serial serial_content > + return $ mapM makeJobId [(current+1)..(current+n)] > + > +-- | Allocate one new job id. > +allocateJobId :: [Node] -> MVar () -> IO (Result JobId) > +allocateJobId mastercandidates lock = do > + jids <- allocateJobIds mastercandidates lock 1 > + return (jids >>= resultThe "Failed to acllocate precisely one Job ID") s/acllocate/allocate/ > -- > 1.8.4.1 > LGTM, including the interdiff sent in the other mail. Thanks, Michele -- Google Germany GmbH Dienerstr. 12 80331 München Registergericht und -nummer: Hamburg, HRB 86891 Sitz der Gesellschaft: Hamburg Geschäftsführer: Graham Law, Christine Elizabeth Flores
