LGTM, thanks

On Wed, Nov 25, 2015 at 11:47 AM, Oleg Ponomarev <[email protected]> wrote:

> What do you think about this interdiff? (The commit message is also fixed.)
>
> --- a/src/Ganeti/Query/Exec.hs
> +++ b/src/Ganeti/Query/Exec.hs
> @@ -136,7 +136,7 @@ runProcess :: JobId -- ^ a job to process
>             -> IO FilePath -- ^ path to the python executable
>             -> ((String -> IO ()) -> JobId -> Client -> IO Fd)
>                -- ^ pre-execution function communicating with the parent. The
> -              -- function returns the file descriptor which should be
> +              -- function returns the file descriptor which should
>                -- remain open
>             -> IO ()
>  runProcess jid s pyExecIO commFn = withErrorLogAt CRITICAL (show jid) $
> @@ -244,7 +244,7 @@ killProcessOnError pid master logFn = do
>                  threadDelay 100000 -- wait for 0.1s and check again
>                  killIfAlive sigs
>
> --- | Forks current process and running runFn in the child and commFn in the
> +-- | Forks current process and runs runFn in the child and commFn in the
>  -- parent. Due to a bug in GHC forking process, we want to retry if the forked
>  -- process fails to start. If it fails later on, the failure is handled by
>  -- 'ResultT' and no retry is performed.
> @@ -311,6 +311,7 @@ forkJobProcess job luxiLivelock update = do
>        send "Confirming the client it can start" ""
>
>        -- from now on, we communicate with the job's Python process
> +
>        _ <- recv "Waiting for the job to ask for the job id"
>        send "Writing job id to the client" jidStr
>        _ <- recv "Waiting for the job to ask for the lock file name"
>
> On 11/24/2015 08:24 PM, Hrvoje Ribicic wrote:
>
>
>
> On Fri, Nov 20, 2015 at 5:11 PM, 'Oleg Ponomarev' via ganeti-devel <[email protected]> wrote:
>
>> The forkJobProcess implementation consists of several steps. Move each
>> logically consistent step into a generalized function in order to reuse
>>
>
> Fix up nits pointed out in previous review.
>
>
>> these code fragments in forkPostHooksProcess (will be the next commit).
>>
>> Signed-off-by: Oleg Ponomarev <[email protected]>
>> ---
>>  src/Ganeti/Query/Exec.hs | 184 +++++++++++++++++++++++++++--------------------
>>  1 file changed, 106 insertions(+), 78 deletions(-)
>>
>> diff --git a/src/Ganeti/Query/Exec.hs b/src/Ganeti/Query/Exec.hs
>> index 79889ff..8a4b13f 100644
>> --- a/src/Ganeti/Query/Exec.hs
>> +++ b/src/Ganeti/Query/Exec.hs
>> @@ -120,7 +120,6 @@ listOpenFds = liftM filterReadable
>>      filterReadable :: (Read a) => [String] -> [a]
>>      filterReadable = mapMaybe (fmap fst . listToMaybe . reads)
>>
>> -
>>  -- | Catches a potential `IOError` and sets its description via
>>  -- `annotateIOError`. This makes exceptions more informative when they
>>  -- are thrown from an unnamed `Handle`.
>> @@ -128,10 +127,19 @@ rethrowAnnotateIOError :: String -> IO a -> IO a
>>  rethrowAnnotateIOError desc =
>>    modifyIOError (\e -> annotateIOError e desc Nothing Nothing)
>>
>> --- Code that is executed in a @fork@-ed process and that the replaces iteself
>> --- with the actual job process
>> -runJobProcess :: JobId -> Client -> IO ()
>> -runJobProcess jid s = withErrorLogAt CRITICAL (show jid) $
>> +
>> +-- | Code that is executed in a @fork@-ed process. Performs communication with
>> +-- the parent process by calling commFn and then runs pyExecIO python
>> +-- executable.
>> +runProcess :: JobId -- ^ a job to process
>> +           -> Client -- ^ UDS transport
>> +           -> IO FilePath -- ^ path to the python executable
>> +           -> ((String -> IO ()) -> JobId -> Client -> IO Fd)
>> +              -- ^ pre-execution function communicating with the parent. The
>> +              -- function returns the file descriptor which should be
>>
>
> s/be//
>
>
>> +              -- remain open
>> +           -> IO ()
>> +runProcess jid s pyExecIO commFn = withErrorLogAt CRITICAL (show jid) $
>>    do
>>      -- Close the standard error to prevent anything being written there
>>      -- (for example by exceptions when closing unneeded FDs).
>> @@ -144,20 +152,7 @@ runJobProcess jid s = withErrorLogAt CRITICAL (show jid) $
>>      let logLater _ = return ()
>>
>>      logLater $ "Forking a new process for job " ++ show (fromJobId jid)
>> -
>> -    -- Create a livelock file for the job
>> -    (TOD ts _) <- getClockTime
>> -    lockfile <- P.livelockFile $ printf "job_%06d_%d" (fromJobId jid) ts
>> -
>> -    -- Lock the livelock file
>> -    logLater $ "Locking livelock file " ++ show lockfile
>> -    fd <- lockFile lockfile >>= annotateResult "Can't lock the livelock file"
>> -    logLater "Sending the lockfile name to the master process"
>> -    sendMsg s lockfile
>> -
>> -    logLater "Waiting for the master process to confirm the lock"
>> -    _ <- recvMsg s
>> -
>> +    preserve_fd <- commFn logLater jid s
>>      -- close the client
>>      logLater "Closing the client"
>>      (clFdR, clFdW) <- clientToFd s
>> @@ -171,21 +166,20 @@ runJobProcess jid s = withErrorLogAt CRITICAL (show jid) $
>>      closeFd clFdR
>>      closeFd clFdW
>>
>> -    fds <- (filter (> 2) . filter (/= fd)) <$> toErrorBase listOpenFds
>> +    fds <- (filter (> 2) . filter (/= preserve_fd)) <$> toErrorBase listOpenFds
>>      logLater $ "Closing every superfluous file descriptor: " ++ show fds
>>      mapM_ (tryIOError . closeFd) fds
>>
>> -    -- the master process will send the job id and the livelock file name
>> -    -- using the same protocol to the job process
>> -    -- we pass the job id as the first argument to the process;
>> -    -- while the process never uses it, it's very convenient when listing
>> -    -- job processes
>> +    -- The master process will send the job id and the livelock file name
>> +    -- using the same protocol. We pass the job id as the first argument
>> +    -- to the process. While the process never uses it, it's very convenient
>> +    -- when listing job processes.
>>      use_debug <- isDebugMode
>>      env <- (M.insert "GNT_DEBUG" (if use_debug then "1" else "0")
>>              . M.insert "PYTHONPATH" AC.versionedsharedir
>>              . M.fromList)
>>             `liftM` getEnvironment
>> -    execPy <- P.jqueueExecutorPy
>> +    execPy <- pyExecIO
>>      logLater $ "Executing " ++ AC.pythonPath ++ " " ++ execPy
>>                 ++ " with PYTHONPATH=" ++ AC.versionedsharedir
>>      () <- executeFile AC.pythonPath True [execPy, show (fromJobId jid)]
>> @@ -224,6 +218,59 @@ forkWithPipe conf childAction = do
>>             $ closeClient child
>>    return (pid, master)
>>
>> +-- | Kill the process with the id provided.
>> +killProcessOnError :: (FromString e, Show e)
>> +                   => ProcessID -- ^ job process pid
>> +                   -> Client -- ^ UDS client connected to the master node
>> +                   -> (String -> ResultT e (WriterLogT IO) ())
>> +                      -- ^ log function
>> +                   -> ResultT e (WriterLogT IO) ()
>> +killProcessOnError pid master logFn = do
>> +  logFn "Closing the pipe to the client"
>> +  withErrorLogAt WARNING "Closing the communication pipe failed"
>> +                 (liftIO (closeClient master)) `orElse` return ()
>> +  killIfAlive [sigTERM, sigABRT, sigKILL]
>> +  where killIfAlive [] = return ()
>> +        killIfAlive (sig : sigs) = do
>> +          logFn "Getting the status of the process"
>> +          status <- tryError . liftIO $ getProcessStatus False True pid
>> +          case status of
>> +            Left e -> logFn $ "Job process already gone: " ++ show e
>> +            Right (Just s) -> logFn $ "Child process status: " ++ show s
>> +            Right Nothing -> do
>> +              logFn $ "Child process running, killing by " ++ show sig
>> +              liftIO $ signalProcess sig pid
>> +              unless (null sigs) $ do
>> +                threadDelay 100000 -- wait for 0.1s and check again
>> +                killIfAlive sigs
>> +
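The escalation loop in killProcessOnError (check the process status; if it still runs, send SIGTERM, then SIGABRT, then SIGKILL, pausing 0.1s between checks) can be modelled without Ganeti's types. A minimal sketch, assuming a plain `IO Bool` liveness check in place of `getProcessStatus False True pid` (the patch also stops when the status call fails or reports an exit status) and an arbitrary delivery action in place of `signalProcess`; `escalateKill` is a hypothetical name, not a Ganeti function:

```haskell
import Control.Concurrent (threadDelay)

-- | Simplified model of the escalation in killProcessOnError (hypothetical
-- helper). Before each signal, ask whether the process is still alive; if
-- it is, deliver the signal and, unless it was the last one, pause before
-- checking again. Returns the signals that were actually delivered.
escalateKill :: Int             -- ^ pause between checks, in microseconds
             -> IO Bool         -- ^ is the process still running?
             -> (sig -> IO ())  -- ^ deliver one signal
             -> [sig]           -- ^ signals to try, mildest first
             -> IO [sig]
escalateKill pauseUS isAlive deliver = go
  where
    go [] = pure []
    go (sig : sigs) = do
      alive <- isAlive
      if not alive
        then pure []  -- already gone: nothing left to send
        else do
          deliver sig
          rest <- if null sigs
                    then pure []
                    else threadDelay pauseUS >> go sigs
          pure (sig : rest)
```

With the unix package, `deliver` would be `\sig -> signalProcess sig pid` and the list `[sigTERM, sigABRT, sigKILL]`, as in the patch; a pause of 100000 matches the 0.1s wait.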
>> +-- | Forks current process and running runFn in the child and commFn in the
>>
>
> Noticed this just now: s/running/runs/
>
>
>> +-- parent. Due to a bug in GHC forking process, we want to retry if the forked
>> +-- process fails to start. If it fails later on, the failure is handled by
>> +-- 'ResultT' and no retry is performed.
>> +forkProcessCatchErrors :: (Show e, FromString e)
>> +                       => (Client -> IO ())
>> +                       -> (ProcessID -> String -> ResultT e (WriterLogT IO) ())
>> +                       -> (ProcessID -> Client
>> +                           -> ResultT e (WriterLogT IO) (FilePath, ProcessID))
>> +                       -> ResultT e IO (FilePath, ProcessID)
>> +forkProcessCatchErrors runFn logFn commFn = do
>> +  -- Due to a bug in GHC forking process, we want to retry
>> +  -- if the forked process fails to start.
>> +  -- If it fails later on, the failure is handled by 'ResultT'
>> +  -- and no retry is performed.
>> +  let execWriterLogInside = ResultT . execWriterLogT . runResultT
>> +  retryErrorN C.luxidRetryForkCount
>> +               $ \tryNo -> execWriterLogInside $ do
>> +    let maxWaitUS = 2^(tryNo - 1) * C.luxidRetryForkStepUS
>> +    when (tryNo >= 2) . liftIO $ delayRandom (0, maxWaitUS)
>> +
>> +    (pid, master) <- liftIO $ forkWithPipe connectConfig runFn
>> +
>> +    logFn pid "Forked a new process"
>> +    flip catchError (\e -> killProcessOnError pid master (logFn pid)
>> +                           >> throwError e) $ commFn pid master
>> +
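forkProcessCatchErrors retries a failed fork up to C.luxidRetryForkCount times, sleeping a random time in [0, 2^(tryNo - 1) * C.luxidRetryForkStepUS] microseconds before every attempt after the first. A minimal sketch of that randomized exponential backoff, assuming attempts are numbered from 1 (as the `tryNo >= 2` guard suggests); `backoffBoundUS` and `retryWithBackoff` are hypothetical names, and the random choice is passed in as `pickDelay` instead of calling Ganeti's delayRandom, so the sketch needs only base:

```haskell
import Control.Concurrent (threadDelay)

-- | Upper bound, in microseconds, of the pause taken before attempt number
-- @tryNo@ (counted from 1), mirroring @2^(tryNo - 1) * C.luxidRetryForkStepUS@.
-- The first attempt never waits.
backoffBoundUS :: Int -> Int -> Int
backoffBoundUS stepUS tryNo
  | tryNo >= 2 = 2 ^ (tryNo - 1) * stepUS
  | otherwise  = 0

-- | Run @attempt@ until it returns 'Right' or @maxTries@ attempts have been
-- made (at least one attempt is always made). Before every retry, sleep for
-- @pickDelay bound@ microseconds; @pickDelay@ should return a value in
-- @[0, bound]@, e.g. a uniformly random one in production.
retryWithBackoff :: Int                       -- ^ maximum number of attempts
                 -> Int                       -- ^ backoff step, in microseconds
                 -> (Int -> IO Int)           -- ^ choose a pause in [0, bound]
                 -> (Int -> IO (Either e a))  -- ^ one attempt, given its number
                 -> IO (Either e a)
retryWithBackoff maxTries stepUS pickDelay attempt = go 1
  where
    go tryNo = do
      let bound = backoffBoundUS stepUS tryNo
      pause <- if bound > 0 then pickDelay bound else pure 0
      threadDelay pause
      result <- attempt tryNo
      case result of
        Left _ | tryNo < maxTries -> go (tryNo + 1)
        _                         -> pure result
```

Passing `\_ -> pure 0` as pickDelay makes the sketch deterministic for tests; in production a random pause is the usual way to keep concurrent retries from lining up.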
>>  -- | Forks the job process and starts processing of the given job.
>>  -- Returns the livelock of the job and its process ID.
>>  forkJobProcess :: (FromString e, Show e)
>> @@ -234,78 +281,59 @@ forkJobProcess :: (FromString e, Show e)
>>                    -- and process id in the job file
>>                 -> ResultT e IO (FilePath, ProcessID)
>>  forkJobProcess job luxiLivelock update = do
>> -  let jidStr = show . fromJobId . qjId $ job
>> -
>> -  -- Retrieve secret parameters if present
>> -  let secretParams = encodeStrict . filterSecretParameters . qjOps $ job
>>
>>    logDebug $ "Setting the lockfile temporarily to " ++ luxiLivelock
>>               ++ " for job " ++ jidStr
>>    update luxiLivelock
>>
>> -  -- Due to a bug in GHC forking process, we want to retry,
>> -  -- if the forked process fails to start.
>> -  -- If it fails later on, the failure is handled by 'ResultT'
>> -  -- and no retry is performed.
>> -  let execWriterLogInside = ResultT . execWriterLogT . runResultT
>> -  retryErrorN C.luxidRetryForkCount
>> -               $ \tryNo -> execWriterLogInside $ do
>> -    let maxWaitUS = 2^(tryNo - 1) * C.luxidRetryForkStepUS
>> -    when (tryNo >= 2) . liftIO $ delayRandom (0, maxWaitUS)
>> -
>> -    (pid, master) <- liftIO $ forkWithPipe connectConfig (runJobProcess
>> -                                                          . qjId $ job)
>> -
>> -    let jobLogPrefix = "[start:job-" ++ jidStr ++ ",pid=" ++ show pid ++ "] "
>> -        logDebugJob = logDebug . (jobLogPrefix ++)
>> -
>> -    logDebugJob "Forked a new process"
>> -
>> -    let killIfAlive [] = return ()
>> -        killIfAlive (sig : sigs) = do
>> -          logDebugJob "Getting the status of the process"
>> -          status <- tryError . liftIO $ getProcessStatus False True pid
>> -          case status of
>> -            Left e -> logDebugJob $ "Job process already gone: " ++ show e
>> -            Right (Just s) -> logDebugJob $ "Child process status: " ++ show s
>> -            Right Nothing -> do
>> -                logDebugJob $ "Child process running, killing by " ++ show sig
>> -                liftIO $ signalProcess sig pid
>> -                unless (null sigs) $ do
>> -                  threadDelay 100000 -- wait for 0.1s and check again
>> -                  killIfAlive sigs
>> -
>> -    let onError = do
>> -          logDebugJob "Closing the pipe to the client"
>> -          withErrorLogAt WARNING "Closing the communication pipe failed"
>> -              (liftIO (closeClient master)) `orElse` return ()
>> -          killIfAlive [sigTERM, sigABRT, sigKILL]
>> -
>> -    flip catchError (\e -> onError >> throwError e)
>> -      $ do
>> +  forkProcessCatchErrors (childMain . qjId $ job) logDebugJob
>> +                         parentMain
>> +  where
>> +    -- Retrieve secret parameters if present
>> +    secretParams = encodeStrict . filterSecretParameters . qjOps $ job
>> +    jidStr = show . fromJobId . qjId $ job
>> +    jobLogPrefix pid = "[start:job-" ++ jidStr ++ ",pid=" ++ show pid ++ "] "
>> +    logDebugJob pid = logDebug . (jobLogPrefix pid ++)
>> +
>> +    -- | Code performing communication with the child process. First, receive
>> +    -- the livelock, then send necessary parameters to the python child.
>> +    parentMain pid master = do
>>        let annotatedIO msg k = do
>> -            logDebugJob msg
>> -            liftIO $ rethrowAnnotateIOError (jobLogPrefix ++ msg) k
>> +            logDebugJob pid msg
>> +            liftIO $ rethrowAnnotateIOError (jobLogPrefix pid ++ msg) k
>>        let recv msg = annotatedIO msg (recvMsg master)
>>            send msg x = annotatedIO msg (sendMsg master x)
>>
>>        lockfile <- recv "Getting the lockfile of the client"
>>
>> -      logDebugJob $ "Setting the lockfile to the final " ++ lockfile
>> +      logDebugJob pid ("Setting the lockfile to the final " ++ lockfile)
>>        toErrorBase $ update lockfile
>>        send "Confirming the client it can start" ""
>>
>>        -- from now on, we communicate with the job's Python process
>> -
>>
>
> To be more explicit than in my previous review: is there a good reason to
> get rid of this blank line?
>
> It partitions the communication into logical components and makes it more
> readable :)
>
>
>>        _ <- recv "Waiting for the job to ask for the job id"
>>        send "Writing job id to the client" jidStr
>> -
>>        _ <- recv "Waiting for the job to ask for the lock file name"
>>        send "Writing the lock file name to the client" lockfile
>> -
>>        _ <- recv "Waiting for the job to ask for secret parameters"
>>        send "Writing secret parameters to the client" secretParams
>> -
>>        liftIO $ closeClient master
>> -
>>        return (lockfile, pid)
>> +
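The handshake in parentMain has two phases, which the blank lines discussed in this review keep visible: first the forked Haskell child reports its locked livelock file and waits for a confirmation, then the job's Python process asks for the job id, the lock file name and the secret parameters, one request at a time. A minimal sketch of the parent side, assuming a hypothetical `Pipe` of two in-memory channels in place of the UDS `Client` and plain `String` messages; logging and `ResultT` error handling are left out:

```haskell
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)

-- | Hypothetical stand-in for Ganeti's UDS 'Client': one channel per
-- direction, each carrying whole messages.
data Pipe = Pipe
  { fromChild :: Chan String  -- ^ messages sent by the child
  , toChild   :: Chan String  -- ^ messages sent by the parent
  }

-- | Build an in-memory pipe.
newPipe :: IO Pipe
newPipe = Pipe <$> newChan <*> newChan

-- | Simplified parent side of the start-up handshake in parentMain.
-- Returns the final livelock file name reported by the child.
parentHandshake :: String             -- ^ job id, as sent to the child
                -> String             -- ^ encoded secret parameters
                -> (String -> IO ())  -- ^ record the livelock in the job file
                -> Pipe
                -> IO String
parentHandshake jidStr secretParams update pipe = do
  let recv = readChan (fromChild pipe)
      send = writeChan (toChild pipe)

  -- Phase 1: the forked child reports its locked livelock file.
  lockfile <- recv
  update lockfile
  send ""  -- confirm the child can start

  -- Phase 2: the Python job process asks for each value in turn.
  _ <- recv
  send jidStr

  _ <- recv
  send lockfile

  _ <- recv
  send secretParams

  pure lockfile
```

The request contents are ignored on purpose, as in the patch: only their order matters.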
>> +    -- | Code performing communication with the parent process. During
>> +    -- communication the livelock is created, locked and sent back
>> +    -- to the parent.
>> +    childMain jid s = runProcess jid s P.jqueueExecutorPy commFn
>> +      where
>> +        commFn logFn jid' s' = do
>> +          -- Create a livelock file for the job
>> +          (TOD ts _) <- getClockTime
>> +          lockfile <- P.livelockFile $ printf "job_%06d_%d" (fromJobId jid') ts
>> +          -- Lock the livelock file
>> +          _ <- logFn $ "Locking livelock file " ++ show lockfile
>> +          fd <- lockFile lockfile >>= annotateResult "Can't lock the livelock"
>> +          _ <- logFn "Sending the lockfile name to the master process"
>> +          sendMsg s' lockfile
>> +          _ <- logFn "Waiting for the master process to confirm the lock"
>> +          _ <- recvMsg s'
>> +          return fd
>> --
>> 2.6.0.rc2.230.g3dd15c0
>>
>>
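childMain names the livelock file after the job id and the current time: `printf "job_%06d_%d"` zero-pads the job id to six digits and appends the seconds field of the `TOD` clock value, and `P.livelockFile` then turns that name into a path. A small sketch of just the naming, assuming the job id is available as a plain `Int` and the seconds as an `Integer` (the type `TOD` carries); `livelockName` is a hypothetical helper:

```haskell
import Text.Printf (printf)

-- | Base name of a job's livelock file (hypothetical helper), following the
-- @printf "job_%06d_%d"@ call in childMain: the job id zero-padded to six
-- digits, then the fork time in seconds since the epoch.
livelockName :: Int -> Integer -> String
livelockName jobId epochSeconds = printf "job_%06d_%d" jobId epochSeconds
```

Job ids with more than six digits are not truncated, since the width in `%06d` is only a minimum.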
> Hrvoje Ribicic
> Ganeti Engineering
> Google Germany GmbH
> Dienerstr. 12, 80331, München
>
> Geschäftsführer: Matthew Scott Sucherman, Paul Terence Manicle
> Registergericht und -nummer: Hamburg, HRB 86891
> Sitz der Gesellschaft: Hamburg
>
> Diese E-Mail ist vertraulich. Wenn Sie nicht der richtige Adressat sind,
> leiten Sie diese bitte nicht weiter, informieren Sie den Absender und
> löschen Sie die E-Mail und alle Anhänge. Vielen Dank.
>
> This e-mail is confidential. If you are not the right addressee please do
> not forward it, please inform the sender, and please erase this e-mail
> including any attachments. Thanks.
>
>
>