On Thu, Nov 7, 2013 at 1:20 PM, Klaus Aehlig <[email protected]> wrote: > As luxid is to take over responisibility for the job queue,
s/responisibility/responsibility/ > handle this request by writing the job to the queue and then > informing masterd; masterd will also distribute the job to > all master candidates. > > Signed-off-by: Klaus Aehlig <[email protected]> > --- > src/Ganeti/Query/Server.hs | 91 > ++++++++++++++++++++++++++++++++-------------- > 1 file changed, 63 insertions(+), 28 deletions(-) > > diff --git a/src/Ganeti/Query/Server.hs b/src/Ganeti/Query/Server.hs > index e28c6aa..f4c5ce7 100644 > --- a/src/Ganeti/Query/Server.hs > +++ b/src/Ganeti/Query/Server.hs > @@ -34,7 +34,7 @@ module Ganeti.Query.Server > import Control.Applicative > import Control.Concurrent > import Control.Exception > -import Control.Monad (forever) > +import Control.Monad (forever, when) > import Data.Bits (bitSize) > import qualified Data.Set as Set (toList) > import Data.IORef > @@ -52,13 +52,16 @@ import Ganeti.Objects > import qualified Ganeti.Config as Config > import Ganeti.ConfigReader > import Ganeti.BasicTypes > +import Ganeti.JQueue > import Ganeti.Logging > import Ganeti.Luxi > import qualified Ganeti.Query.Language as Qlang > import qualified Ganeti.Query.Cluster as QCluster > +import Ganeti.Path (queueDir, jobQueueLockFile, defaultLuxiSocket) > import Ganeti.Query.Query > import Ganeti.Query.Filter (makeSimpleFilter) > import Ganeti.Types > +import Ganeti.Utils (lockFile, exitIfBad) > > -- | Helper for classic queries. > handleClassicQuery :: ConfigData -- ^ Cluster config > @@ -76,16 +79,17 @@ handleClassicQuery cfg qkind names fields _ = do > return $ showJSON <$> (qr >>= queryCompat) > > -- | Minimal wrapper to handle the missing config case. > -handleCallWrapper :: Result ConfigData -> LuxiOp -> IO (ErrorResult JSValue) > -handleCallWrapper (Bad msg) _ = > +handleCallWrapper :: MVar () -> Result ConfigData > + -> LuxiOp -> IO (ErrorResult JSValue) > +handleCallWrapper _ (Bad msg) _ = > return . Bad . ConfigurationError $ > "I do not have access to a valid configuration, cannot\ > \ process queries: " ++ msg > -handleCallWrapper (Ok config) op = handleCall config op > +handleCallWrapper qlock (Ok config) op = handleCall qlock config op > > -- | Actual luxi operation handler. > -handleCall :: ConfigData -> LuxiOp -> IO (ErrorResult JSValue) > -handleCall cdata QueryClusterInfo = > +handleCall :: MVar () -> ConfigData -> LuxiOp -> IO (ErrorResult JSValue) > +handleCall _ cdata QueryClusterInfo = > let cluster = configCluster cdata > master = QCluster.clusterMasterNodeName cdata > hypervisors = clusterEnabledHypervisors cluster > @@ -154,7 +158,7 @@ handleCall cdata QueryClusterInfo = > Ok _ -> return . Ok . J.makeObj $ obj > Bad ex -> return $ Bad ex > > -handleCall cfg (QueryTags kind name) = do > +handleCall _ cfg (QueryTags kind name) = do > let tags = case kind of > TagKindCluster -> Ok . clusterTags $ configCluster cfg > TagKindGroup -> groupTags <$> Config.getGroup cfg name > @@ -165,41 +169,68 @@ handleCall cfg (QueryTags kind name) = do > ECodeInval > return (J.showJSON <$> tags) > > -handleCall cfg (Query qkind qfields qfilter) = do > +handleCall _ cfg (Query qkind qfields qfilter) = do > result <- query cfg True (Qlang.Query qkind qfields qfilter) > return $ J.showJSON <$> result > > -handleCall _ (QueryFields qkind qfields) = do > +handleCall _ _ (QueryFields qkind qfields) = do > let result = queryFields (Qlang.QueryFields qkind qfields) > return $ J.showJSON <$> result > > -handleCall cfg (QueryNodes names fields lock) = > +handleCall _ cfg (QueryNodes names fields lock) = > handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRNode) > (map Left names) fields lock > > -handleCall cfg (QueryGroups names fields lock) = > +handleCall _ cfg (QueryGroups names fields lock) = > handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRGroup) > (map Left names) fields lock > > -handleCall cfg (QueryJobs names fields) = > +handleCall _ cfg (QueryJobs names fields) = > handleClassicQuery cfg (Qlang.ItemTypeLuxi Qlang.QRJob) > (map (Right . fromIntegral . fromJobId) names) fields False > > -handleCall cfg (QueryNetworks names fields lock) = > +handleCall _ cfg (QueryNetworks names fields lock) = > handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRNetwork) > (map Left names) fields lock > > -handleCall _ op = > +handleCall qlock cfg (SubmitJobToDrainedQueue ops) = > + do > + jobid <- allocateJobId (Config.getMasterCandidates cfg) qlock > + case jobid of > + Bad s -> return . Bad . GenericError $ s > + Ok jid -> do > + qDir <- queueDir > + job <- queuedJobFromOpCodes jid ops > + write_result <- writeJobToDisk qDir job > + case write_result of > + Bad s -> return . Bad . GenericError $ s > + Ok () -> do > + socketpath <- defaultLuxiSocket > + client <- getClient socketpath > + pickupResult <- callMethod (PickupJob jid) client > + case pickupResult of > + Ok _ -> return () > + Bad e -> logWarning $ "Failded to notify masterd: " ++ show e > + return . Ok . showJSON . fromJobId $ jid > + > +handleCall qlock cfg (SubmitJob ops) = > + do > + open <- isQueueOpen > + if not open > + then return . Bad . GenericError $ "Queue drained" > + else handleCall qlock cfg (SubmitJobToDrainedQueue ops) > + > +handleCall _ _ op = > return . Bad $ > GenericError ("Luxi call '" ++ strOfOp op ++ "' not implemented") > > -- | Given a decoded luxi request, executes it and sends the luxi > -- response back to the client. > -handleClientMsg :: Client -> ConfigReader -> LuxiOp -> IO Bool > -handleClientMsg client creader args = do > +handleClientMsg :: MVar () -> Client -> ConfigReader -> LuxiOp -> IO Bool > +handleClientMsg qlock client creader args = do > cfg <- creader > logDebug $ "Request: " ++ show args > - call_result <- handleCallWrapper cfg args > + call_result <- handleCallWrapper qlock cfg args > (!status, !rval) <- > case call_result of > Bad err -> do > @@ -216,8 +247,8 @@ handleClientMsg client creader args = do > > -- | Handles one iteration of the client protocol: receives message, > -- checks it for validity and decodes it, returns response. > -handleClient :: Client -> ConfigReader -> IO Bool > -handleClient client creader = do > +handleClient :: MVar () -> Client -> ConfigReader -> IO Bool > +handleClient qlock client creader = do > !msg <- recvMsgExt client > logDebug $ "Received message: " ++ show msg > case msg of > @@ -231,23 +262,23 @@ handleClient client creader = do > logWarning errmsg > sendMsg client $ buildResponse False (showJSON errmsg) > return False > - Ok args -> handleClientMsg client creader args > + Ok args -> handleClientMsg qlock client creader args > > -- | Main client loop: runs one loop of 'handleClient', and if that > -- doesn't report a finished (closed) connection, restarts itself. > -clientLoop :: Client -> ConfigReader -> IO () > -clientLoop client creader = do > - result <- handleClient client creader > +clientLoop :: MVar () -> Client -> ConfigReader -> IO () > +clientLoop qlock client creader = do > + result <- handleClient qlock client creader > if result > - then clientLoop client creader > + then clientLoop qlock client creader > else closeClient client > > -- | Main listener loop: accepts clients, forks an I/O thread to handle > -- that client. > -listener :: ConfigReader -> S.Socket -> IO () > -listener creader socket = do > +listener :: MVar () -> ConfigReader -> S.Socket -> IO () > +listener qlock creader socket = do > client <- acceptClient socket > - _ <- forkIO $ clientLoop client creader > + _ <- forkIO $ clientLoop qlock client creader > return () > > -- | Type alias for prepMain results > @@ -272,7 +303,11 @@ main :: MainFn () PrepResult > main _ _ (socket_path, server, cref) = do > initConfigReader id cref > let creader = readIORef cref > + > + qlockFile <- jobQueueLockFile > + lockFile qlockFile >>= exitIfBad "Failed to obtain the job-queue lock" > + qlock <- newMVar () > > finally > - (forever $ listener creader server) > + (forever $ listener qlock creader server) > (closeServer socket_path server) > -- > 1.8.4.1 > Rest LGTM, no need to resend. Thanks, Michele -- Google Germany GmbH Dienerstr. 12 80331 München Registergericht und -nummer: Hamburg, HRB 86891 Sitz der Gesellschaft: Hamburg Geschäftsführer: Graham Law, Christine Elizabeth Flores
