LGTM, thanks On Wed, Nov 25, 2015 at 11:47 AM, Oleg Ponomarev <[email protected]> wrote:
> What do you think about this interdiff? (commit message is also fixed) > > --- a/src/Ganeti/Query/Exec.hs > +++ b/src/Ganeti/Query/Exec.hs > @@ -136,7 +136,7 @@ runProcess :: JobId -- ^ a job to process > -> IO FilePath -- ^ path to the python executable > -> ((String -> IO ()) -> JobId -> Client -> IO Fd) > -- ^ pre-execution function communicating with the parent. > The > - -- function returns the file descriptor which should be > + -- function returns the file descriptor which should > -- remain open > -> IO () > runProcess jid s pyExecIO commFn = withErrorLogAt CRITICAL (show jid) $ > @@ -244,7 +244,7 @@ killProcessOnError pid master logFn = do > threadDelay 100000 -- wait for 0.1s and check again > killIfAlive sigs > > --- | Forks current process and running runFn in the child and commFn in > the > +-- | Forks current process and runs runFn in the child and commFn in the > -- parent. Due to a bug in GHC forking process, we want to retry if the > forked > -- process fails to start. If it fails later on, the failure is handled by > -- 'ResultT' and no retry is performed. > @@ -311,6 +311,7 @@ forkJobProcess job luxiLivelock update = do > send "Confirming the client it can start" "" > > -- from now on, we communicate with the job's Python process > + > _ <- recv "Waiting for the job to ask for the job id" > send "Writing job id to the client" jidStr > _ <- recv "Waiting for the job to ask for the lock file name" > > On 11/24/2015 08:24 PM, Hrvoje Ribicic wrote: > > > > On Fri, Nov 20, 2015 at 5:11 PM, 'Oleg Ponomarev' via ganeti-devel < > <[email protected]>[email protected]> wrote: > >> forkJobProcess implementation consist of several steps. Move each >> logical consistent step into the generalized function in order to reuse >> > > Fix up nits pointed out in previous review. > > >> these code fragments in forkPostHooksProcess (will be the next commit). 
>> >> Signed-off-by: Oleg Ponomarev < <[email protected]> >> [email protected]> >> --- >> src/Ganeti/Query/Exec.hs | 184 >> +++++++++++++++++++++++++++-------------------- >> 1 file changed, 106 insertions(+), 78 deletions(-) >> >> diff --git a/src/Ganeti/Query/Exec.hs b/src/Ganeti/Query/Exec.hs >> index 79889ff..8a4b13f 100644 >> --- a/src/Ganeti/Query/Exec.hs >> +++ b/src/Ganeti/Query/Exec.hs >> @@ -120,7 +120,6 @@ listOpenFds = liftM filterReadable >> filterReadable :: (Read a) => [String] -> [a] >> filterReadable = mapMaybe (fmap fst . listToMaybe . reads) >> >> - >> -- | Catches a potential `IOError` and sets its description via >> -- `annotateIOError`. This makes exceptions more informative when they >> -- are thrown from an unnamed `Handle`. >> @@ -128,10 +127,19 @@ rethrowAnnotateIOError :: String -> IO a -> IO a >> rethrowAnnotateIOError desc = >> modifyIOError (\e -> annotateIOError e desc Nothing Nothing) >> >> --- Code that is executed in a @fork@-ed process and that the replaces >> iteself >> --- with the actual job process >> -runJobProcess :: JobId -> Client -> IO () >> -runJobProcess jid s = withErrorLogAt CRITICAL (show jid) $ >> + >> +-- | Code that is executed in a @fork@-ed process. Performs >> communication with >> +-- the parent process by calling commFn and then runs pyExecIO python >> +-- executable. >> +runProcess :: JobId -- ^ a job to process >> + -> Client -- ^ UDS transport >> + -> IO FilePath -- ^ path to the python executable >> + -> ((String -> IO ()) -> JobId -> Client -> IO Fd) >> + -- ^ pre-execution function communicating with the parent. >> The >> + -- function returns the file descriptor which should be >> > > s/be// > > >> + -- remain open >> + -> IO () >> +runProcess jid s pyExecIO commFn = withErrorLogAt CRITICAL (show jid) $ >> do >> -- Close the standard error to prevent anything being written there >> -- (for example by exceptions when closing unneeded FDs). 
>> @@ -144,20 +152,7 @@ runJobProcess jid s = withErrorLogAt CRITICAL (show >> jid) $ >> let logLater _ = return () >> >> logLater $ "Forking a new process for job " ++ show (fromJobId jid) >> - >> - -- Create a livelock file for the job >> - (TOD ts _) <- getClockTime >> - lockfile <- P.livelockFile $ printf "job_%06d_%d" (fromJobId jid) ts >> - >> - -- Lock the livelock file >> - logLater $ "Locking livelock file " ++ show lockfile >> - fd <- lockFile lockfile >>= annotateResult "Can't lock the livelock >> file" >> - logLater "Sending the lockfile name to the master process" >> - sendMsg s lockfile >> - >> - logLater "Waiting for the master process to confirm the lock" >> - _ <- recvMsg s >> - >> + preserve_fd <- commFn logLater jid s >> -- close the client >> logLater "Closing the client" >> (clFdR, clFdW) <- clientToFd s >> @@ -171,21 +166,20 @@ runJobProcess jid s = withErrorLogAt CRITICAL (show >> jid) $ >> closeFd clFdR >> closeFd clFdW >> >> - fds <- (filter (> 2) . filter (/= fd)) <$> toErrorBase listOpenFds >> + fds <- (filter (> 2) . filter (/= preserve_fd)) <$> toErrorBase >> listOpenFds >> logLater $ "Closing every superfluous file descriptor: " ++ show fds >> mapM_ (tryIOError . closeFd) fds >> >> - -- the master process will send the job id and the livelock file name >> - -- using the same protocol to the job process >> - -- we pass the job id as the first argument to the process; >> - -- while the process never uses it, it's very convenient when listing >> - -- job processes >> + -- The master process will send the job id and the livelock file name >> + -- using the same protocol. We pass the job id as the first argument >> + -- to the process. While the process never uses it, it's very >> convenient >> + -- when listing job processes. >> use_debug <- isDebugMode >> env <- (M.insert "GNT_DEBUG" (if use_debug then "1" else "0") >> . M.insert "PYTHONPATH" AC.versionedsharedir >> . 
M.fromList) >> `liftM` getEnvironment >> - execPy <- P.jqueueExecutorPy >> + execPy <- pyExecIO >> logLater $ "Executing " ++ AC.pythonPath ++ " " ++ execPy >> ++ " with PYTHONPATH=" ++ AC.versionedsharedir >> () <- executeFile AC.pythonPath True [execPy, show (fromJobId jid)] >> @@ -224,6 +218,59 @@ forkWithPipe conf childAction = do >> $ closeClient child >> return (pid, master) >> >> +-- | Kill the process with the id provided. >> +killProcessOnError :: (FromString e, Show e) >> + => ProcessID -- ^ job process pid >> + -> Client -- ^ UDS client connected to the master node >> + -> (String -> ResultT e (WriterLogT IO) ()) >> + -- ^ log function >> + -> ResultT e (WriterLogT IO) () >> +killProcessOnError pid master logFn = do >> + logFn "Closing the pipe to the client" >> + withErrorLogAt WARNING "Closing the communication pipe failed" >> + (liftIO (closeClient master)) `orElse` return () >> + killIfAlive [sigTERM, sigABRT, sigKILL] >> + where killIfAlive [] = return () >> + killIfAlive (sig : sigs) = do >> + logFn "Getting the status of the process" >> + status <- tryError . liftIO $ getProcessStatus False True pid >> + case status of >> + Left e -> logFn $ "Job process already gone: " ++ show e >> + Right (Just s) -> logFn $ "Child process status: " ++ show s >> + Right Nothing -> do >> + logFn $ "Child process running, killing by " ++ show sig >> + liftIO $ signalProcess sig pid >> + unless (null sigs) $ do >> + threadDelay 100000 -- wait for 0.1s and check again >> + killIfAlive sigs >> + >> +-- | Forks current process and running runFn in the child and commFn in >> the >> > > Noticed this just now: s/running/runs/ > > >> +-- parent. Due to a bug in GHC forking process, we want to retry if the >> forked >> +-- process fails to start. If it fails later on, the failure is handled >> by >> +-- 'ResultT' and no retry is performed. 
>> +forkProcessCatchErrors :: (Show e, FromString e) >> + => (Client -> IO ()) >> + -> (ProcessID -> String -> ResultT e (WriterLogT >> IO) ()) >> + -> (ProcessID -> Client >> + -> ResultT e (WriterLogT IO) (FilePath, >> ProcessID)) >> + -> ResultT e IO (FilePath, ProcessID) >> +forkProcessCatchErrors runFn logFn commFn = do >> + -- Due to a bug in GHC forking process, we want to retry >> + -- if the forked process fails to start. >> + -- If it fails later on, the failure is handled by 'ResultT' >> + -- and no retry is performed. >> + let execWriterLogInside = ResultT . execWriterLogT . runResultT >> + retryErrorN C.luxidRetryForkCount >> + $ \tryNo -> execWriterLogInside $ do >> + let maxWaitUS = 2^(tryNo - 1) * C.luxidRetryForkStepUS >> + when (tryNo >= 2) . liftIO $ delayRandom (0, maxWaitUS) >> + >> + (pid, master) <- liftIO $ forkWithPipe connectConfig runFn >> + >> + logFn pid "Forked a new process" >> + flip catchError (\e -> killProcessOnError pid master (logFn pid) >> + >> throwError e) $ commFn pid master >> + >> -- | Forks the job process and starts processing of the given job. >> -- Returns the livelock of the job and its process ID. >> forkJobProcess :: (FromString e, Show e) >> @@ -234,78 +281,59 @@ forkJobProcess :: (FromString e, Show e) >> -- and process id in the job file >> -> ResultT e IO (FilePath, ProcessID) >> forkJobProcess job luxiLivelock update = do >> - let jidStr = show . fromJobId . qjId $ job >> - >> - -- Retrieve secret parameters if present >> - let secretParams = encodeStrict . filterSecretParameters . qjOps $ job >> >> logDebug $ "Setting the lockfile temporarily to " ++ luxiLivelock >> ++ " for job " ++ jidStr >> update luxiLivelock >> >> - -- Due to a bug in GHC forking process, we want to retry, >> - -- if the forked process fails to start. >> - -- If it fails later on, the failure is handled by 'ResultT' >> - -- and no retry is performed. >> - let execWriterLogInside = ResultT . execWriterLogT . 
runResultT >> - retryErrorN C.luxidRetryForkCount >> - $ \tryNo -> execWriterLogInside $ do >> - let maxWaitUS = 2^(tryNo - 1) * C.luxidRetryForkStepUS >> - when (tryNo >= 2) . liftIO $ delayRandom (0, maxWaitUS) >> - >> - (pid, master) <- liftIO $ forkWithPipe connectConfig (runJobProcess >> - . qjId $ job) >> - >> - let jobLogPrefix = "[start:job-" ++ jidStr ++ ",pid=" ++ show pid ++ >> "] " >> - logDebugJob = logDebug . (jobLogPrefix ++) >> - >> - logDebugJob "Forked a new process" >> - >> - let killIfAlive [] = return () >> - killIfAlive (sig : sigs) = do >> - logDebugJob "Getting the status of the process" >> - status <- tryError . liftIO $ getProcessStatus False True pid >> - case status of >> - Left e -> logDebugJob $ "Job process already gone: " ++ show >> e >> - Right (Just s) -> logDebugJob $ "Child process status: " ++ >> show s >> - Right Nothing -> do >> - logDebugJob $ "Child process running, killing by " ++ >> show sig >> - liftIO $ signalProcess sig pid >> - unless (null sigs) $ do >> - threadDelay 100000 -- wait for 0.1s and check again >> - killIfAlive sigs >> - >> - let onError = do >> - logDebugJob "Closing the pipe to the client" >> - withErrorLogAt WARNING "Closing the communication pipe failed" >> - (liftIO (closeClient master)) `orElse` return () >> - killIfAlive [sigTERM, sigABRT, sigKILL] >> - >> - flip catchError (\e -> onError >> throwError e) >> - $ do >> + forkProcessCatchErrors (childMain . qjId $ job) logDebugJob >> + parentMain >> + where >> + -- Retrieve secret parameters if present >> + secretParams = encodeStrict . filterSecretParameters . qjOps $ job >> + jidStr = show . fromJobId . qjId $ job >> + jobLogPrefix pid = "[start:job-" ++ jidStr ++ ",pid=" ++ show pid ++ >> "] " >> + logDebugJob pid = logDebug . (jobLogPrefix pid ++) >> + >> + -- | Code performing communication with the child process. First, >> receive >> + -- the livelock, then send necessary parameters to the python child. 
>> + parentMain pid master = do >> let annotatedIO msg k = do >> - logDebugJob msg >> - liftIO $ rethrowAnnotateIOError (jobLogPrefix ++ msg) k >> + logDebugJob pid msg >> + liftIO $ rethrowAnnotateIOError (jobLogPrefix pid ++ msg) k >> let recv msg = annotatedIO msg (recvMsg master) >> send msg x = annotatedIO msg (sendMsg master x) >> >> lockfile <- recv "Getting the lockfile of the client" >> >> - logDebugJob $ "Setting the lockfile to the final " ++ lockfile >> + logDebugJob pid ("Setting the lockfile to the final " ++ lockfile) >> toErrorBase $ update lockfile >> send "Confirming the client it can start" "" >> >> -- from now on, we communicate with the job's Python process >> - >> > > To make myself more explicit since the previous review: is there a good > reason to get rid of this whitespace? > > It partitions the communication into logical components and makes it more > readable :) > > >> _ <- recv "Waiting for the job to ask for the job id" >> send "Writing job id to the client" jidStr >> - >> _ <- recv "Waiting for the job to ask for the lock file name" >> send "Writing the lock file name to the client" lockfile >> - >> _ <- recv "Waiting for the job to ask for secret parameters" >> send "Writing secret parameters to the client" secretParams >> - >> liftIO $ closeClient master >> - >> return (lockfile, pid) >> + >> + -- | Code performing communication with the parent process. During >> + -- communication the livelock is created, locked and sent back >> + -- to the parent. 
>> + childMain jid s = runProcess jid s P.jqueueExecutorPy commFn >> + where >> + commFn logFn jid' s' = do >> + -- Create a livelock file for the job >> + (TOD ts _) <- getClockTime >> + lockfile <- P.livelockFile $ printf "job_%06d_%d" (fromJobId >> jid') ts >> + -- Lock the livelock file >> + _ <- logFn $ "Locking livelock file " ++ show lockfile >> + fd <- lockFile lockfile >>= annotateResult "Can't lock the >> livelock" >> + _ <- logFn "Sending the lockfile name to the master process" >> + sendMsg s' lockfile >> + _ <- logFn "Waiting for the master process to confirm the lock" >> + _ <- recvMsg s' >> + return fd >> -- >> 2.6.0.rc2.230.g3dd15c0 >> >> > Hrvoje Ribicic > Ganeti Engineering > Google Germany GmbH > Dienerstr. 12, 80331, München > > Geschäftsführer: Matthew Scott Sucherman, Paul Terence Manicle > Registergericht und -nummer: Hamburg, HRB 86891 > Sitz der Gesellschaft: Hamburg > > Diese E-Mail ist vertraulich. Wenn Sie nicht der richtige Adressat sind, > leiten Sie diese bitte nicht weiter, informieren Sie den Absender und > löschen Sie die E-Mail und alle Anhänge. Vielen Dank. > > This e-mail is confidential. If you are not the right addressee please do > not forward it, please inform the sender, and please erase this e-mail > including any attachments. Thanks. > > > Hrvoje Ribicic Ganeti Engineering Google Germany GmbH Dienerstr. 12, 80331, München Geschäftsführer: Matthew Scott Sucherman, Paul Terence Manicle Registergericht und -nummer: Hamburg, HRB 86891 Sitz der Gesellschaft: Hamburg Diese E-Mail ist vertraulich. Wenn Sie nicht der richtige Adressat sind, leiten Sie diese bitte nicht weiter, informieren Sie den Absender und löschen Sie die E-Mail und alle Anhänge. Vielen Dank. This e-mail is confidential. If you are not the right addressee please do not forward it, please inform the sender, and please erase this e-mail including any attachments. Thanks.
