On Fri, Nov 20, 2015 at 5:11 PM, 'Oleg Ponomarev' via ganeti-devel <
[email protected]> wrote:

> forkJobProcess implementation consists of several steps. Move each
> logically consistent step into a generalized function in order to reuse
>

Fix up nits pointed out in previous review.


> these code fragments in forkPostHooksProcess (will be the next commit).
>
> Signed-off-by: Oleg Ponomarev <[email protected]>
> ---
>  src/Ganeti/Query/Exec.hs | 184
> +++++++++++++++++++++++++++--------------------
>  1 file changed, 106 insertions(+), 78 deletions(-)
>
> diff --git a/src/Ganeti/Query/Exec.hs b/src/Ganeti/Query/Exec.hs
> index 79889ff..8a4b13f 100644
> --- a/src/Ganeti/Query/Exec.hs
> +++ b/src/Ganeti/Query/Exec.hs
> @@ -120,7 +120,6 @@ listOpenFds = liftM filterReadable
>      filterReadable :: (Read a) => [String] -> [a]
>      filterReadable = mapMaybe (fmap fst . listToMaybe . reads)
>
> -
>  -- | Catches a potential `IOError` and sets its description via
>  -- `annotateIOError`. This makes exceptions more informative when they
>  -- are thrown from an unnamed `Handle`.
> @@ -128,10 +127,19 @@ rethrowAnnotateIOError :: String -> IO a -> IO a
>  rethrowAnnotateIOError desc =
>    modifyIOError (\e -> annotateIOError e desc Nothing Nothing)
>
> --- Code that is executed in a @fork@-ed process and that the replaces
> iteself
> --- with the actual job process
> -runJobProcess :: JobId -> Client -> IO ()
> -runJobProcess jid s = withErrorLogAt CRITICAL (show jid) $
> +
> +-- | Code that is executed in a @fork@-ed process. Performs
> communication with
> +-- the parent process by calling commFn and then runs pyExecIO python
> +-- executable.
> +runProcess :: JobId -- ^ a job to process
> +           -> Client -- ^ UDS transport
> +           -> IO FilePath -- ^ path to the python executable
> +           -> ((String -> IO ()) -> JobId -> Client -> IO Fd)
> +              -- ^ pre-execution function communicating with the parent.
> The
> +              -- function returns the file descriptor which should be
>

s/be//


> +              -- remain open
> +           -> IO ()
> +runProcess jid s pyExecIO commFn = withErrorLogAt CRITICAL (show jid) $
>    do
>      -- Close the standard error to prevent anything being written there
>      -- (for example by exceptions when closing unneeded FDs).
> @@ -144,20 +152,7 @@ runJobProcess jid s = withErrorLogAt CRITICAL (show
> jid) $
>      let logLater _ = return ()
>
>      logLater $ "Forking a new process for job " ++ show (fromJobId jid)
> -
> -    -- Create a livelock file for the job
> -    (TOD ts _) <- getClockTime
> -    lockfile <- P.livelockFile $ printf "job_%06d_%d" (fromJobId jid) ts
> -
> -    -- Lock the livelock file
> -    logLater $ "Locking livelock file " ++ show lockfile
> -    fd <- lockFile lockfile >>= annotateResult "Can't lock the livelock
> file"
> -    logLater "Sending the lockfile name to the master process"
> -    sendMsg s lockfile
> -
> -    logLater "Waiting for the master process to confirm the lock"
> -    _ <- recvMsg s
> -
> +    preserve_fd <- commFn logLater jid s
>      -- close the client
>      logLater "Closing the client"
>      (clFdR, clFdW) <- clientToFd s
> @@ -171,21 +166,20 @@ runJobProcess jid s = withErrorLogAt CRITICAL (show
> jid) $
>      closeFd clFdR
>      closeFd clFdW
>
> -    fds <- (filter (> 2) . filter (/= fd)) <$> toErrorBase listOpenFds
> +    fds <- (filter (> 2) . filter (/= preserve_fd)) <$> toErrorBase
> listOpenFds
>      logLater $ "Closing every superfluous file descriptor: " ++ show fds
>      mapM_ (tryIOError . closeFd) fds
>
> -    -- the master process will send the job id and the livelock file name
> -    -- using the same protocol to the job process
> -    -- we pass the job id as the first argument to the process;
> -    -- while the process never uses it, it's very convenient when listing
> -    -- job processes
> +    -- The master process will send the job id and the livelock file name
> +    -- using the same protocol. We pass the job id as the first argument
> +    -- to the process. While the process never uses it, it's very
> convenient
> +    -- when listing job processes.
>      use_debug <- isDebugMode
>      env <- (M.insert "GNT_DEBUG" (if use_debug then "1" else "0")
>              . M.insert "PYTHONPATH" AC.versionedsharedir
>              . M.fromList)
>             `liftM` getEnvironment
> -    execPy <- P.jqueueExecutorPy
> +    execPy <- pyExecIO
>      logLater $ "Executing " ++ AC.pythonPath ++ " " ++ execPy
>                 ++ " with PYTHONPATH=" ++ AC.versionedsharedir
>      () <- executeFile AC.pythonPath True [execPy, show (fromJobId jid)]
> @@ -224,6 +218,59 @@ forkWithPipe conf childAction = do
>             $ closeClient child
>    return (pid, master)
>
> +-- | Kill the process with the id provided.
> +killProcessOnError :: (FromString e, Show e)
> +                   => ProcessID -- ^ job process pid
> +                   -> Client -- ^ UDS client connected to the master node
> +                   -> (String -> ResultT e (WriterLogT IO) ())
> +                      -- ^ log function
> +                   -> ResultT e (WriterLogT IO) ()
> +killProcessOnError pid master logFn = do
> +  logFn "Closing the pipe to the client"
> +  withErrorLogAt WARNING "Closing the communication pipe failed"
> +                 (liftIO (closeClient master)) `orElse` return ()
> +  killIfAlive [sigTERM, sigABRT, sigKILL]
> +  where killIfAlive [] = return ()
> +        killIfAlive (sig : sigs) = do
> +          logFn "Getting the status of the process"
> +          status <- tryError . liftIO $ getProcessStatus False True pid
> +          case status of
> +            Left e -> logFn $ "Job process already gone: " ++ show e
> +            Right (Just s) -> logFn $ "Child process status: " ++ show s
> +            Right Nothing -> do
> +              logFn $ "Child process running, killing by " ++ show sig
> +              liftIO $ signalProcess sig pid
> +              unless (null sigs) $ do
> +                threadDelay 100000 -- wait for 0.1s and check again
> +                killIfAlive sigs
> +
> +-- | Forks current process and running runFn in the child and commFn in
> the
>

