What do you think about this interdiff? It fixes up the nits pointed out in the previous review (the commit message is also fixed).

--- a/src/Ganeti/Query/Exec.hs
+++ b/src/Ganeti/Query/Exec.hs
@@ -136,7 +136,7 @@ runProcess :: JobId -- ^ a job to process
            -> IO FilePath -- ^ path to the python executable
            -> ((String -> IO ()) -> JobId -> Client -> IO Fd)
               -- ^ pre-execution function communicating with the parent. The
-              -- function returns the file descriptor which should be
+              -- function returns the file descriptor which should
               -- remain open
            -> IO ()
 runProcess jid s pyExecIO commFn = withErrorLogAt CRITICAL (show jid) $
@@ -244,7 +244,7 @@ killProcessOnError pid master logFn = do
                 threadDelay 100000 -- wait for 0.1s and check again
                 killIfAlive sigs

--- | Forks current process and running runFn in the child and commFn in the
+-- | Forks current process and runs runFn in the child and commFn in the
 -- parent. Due to a bug in GHC forking process, we want to retry if the forked
 -- process fails to start. If it fails later on, the failure is handled by
 -- 'ResultT' and no retry is performed.
@@ -311,6 +311,7 @@ forkJobProcess job luxiLivelock update = do
       send "Confirming the client it can start" ""

       -- from now on, we communicate with the job's Python process
+
       _ <- recv "Waiting for the job to ask for the job id"
       send "Writing job id to the client" jidStr
       _ <- recv "Waiting for the job to ask for the lock file name"

On 11/24/2015 08:24 PM, Hrvoje Ribicic wrote:


On Fri, Nov 20, 2015 at 5:11 PM, 'Oleg Ponomarev' via ganeti-devel <[email protected]> wrote:

    forkJobProcess implementation consists of several steps. Move each
    logically consistent step into a generalized function in order to reuse
    these code fragments in forkPostHooksProcess (will be the next
    commit).

    Signed-off-by: Oleg Ponomarev <[email protected]>
    ---
     src/Ganeti/Query/Exec.hs | 184 +++++++++++++++++++++++++++--------------------
     1 file changed, 106 insertions(+), 78 deletions(-)
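
To restate the shape the commit message aims for with a standalone sketch: the fork-and-communicate scaffolding is written once and takes the caller-specific pieces as parameters, so forkPostHooksProcess can later plug in its own. The names and plain-IO types below are illustrative only; the real helpers are runProcess and forkProcessCatchErrors further down in the patch.

  -- Illustrative sketch only, not Ganeti code.
  withWorker :: IO child           -- ^ caller-specific child setup
             -> (child -> IO a)    -- ^ caller-specific parent-side protocol
             -> IO a
  withWorker startChild talkTo = startChild >>= talkTo

  jobLike, postHooksLike :: IO String
  jobLike       = withWorker (return "job livelock")   (return . ("job uses " ++))
  postHooksLike = withWorker (return "hooks livelock") (return . ("hooks use " ++))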

    diff --git a/src/Ganeti/Query/Exec.hs b/src/Ganeti/Query/Exec.hs
    index 79889ff..8a4b13f 100644
    --- a/src/Ganeti/Query/Exec.hs
    +++ b/src/Ganeti/Query/Exec.hs
    @@ -120,7 +120,6 @@ listOpenFds = liftM filterReadable
         filterReadable :: (Read a) => [String] -> [a]
         filterReadable = mapMaybe (fmap fst . listToMaybe . reads)

    -
     -- | Catches a potential `IOError` and sets its description via
     -- `annotateIOError`. This makes exceptions more informative when they
     -- are thrown from an unnamed `Handle`.
    @@ -128,10 +127,19 @@ rethrowAnnotateIOError :: String -> IO a -> IO a
     rethrowAnnotateIOError desc =
       modifyIOError (\e -> annotateIOError e desc Nothing Nothing)

    --- Code that is executed in a @fork@-ed process and that the replaces iteself
    --- with the actual job process
    -runJobProcess :: JobId -> Client -> IO ()
    -runJobProcess jid s = withErrorLogAt CRITICAL (show jid) $
    +
    +-- | Code that is executed in a @fork@-ed process. Performs communication with
    +-- the parent process by calling commFn and then runs pyExecIO python
    +-- executable.
    +runProcess :: JobId -- ^ a job to process
    +           -> Client -- ^ UDS transport
    +           -> IO FilePath -- ^ path to the python executable
    +           -> ((String -> IO ()) -> JobId -> Client -> IO Fd)
    +              -- ^ pre-execution function communicating with the parent. The
    +              -- function returns the file descriptor which should be


s/be//

    +              -- remain open
    +           -> IO ()
    +runProcess jid s pyExecIO commFn = withErrorLogAt CRITICAL (show jid) $
       do
        -- Close the standard error to prevent anything being written there
         -- (for example by exceptions when closing unneeded FDs).
    @@ -144,20 +152,7 @@ runJobProcess jid s = withErrorLogAt CRITICAL (show jid) $
         let logLater _ = return ()

        logLater $ "Forking a new process for job " ++ show (fromJobId jid)
    -
    -    -- Create a livelock file for the job
    -    (TOD ts _) <- getClockTime
    -    lockfile <- P.livelockFile $ printf "job_%06d_%d" (fromJobId jid) ts
    -
    -    -- Lock the livelock file
    -    logLater $ "Locking livelock file " ++ show lockfile
    -    fd <- lockFile lockfile >>= annotateResult "Can't lock the livelock file"
    -    logLater "Sending the lockfile name to the master process"
    -    sendMsg s lockfile
    -
    -    logLater "Waiting for the master process to confirm the lock"
    -    _ <- recvMsg s
    -
    +    preserve_fd <- commFn logLater jid s
         -- close the client
         logLater "Closing the client"
         (clFdR, clFdW) <- clientToFd s
    @@ -171,21 +166,20 @@ runJobProcess jid s = withErrorLogAt CRITICAL (show jid) $
         closeFd clFdR
         closeFd clFdW

    -    fds <- (filter (> 2) . filter (/= fd)) <$> toErrorBase listOpenFds
    +    fds <- (filter (> 2) . filter (/= preserve_fd)) <$> toErrorBase listOpenFds
        logLater $ "Closing every superfluous file descriptor: " ++ show fds
         mapM_ (tryIOError . closeFd) fds

    -    -- the master process will send the job id and the livelock file name
    -    -- using the same protocol to the job process
    -    -- we pass the job id as the first argument to the process;
    -    -- while the process never uses it, it's very convenient when listing
    -    -- job processes
    +    -- The master process will send the job id and the livelock file name
    +    -- using the same protocol. We pass the job id as the first argument
    +    -- to the process. While the process never uses it, it's very convenient
    +    -- when listing job processes.
         use_debug <- isDebugMode
         env <- (M.insert "GNT_DEBUG" (if use_debug then "1" else "0")
                 . M.insert "PYTHONPATH" AC.versionedsharedir
                 . M.fromList)
                `liftM` getEnvironment
    -    execPy <- P.jqueueExecutorPy
    +    execPy <- pyExecIO
         logLater $ "Executing " ++ AC.pythonPath ++ " " ++ execPy
                    ++ " with PYTHONPATH=" ++ AC.versionedsharedir
        () <- executeFile AC.pythonPath True [execPy, show (fromJobId jid)]
    @@ -224,6 +218,59 @@ forkWithPipe conf childAction = do
                $ closeClient child
       return (pid, master)

    +-- | Kill the process with the id provided.
    +killProcessOnError :: (FromString e, Show e)
    +                   => ProcessID -- ^ job process pid
    +                   -> Client -- ^ UDS client connected to the master node
    +                   -> (String -> ResultT e (WriterLogT IO) ())
    +                      -- ^ log function
    +                   -> ResultT e (WriterLogT IO) ()
    +killProcessOnError pid master logFn = do
    +  logFn "Closing the pipe to the client"
    +  withErrorLogAt WARNING "Closing the communication pipe failed"
    +                 (liftIO (closeClient master)) `orElse` return ()
    +  killIfAlive [sigTERM, sigABRT, sigKILL]
    +  where killIfAlive [] = return ()
    +        killIfAlive (sig : sigs) = do
    +          logFn "Getting the status of the process"
    +          status <- tryError . liftIO $ getProcessStatus False True pid
    +          case status of
    +            Left e -> logFn $ "Job process already gone: " ++ show e
    +            Right (Just s) -> logFn $ "Child process status: " ++ show s
    +            Right Nothing -> do
    +              logFn $ "Child process running, killing by " ++ show sig
    +              liftIO $ signalProcess sig pid
    +              unless (null sigs) $ do
    +                threadDelay 100000 -- wait for 0.1s and check again
    +                killIfAlive sigs
    +
    +-- | Forks current process and running runFn in the child and commFn in the


Noticed this just now: s/running/runs/

    +-- parent. Due to a bug in GHC forking process, we want to retry if the forked
    +-- process fails to start. If it fails later on, the failure is handled by
    +-- 'ResultT' and no retry is performed.
    +forkProcessCatchErrors :: (Show e, FromString e)
    +                       => (Client -> IO ())
    +                       -> (ProcessID -> String -> ResultT e (WriterLogT IO) ())
    +                       -> (ProcessID -> Client
    +                           -> ResultT e (WriterLogT IO) (FilePath, ProcessID))
    +                       -> ResultT e IO (FilePath, ProcessID)
    +forkProcessCatchErrors runFn logFn commFn = do
    +  -- Due to a bug in GHC forking process, we want to retry
    +  -- if the forked process fails to start.
    +  -- If it fails later on, the failure is handled by 'ResultT'
    +  -- and no retry is performed.
    +  let execWriterLogInside = ResultT . execWriterLogT . runResultT
    +  retryErrorN C.luxidRetryForkCount
    +               $ \tryNo -> execWriterLogInside $ do
    +    let maxWaitUS = 2^(tryNo - 1) * C.luxidRetryForkStepUS
    +    when (tryNo >= 2) . liftIO $ delayRandom (0, maxWaitUS)
    +
    +    (pid, master) <- liftIO $ forkWithPipe connectConfig runFn
    +
    +    logFn pid "Forked a new process"
    +    flip catchError (\e -> killProcessOnError pid master (logFn pid)
    +                           >> throwError e) $ commFn pid master
    +
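
Just to spell out the retry pacing this keeps from the old forkJobProcess: attempt 1 forks immediately, and every attempt with tryNo >= 2 first sleeps a random amount below 2^(tryNo - 1) * C.luxidRetryForkStepUS. A throwaway helper to see the caps (the literals in the example call are stand-ins, not the real constants):

  -- Upper bounds (in microseconds) on the random pre-fork delay per attempt.
  backoffCapsUS :: Int -> Int -> [Int]
  backoffCapsUS count stepUS =
    [ if tryNo >= 2 then 2 ^ (tryNo - 1) * stepUS else 0 | tryNo <- [1 .. count] ]

  -- e.g. backoffCapsUS 5 100000 == [0,200000,400000,800000,1600000]
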
     -- | Forks the job process and starts processing of the given job.
     -- Returns the livelock of the job and its process ID.
     forkJobProcess :: (FromString e, Show e)
    @@ -234,78 +281,59 @@ forkJobProcess :: (FromString e, Show e)
                       -- and process id in the job file
                    -> ResultT e IO (FilePath, ProcessID)
     forkJobProcess job luxiLivelock update = do
    -  let jidStr = show . fromJobId . qjId $ job
    -
    -  -- Retrieve secret parameters if present
    -  let secretParams = encodeStrict . filterSecretParameters . qjOps $ job

       logDebug $ "Setting the lockfile temporarily to " ++ luxiLivelock
                  ++ " for job " ++ jidStr
       update luxiLivelock

    -  -- Due to a bug in GHC forking process, we want to retry,
    -  -- if the forked process fails to start.
    -  -- If it fails later on, the failure is handled by 'ResultT'
    -  -- and no retry is performed.
    -  let execWriterLogInside = ResultT . execWriterLogT . runResultT
    -  retryErrorN C.luxidRetryForkCount
    -               $ \tryNo -> execWriterLogInside $ do
    -    let maxWaitUS = 2^(tryNo - 1) * C.luxidRetryForkStepUS
    -    when (tryNo >= 2) . liftIO $ delayRandom (0, maxWaitUS)
    -
    -    (pid, master) <- liftIO $ forkWithPipe connectConfig (runJobProcess
    -                                                          . qjId $ job)
    -
    -    let jobLogPrefix = "[start:job-" ++ jidStr ++ ",pid=" ++ show pid ++ "] "
    -        logDebugJob = logDebug . (jobLogPrefix ++)
    -
    -    logDebugJob "Forked a new process"
    -
    -    let killIfAlive [] = return ()
    -        killIfAlive (sig : sigs) = do
    -          logDebugJob "Getting the status of the process"
    -          status <- tryError . liftIO $ getProcessStatus False True pid
    -          case status of
    -            Left e -> logDebugJob $ "Job process already gone: " ++ show e
    -            Right (Just s) -> logDebugJob $ "Child process status: " ++ show s
    -            Right Nothing -> do
    -                logDebugJob $ "Child process running, killing by " ++ show sig
    -                liftIO $ signalProcess sig pid
    -                unless (null sigs) $ do
    -                  threadDelay 100000 -- wait for 0.1s and check again
    -                  killIfAlive sigs
    -
    -    let onError = do
    -          logDebugJob "Closing the pipe to the client"
    -          withErrorLogAt WARNING "Closing the communication pipe failed"
    -              (liftIO (closeClient master)) `orElse` return ()
    -          killIfAlive [sigTERM, sigABRT, sigKILL]
    -
    -    flip catchError (\e -> onError >> throwError e)
    -      $ do
    +  forkProcessCatchErrors (childMain . qjId $ job) logDebugJob
    +                         parentMain
    +  where
    +    -- Retrieve secret parameters if present
    +    secretParams = encodeStrict . filterSecretParameters . qjOps $ job
    +    jidStr = show . fromJobId . qjId $ job
    +    jobLogPrefix pid = "[start:job-" ++ jidStr ++ ",pid=" ++ show pid ++ "] "
    +    logDebugJob pid = logDebug . (jobLogPrefix pid ++)
    +
    +    -- | Code performing communication with the child process. First, receive
    +    -- the livelock, then send necessary parameters to the python child.
    +    parentMain pid master = do
           let annotatedIO msg k = do
    -            logDebugJob msg
    -            liftIO $ rethrowAnnotateIOError (jobLogPrefix ++ msg) k
    +            logDebugJob pid msg
    +            liftIO $ rethrowAnnotateIOError (jobLogPrefix pid ++ msg) k
           let recv msg = annotatedIO msg (recvMsg master)
               send msg x = annotatedIO msg (sendMsg master x)

           lockfile <- recv "Getting the lockfile of the client"

    -      logDebugJob $ "Setting the lockfile to the final " ++ lockfile
    +      logDebugJob pid ("Setting the lockfile to the final " ++ lockfile)
           toErrorBase $ update lockfile
           send "Confirming the client it can start" ""

           -- from now on, we communicate with the job's Python process
    -


To make myself more explicit since the previous review: is there a good reason to get rid of this whitespace?

It partitions the communication into logical components and makes it more readable :)
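
Concretely, the exchange that commFn and parentMain implement, in order; the first two messages happen before the exec, the rest are against the job's Python process. Sketched as data purely as a reading aid; Dir and handshake are not Ganeti code:

  data Dir = JobToMaster | MasterToJob deriving Show

  handshake :: [(Dir, String)]
  handshake =
    [ (JobToMaster, "livelock file name")           -- commFn: sendMsg lockfile
    , (MasterToJob, "confirmation it can start")    -- parentMain: send ""
    , (JobToMaster, "request for the job id")
    , (MasterToJob, "job id")
    , (JobToMaster, "request for the lock file name")
    , (MasterToJob, "lock file name")
    , (JobToMaster, "request for secret parameters")
    , (MasterToJob, "secret parameters")
    ]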

           _ <- recv "Waiting for the job to ask for the job id"
           send "Writing job id to the client" jidStr
    -
           _ <- recv "Waiting for the job to ask for the lock file name"
           send "Writing the lock file name to the client" lockfile
    -
           _ <- recv "Waiting for the job to ask for secret parameters"
           send "Writing secret parameters to the client" secretParams
    -
           liftIO $ closeClient master
    -
           return (lockfile, pid)
    +
    +    -- | Code performing communication with the parent process. During
    +    -- communication the livelock is created, locked and sent back
    +    -- to the parent.
    +    childMain jid s = runProcess jid s P.jqueueExecutorPy commFn
    +      where
    +        commFn logFn jid' s' = do
    +          -- Create a livelock file for the job
    +          (TOD ts _) <- getClockTime
    +          lockfile <- P.livelockFile $ printf "job_%06d_%d" (fromJobId jid') ts
    +          -- Lock the livelock file
    +          _ <- logFn $ "Locking livelock file " ++ show lockfile
    +          fd <- lockFile lockfile >>= annotateResult "Can't lock the livelock"
    +          _ <- logFn "Sending the lockfile name to the master process"
    +          sendMsg s' lockfile
    +          _ <- logFn "Waiting for the master process to confirm the lock"
    +          _ <- recvMsg s'
    +          return fd
    --
    2.6.0.rc2.230.g3dd15c0


Hrvoje Ribicic
Ganeti Engineering
Google Germany GmbH
Dienerstr. 12, 80331, München

Managing directors: Matthew Scott Sucherman, Paul Terence Manicle
Court of registration and registration number: Hamburg, HRB 86891
Registered office: Hamburg

This e-mail is confidential. If you are not the right addressee please do not forward it, please inform the sender, and please erase this e-mail including any attachments. Thanks.

