On Thu, Nov 7, 2013 at 1:20 PM, Klaus Aehlig <[email protected]> wrote:
> As luxid is to take over responisibility for the job queue,

s/responisibility/responsibility/

> handle this request by writing the job to the queue and then
> informing masterd; masterd will also distribute the job to
> all master candidates.
>
> Signed-off-by: Klaus Aehlig <[email protected]>
> ---
>  src/Ganeti/Query/Server.hs | 91 
> ++++++++++++++++++++++++++++++++--------------
>  1 file changed, 63 insertions(+), 28 deletions(-)
>
> diff --git a/src/Ganeti/Query/Server.hs b/src/Ganeti/Query/Server.hs
> index e28c6aa..f4c5ce7 100644
> --- a/src/Ganeti/Query/Server.hs
> +++ b/src/Ganeti/Query/Server.hs
> @@ -34,7 +34,7 @@ module Ganeti.Query.Server
>  import Control.Applicative
>  import Control.Concurrent
>  import Control.Exception
> -import Control.Monad (forever)
> +import Control.Monad (forever, when)
>  import Data.Bits (bitSize)
>  import qualified Data.Set as Set (toList)
>  import Data.IORef
> @@ -52,13 +52,16 @@ import Ganeti.Objects
>  import qualified Ganeti.Config as Config
>  import Ganeti.ConfigReader
>  import Ganeti.BasicTypes
> +import Ganeti.JQueue
>  import Ganeti.Logging
>  import Ganeti.Luxi
>  import qualified Ganeti.Query.Language as Qlang
>  import qualified Ganeti.Query.Cluster as QCluster
> +import Ganeti.Path (queueDir, jobQueueLockFile, defaultLuxiSocket)
>  import Ganeti.Query.Query
>  import Ganeti.Query.Filter (makeSimpleFilter)
>  import Ganeti.Types
> +import Ganeti.Utils (lockFile, exitIfBad)
>
>  -- | Helper for classic queries.
>  handleClassicQuery :: ConfigData      -- ^ Cluster config
> @@ -76,16 +79,17 @@ handleClassicQuery cfg qkind names fields _ = do
>    return $ showJSON <$> (qr >>= queryCompat)
>
>  -- | Minimal wrapper to handle the missing config case.
> -handleCallWrapper :: Result ConfigData -> LuxiOp -> IO (ErrorResult JSValue)
> -handleCallWrapper (Bad msg) _ =
> +handleCallWrapper :: MVar () -> Result ConfigData
> +                     -> LuxiOp -> IO (ErrorResult JSValue)
> +handleCallWrapper _ (Bad msg) _ =
>    return . Bad . ConfigurationError $
>             "I do not have access to a valid configuration, cannot\
>             \ process queries: " ++ msg
> -handleCallWrapper (Ok config) op = handleCall config op
> +handleCallWrapper qlock (Ok config) op = handleCall qlock config op
>
>  -- | Actual luxi operation handler.
> -handleCall :: ConfigData -> LuxiOp -> IO (ErrorResult JSValue)
> -handleCall cdata QueryClusterInfo =
> +handleCall :: MVar () -> ConfigData -> LuxiOp -> IO (ErrorResult JSValue)
> +handleCall _ cdata QueryClusterInfo =
>    let cluster = configCluster cdata
>        master = QCluster.clusterMasterNodeName cdata
>        hypervisors = clusterEnabledHypervisors cluster
> @@ -154,7 +158,7 @@ handleCall cdata QueryClusterInfo =
>      Ok _ -> return . Ok . J.makeObj $ obj
>      Bad ex -> return $ Bad ex
>
> -handleCall cfg (QueryTags kind name) = do
> +handleCall _ cfg (QueryTags kind name) = do
>    let tags = case kind of
>                 TagKindCluster  -> Ok . clusterTags $ configCluster cfg
>                 TagKindGroup    -> groupTags <$> Config.getGroup    cfg name
> @@ -165,41 +169,68 @@ handleCall cfg (QueryTags kind name) = do
>                                          ECodeInval
>    return (J.showJSON <$> tags)
>
> -handleCall cfg (Query qkind qfields qfilter) = do
> +handleCall _ cfg (Query qkind qfields qfilter) = do
>    result <- query cfg True (Qlang.Query qkind qfields qfilter)
>    return $ J.showJSON <$> result
>
> -handleCall _ (QueryFields qkind qfields) = do
> +handleCall _ _ (QueryFields qkind qfields) = do
>    let result = queryFields (Qlang.QueryFields qkind qfields)
>    return $ J.showJSON <$> result
>
> -handleCall cfg (QueryNodes names fields lock) =
> +handleCall _ cfg (QueryNodes names fields lock) =
>    handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRNode)
>      (map Left names) fields lock
>
> -handleCall cfg (QueryGroups names fields lock) =
> +handleCall _ cfg (QueryGroups names fields lock) =
>    handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRGroup)
>      (map Left names) fields lock
>
> -handleCall cfg (QueryJobs names fields) =
> +handleCall _ cfg (QueryJobs names fields) =
>    handleClassicQuery cfg (Qlang.ItemTypeLuxi Qlang.QRJob)
>      (map (Right . fromIntegral . fromJobId) names)  fields False
>
> -handleCall cfg (QueryNetworks names fields lock) =
> +handleCall _ cfg (QueryNetworks names fields lock) =
>    handleClassicQuery cfg (Qlang.ItemTypeOpCode Qlang.QRNetwork)
>      (map Left names) fields lock
>
> -handleCall _ op =
> +handleCall qlock cfg (SubmitJobToDrainedQueue ops) =
> +  do
> +    jobid <- allocateJobId (Config.getMasterCandidates cfg) qlock
> +    case jobid of
> +      Bad s -> return . Bad . GenericError $ s
> +      Ok jid -> do
> +        qDir <- queueDir
> +        job <- queuedJobFromOpCodes jid ops
> +        write_result <- writeJobToDisk qDir job
> +        case write_result of
> +          Bad s -> return . Bad . GenericError $ s
> +          Ok () -> do
> +            socketpath <- defaultLuxiSocket
> +            client <- getClient socketpath
> +            pickupResult <- callMethod (PickupJob jid) client
> +            case pickupResult of
> +              Ok _ -> return ()
> +              Bad e -> logWarning $ "Failded to notify masterd: " ++ show e
> +            return . Ok . showJSON . fromJobId $ jid
> +
> +handleCall qlock cfg (SubmitJob ops) =
> +  do
> +    open <- isQueueOpen
> +    if not open
> +       then return . Bad . GenericError $ "Queue drained"
> +       else handleCall qlock cfg (SubmitJobToDrainedQueue ops)
> +
> +handleCall _ _ op =
>    return . Bad $
>      GenericError ("Luxi call '" ++ strOfOp op ++ "' not implemented")
>
>  -- | Given a decoded luxi request, executes it and sends the luxi
>  -- response back to the client.
> -handleClientMsg :: Client -> ConfigReader -> LuxiOp -> IO Bool
> -handleClientMsg client creader args = do
> +handleClientMsg :: MVar () -> Client -> ConfigReader -> LuxiOp -> IO Bool
> +handleClientMsg qlock client creader args = do
>    cfg <- creader
>    logDebug $ "Request: " ++ show args
> -  call_result <- handleCallWrapper cfg args
> +  call_result <- handleCallWrapper qlock cfg args
>    (!status, !rval) <-
>      case call_result of
>        Bad err -> do
> @@ -216,8 +247,8 @@ handleClientMsg client creader args = do
>
>  -- | Handles one iteration of the client protocol: receives message,
>  -- checks it for validity and decodes it, returns response.
> -handleClient :: Client -> ConfigReader -> IO Bool
> -handleClient client creader = do
> +handleClient :: MVar () -> Client -> ConfigReader -> IO Bool
> +handleClient qlock client creader = do
>    !msg <- recvMsgExt client
>    logDebug $ "Received message: " ++ show msg
>    case msg of
> @@ -231,23 +262,23 @@ handleClient client creader = do
>               logWarning errmsg
>               sendMsg client $ buildResponse False (showJSON errmsg)
>               return False
> -        Ok args -> handleClientMsg client creader args
> +        Ok args -> handleClientMsg qlock client creader args
>
>  -- | Main client loop: runs one loop of 'handleClient', and if that
>  -- doesn't report a finished (closed) connection, restarts itself.
> -clientLoop :: Client -> ConfigReader -> IO ()
> -clientLoop client creader = do
> -  result <- handleClient client creader
> +clientLoop :: MVar () -> Client -> ConfigReader -> IO ()
> +clientLoop qlock client creader = do
> +  result <- handleClient qlock client creader
>    if result
> -    then clientLoop client creader
> +    then clientLoop qlock client creader
>      else closeClient client
>
>  -- | Main listener loop: accepts clients, forks an I/O thread to handle
>  -- that client.
> -listener :: ConfigReader -> S.Socket -> IO ()
> -listener creader socket = do
> +listener :: MVar () -> ConfigReader -> S.Socket -> IO ()
> +listener qlock creader socket = do
>    client <- acceptClient socket
> -  _ <- forkIO $ clientLoop client creader
> +  _ <- forkIO $ clientLoop qlock client creader
>    return ()
>
>  -- | Type alias for prepMain results
> @@ -272,7 +303,11 @@ main :: MainFn () PrepResult
>  main _ _ (socket_path, server, cref) = do
>    initConfigReader id cref
>    let creader = readIORef cref
> +
> +  qlockFile <- jobQueueLockFile
> +  lockFile qlockFile >>= exitIfBad "Failed to obtain the job-queue lock"
> +  qlock <- newMVar ()
>
>    finally
> -    (forever $ listener creader server)
> +    (forever $ listener qlock creader server)
>      (closeServer socket_path server)
> --
> 1.8.4.1
>

Rest LGTM, no need to resend.

Thanks,
Michele

-- 
Google Germany GmbH
Dienerstr. 12
80331 München

Registergericht und -nummer: Hamburg, HRB 86891
Sitz der Gesellschaft: Hamburg
Geschäftsführer: Graham Law, Christine Elizabeth Flores

Reply via email to