Noticed this just now: s/running/runs/


> +-- parent. Due to a bug in GHC forking process, we want to retry if the
> forked
> +-- process fails to start. If it fails later on, the failure is handled by
> +-- 'ResultT' and no retry is performed.
> +forkProcessCatchErrors :: (Show e, FromString e)
> +                       => (Client -> IO ())
> +                       -> (ProcessID -> String -> ResultT e (WriterLogT
> IO) ())
> +                       -> (ProcessID -> Client
> +                           -> ResultT e (WriterLogT IO) (FilePath,
> ProcessID))
> +                       -> ResultT e IO (FilePath, ProcessID)
> +forkProcessCatchErrors runFn logFn commFn = do
> +  -- Due to a bug in GHC forking process, we want to retry
> +  -- if the forked process fails to start.
> +  -- If it fails later on, the failure is handled by 'ResultT'
> +  -- and no retry is performed.
> +  let execWriterLogInside = ResultT . execWriterLogT . runResultT
> +  retryErrorN C.luxidRetryForkCount
> +               $ \tryNo -> execWriterLogInside $ do
> +    let maxWaitUS = 2^(tryNo - 1) * C.luxidRetryForkStepUS
> +    when (tryNo >= 2) . liftIO $ delayRandom (0, maxWaitUS)
> +
> +    (pid, master) <- liftIO $ forkWithPipe connectConfig runFn
> +
> +    logFn pid "Forked a new process"
> +    flip catchError (\e -> killProcessOnError pid master (logFn pid)
> +                           >> throwError e) $ commFn pid master
> +
>  -- | Forks the job process and starts processing of the given job.
>  -- Returns the livelock of the job and its process ID.
>  forkJobProcess :: (FromString e, Show e)
> @@ -234,78 +281,59 @@ forkJobProcess :: (FromString e, Show e)
>                    -- and process id in the job file
>                 -> ResultT e IO (FilePath, ProcessID)
>  forkJobProcess job luxiLivelock update = do
> -  let jidStr = show . fromJobId . qjId $ job
> -
> -  -- Retrieve secret parameters if present
> -  let secretParams = encodeStrict . filterSecretParameters . qjOps $ job
>
>    logDebug $ "Setting the lockfile temporarily to " ++ luxiLivelock
>               ++ " for job " ++ jidStr
>    update luxiLivelock
>
> -  -- Due to a bug in GHC forking process, we want to retry,
> -  -- if the forked process fails to start.
> -  -- If it fails later on, the failure is handled by 'ResultT'
> -  -- and no retry is performed.
> -  let execWriterLogInside = ResultT . execWriterLogT . runResultT
> -  retryErrorN C.luxidRetryForkCount
> -               $ \tryNo -> execWriterLogInside $ do
> -    let maxWaitUS = 2^(tryNo - 1) * C.luxidRetryForkStepUS
> -    when (tryNo >= 2) . liftIO $ delayRandom (0, maxWaitUS)
> -
> -    (pid, master) <- liftIO $ forkWithPipe connectConfig (runJobProcess
> -                                                          . qjId $ job)
> -
> -    let jobLogPrefix = "[start:job-" ++ jidStr ++ ",pid=" ++ show pid ++
> "] "
> -        logDebugJob = logDebug . (jobLogPrefix ++)
> -
> -    logDebugJob "Forked a new process"
> -
> -    let killIfAlive [] = return ()
> -        killIfAlive (sig : sigs) = do
> -          logDebugJob "Getting the status of the process"
> -          status <- tryError . liftIO $ getProcessStatus False True pid
> -          case status of
> -            Left e -> logDebugJob $ "Job process already gone: " ++ show e
> -            Right (Just s) -> logDebugJob $ "Child process status: " ++
> show s
> -            Right Nothing -> do
> -                logDebugJob $ "Child process running, killing by " ++
> show sig
> -                liftIO $ signalProcess sig pid
> -                unless (null sigs) $ do
> -                  threadDelay 100000 -- wait for 0.1s and check again
> -                  killIfAlive sigs
> -
> -    let onError = do
> -          logDebugJob "Closing the pipe to the client"
> -          withErrorLogAt WARNING "Closing the communication pipe failed"
> -              (liftIO (closeClient master)) `orElse` return ()
> -          killIfAlive [sigTERM, sigABRT, sigKILL]
> -
> -    flip catchError (\e -> onError >> throwError e)
> -      $ do
> +  forkProcessCatchErrors (childMain . qjId $ job) logDebugJob
> +                         parentMain
> +  where
> +    -- Retrieve secret parameters if present
> +    secretParams = encodeStrict . filterSecretParameters . qjOps $ job
> +    jidStr = show . fromJobId . qjId $ job
> +    jobLogPrefix pid = "[start:job-" ++ jidStr ++ ",pid=" ++ show pid ++
> "] "
> +    logDebugJob pid = logDebug . (jobLogPrefix pid ++)
> +
> +    -- | Code performing communication with the child process. First,
> receive
> +    -- the livelock, then send necessary parameters to the python child.
> +    parentMain pid master = do
>        let annotatedIO msg k = do
> -            logDebugJob msg
> -            liftIO $ rethrowAnnotateIOError (jobLogPrefix ++ msg) k
> +            logDebugJob pid msg
> +            liftIO $ rethrowAnnotateIOError (jobLogPrefix pid ++ msg) k
>        let recv msg = annotatedIO msg (recvMsg master)
>            send msg x = annotatedIO msg (sendMsg master x)
>
>        lockfile <- recv "Getting the lockfile of the client"
>
> -      logDebugJob $ "Setting the lockfile to the final " ++ lockfile
> +      logDebugJob pid ("Setting the lockfile to the final " ++ lockfile)
>        toErrorBase $ update lockfile
>        send "Confirming the client it can start" ""
>
>        -- from now on, we communicate with the job's Python process
> -
>

To be more explicit than I was in the previous review: is there a good
reason to get rid of these blank lines?

It partitions the communication into logical components and makes it more
readable :)


>        _ <- recv "Waiting for the job to ask for the job id"
>        send "Writing job id to the client" jidStr
> -
>        _ <- recv "Waiting for the job to ask for the lock file name"
>        send "Writing the lock file name to the client" lockfile
> -
>        _ <- recv "Waiting for the job to ask for secret parameters"
>        send "Writing secret parameters to the client" secretParams
> -
>        liftIO $ closeClient master
> -
>        return (lockfile, pid)
> +
> +    -- | Code performing communication with the parent process. During
> +    -- communication the livelock is created, locked and sent back
> +    -- to the parent.
> +    childMain jid s = runProcess jid s P.jqueueExecutorPy commFn
> +      where
> +        commFn logFn jid' s' = do
> +          -- Create a livelock file for the job
> +          (TOD ts _) <- getClockTime
> +          lockfile <- P.livelockFile $ printf "job_%06d_%d" (fromJobId
> jid') ts
> +          -- Lock the livelock file
> +          _ <- logFn $ "Locking livelock file " ++ show lockfile
> +          fd <- lockFile lockfile >>= annotateResult "Can't lock the
> livelock"
> +          _ <- logFn "Sending the lockfile name to the master process"
> +          sendMsg s' lockfile
> +          _ <- logFn "Waiting for the master process to confirm the lock"
> +          _ <- recvMsg s'
> +          return fd
> --
> 2.6.0.rc2.230.g3dd15c0
>
>
Hrvoje Ribicic
Ganeti Engineering
Google Germany GmbH
Dienerstr. 12, 80331, München

Geschäftsführer: Matthew Scott Sucherman, Paul Terence Manicle
Registergericht und -nummer: Hamburg, HRB 86891
Sitz der Gesellschaft: Hamburg

Diese E-Mail ist vertraulich. Wenn Sie nicht der richtige Adressat sind,
leiten Sie diese bitte nicht weiter, informieren Sie den Absender und
löschen Sie die E-Mail und alle Anhänge. Vielen Dank.

This e-mail is confidential. If you are not the right addressee please do
not forward it, please inform the sender, and please erase this e-mail
including any attachments. Thanks.

Reply via email to