On Fri, Nov 20, 2015 at 5:11 PM, 'Oleg Ponomarev' via ganeti-devel <[email protected]> wrote:
> forkJobProcess implementation consist of several steps. Move each > logical consistent step into the generalized function in order to reuse > Fix up nits pointed out in previous review. > these code fragments in forkPostHooksProcess (will be the next commit). > > Signed-off-by: Oleg Ponomarev <[email protected]> > --- > src/Ganeti/Query/Exec.hs | 184 > +++++++++++++++++++++++++++-------------------- > 1 file changed, 106 insertions(+), 78 deletions(-) > > diff --git a/src/Ganeti/Query/Exec.hs b/src/Ganeti/Query/Exec.hs > index 79889ff..8a4b13f 100644 > --- a/src/Ganeti/Query/Exec.hs > +++ b/src/Ganeti/Query/Exec.hs > @@ -120,7 +120,6 @@ listOpenFds = liftM filterReadable > filterReadable :: (Read a) => [String] -> [a] > filterReadable = mapMaybe (fmap fst . listToMaybe . reads) > > - > -- | Catches a potential `IOError` and sets its description via > -- `annotateIOError`. This makes exceptions more informative when they > -- are thrown from an unnamed `Handle`. > @@ -128,10 +127,19 @@ rethrowAnnotateIOError :: String -> IO a -> IO a > rethrowAnnotateIOError desc = > modifyIOError (\e -> annotateIOError e desc Nothing Nothing) > > --- Code that is executed in a @fork@-ed process and that the replaces > iteself > --- with the actual job process > -runJobProcess :: JobId -> Client -> IO () > -runJobProcess jid s = withErrorLogAt CRITICAL (show jid) $ > + > +-- | Code that is executed in a @fork@-ed process. Performs > communication with > +-- the parent process by calling commFn and then runs pyExecIO python > +-- executable. > +runProcess :: JobId -- ^ a job to process > + -> Client -- ^ UDS transport > + -> IO FilePath -- ^ path to the python executable > + -> ((String -> IO ()) -> JobId -> Client -> IO Fd) > + -- ^ pre-execution function communicating with the parent. 
> The > + -- function returns the file descriptor which should be > s/be// > + -- remain open > + -> IO () > +runProcess jid s pyExecIO commFn = withErrorLogAt CRITICAL (show jid) $ > do > -- Close the standard error to prevent anything being written there > -- (for example by exceptions when closing unneeded FDs). > @@ -144,20 +152,7 @@ runJobProcess jid s = withErrorLogAt CRITICAL (show > jid) $ > let logLater _ = return () > > logLater $ "Forking a new process for job " ++ show (fromJobId jid) > - > - -- Create a livelock file for the job > - (TOD ts _) <- getClockTime > - lockfile <- P.livelockFile $ printf "job_%06d_%d" (fromJobId jid) ts > - > - -- Lock the livelock file > - logLater $ "Locking livelock file " ++ show lockfile > - fd <- lockFile lockfile >>= annotateResult "Can't lock the livelock > file" > - logLater "Sending the lockfile name to the master process" > - sendMsg s lockfile > - > - logLater "Waiting for the master process to confirm the lock" > - _ <- recvMsg s > - > + preserve_fd <- commFn logLater jid s > -- close the client > logLater "Closing the client" > (clFdR, clFdW) <- clientToFd s > @@ -171,21 +166,20 @@ runJobProcess jid s = withErrorLogAt CRITICAL (show > jid) $ > closeFd clFdR > closeFd clFdW > > - fds <- (filter (> 2) . filter (/= fd)) <$> toErrorBase listOpenFds > + fds <- (filter (> 2) . filter (/= preserve_fd)) <$> toErrorBase > listOpenFds > logLater $ "Closing every superfluous file descriptor: " ++ show fds > mapM_ (tryIOError . closeFd) fds > > - -- the master process will send the job id and the livelock file name > - -- using the same protocol to the job process > - -- we pass the job id as the first argument to the process; > - -- while the process never uses it, it's very convenient when listing > - -- job processes > + -- The master process will send the job id and the livelock file name > + -- using the same protocol. We pass the job id as the first argument > + -- to the process. 
While the process never uses it, it's very > convenient > + -- when listing job processes. > use_debug <- isDebugMode > env <- (M.insert "GNT_DEBUG" (if use_debug then "1" else "0") > . M.insert "PYTHONPATH" AC.versionedsharedir > . M.fromList) > `liftM` getEnvironment > - execPy <- P.jqueueExecutorPy > + execPy <- pyExecIO > logLater $ "Executing " ++ AC.pythonPath ++ " " ++ execPy > ++ " with PYTHONPATH=" ++ AC.versionedsharedir > () <- executeFile AC.pythonPath True [execPy, show (fromJobId jid)] > @@ -224,6 +218,59 @@ forkWithPipe conf childAction = do > $ closeClient child > return (pid, master) > > +-- | Kill the process with the id provided. > +killProcessOnError :: (FromString e, Show e) > + => ProcessID -- ^ job process pid > + -> Client -- ^ UDS client connected to the master node > + -> (String -> ResultT e (WriterLogT IO) ()) > + -- ^ log function > + -> ResultT e (WriterLogT IO) () > +killProcessOnError pid master logFn = do > + logFn "Closing the pipe to the client" > + withErrorLogAt WARNING "Closing the communication pipe failed" > + (liftIO (closeClient master)) `orElse` return () > + killIfAlive [sigTERM, sigABRT, sigKILL] > + where killIfAlive [] = return () > + killIfAlive (sig : sigs) = do > + logFn "Getting the status of the process" > + status <- tryError . liftIO $ getProcessStatus False True pid > + case status of > + Left e -> logFn $ "Job process already gone: " ++ show e > + Right (Just s) -> logFn $ "Child process status: " ++ show s > + Right Nothing -> do > + logFn $ "Child process running, killing by " ++ show sig > + liftIO $ signalProcess sig pid > + unless (null sigs) $ do > + threadDelay 100000 -- wait for 0.1s and check again > + killIfAlive sigs > + > +-- | Forks current process and running runFn in the child and commFn in > the > Noticed this just now: s/running/runs/ > +-- parent. Due to a bug in GHC forking process, we want to retry if the > forked > +-- process fails to start. 
If it fails later on, the failure is handled by > +-- 'ResultT' and no retry is performed. > +forkProcessCatchErrors :: (Show e, FromString e) > + => (Client -> IO ()) > + -> (ProcessID -> String -> ResultT e (WriterLogT > IO) ()) > + -> (ProcessID -> Client > + -> ResultT e (WriterLogT IO) (FilePath, > ProcessID)) > + -> ResultT e IO (FilePath, ProcessID) > +forkProcessCatchErrors runFn logFn commFn = do > + -- Due to a bug in GHC forking process, we want to retry > + -- if the forked process fails to start. > + -- If it fails later on, the failure is handled by 'ResultT' > + -- and no retry is performed. > + let execWriterLogInside = ResultT . execWriterLogT . runResultT > + retryErrorN C.luxidRetryForkCount > + $ \tryNo -> execWriterLogInside $ do > + let maxWaitUS = 2^(tryNo - 1) * C.luxidRetryForkStepUS > + when (tryNo >= 2) . liftIO $ delayRandom (0, maxWaitUS) > + > + (pid, master) <- liftIO $ forkWithPipe connectConfig runFn > + > + logFn pid "Forked a new process" > + flip catchError (\e -> killProcessOnError pid master (logFn pid) > + >> throwError e) $ commFn pid master > + > -- | Forks the job process and starts processing of the given job. > -- Returns the livelock of the job and its process ID. > forkJobProcess :: (FromString e, Show e) > @@ -234,78 +281,59 @@ forkJobProcess :: (FromString e, Show e) > -- and process id in the job file > -> ResultT e IO (FilePath, ProcessID) > forkJobProcess job luxiLivelock update = do > - let jidStr = show . fromJobId . qjId $ job > - > - -- Retrieve secret parameters if present > - let secretParams = encodeStrict . filterSecretParameters . qjOps $ job > > logDebug $ "Setting the lockfile temporarily to " ++ luxiLivelock > ++ " for job " ++ jidStr > update luxiLivelock > > - -- Due to a bug in GHC forking process, we want to retry, > - -- if the forked process fails to start. > - -- If it fails later on, the failure is handled by 'ResultT' > - -- and no retry is performed. > - let execWriterLogInside = ResultT . 
execWriterLogT . runResultT > - retryErrorN C.luxidRetryForkCount > - $ \tryNo -> execWriterLogInside $ do > - let maxWaitUS = 2^(tryNo - 1) * C.luxidRetryForkStepUS > - when (tryNo >= 2) . liftIO $ delayRandom (0, maxWaitUS) > - > - (pid, master) <- liftIO $ forkWithPipe connectConfig (runJobProcess > - . qjId $ job) > - > - let jobLogPrefix = "[start:job-" ++ jidStr ++ ",pid=" ++ show pid ++ > "] " > - logDebugJob = logDebug . (jobLogPrefix ++) > - > - logDebugJob "Forked a new process" > - > - let killIfAlive [] = return () > - killIfAlive (sig : sigs) = do > - logDebugJob "Getting the status of the process" > - status <- tryError . liftIO $ getProcessStatus False True pid > - case status of > - Left e -> logDebugJob $ "Job process already gone: " ++ show e > - Right (Just s) -> logDebugJob $ "Child process status: " ++ > show s > - Right Nothing -> do > - logDebugJob $ "Child process running, killing by " ++ > show sig > - liftIO $ signalProcess sig pid > - unless (null sigs) $ do > - threadDelay 100000 -- wait for 0.1s and check again > - killIfAlive sigs > - > - let onError = do > - logDebugJob "Closing the pipe to the client" > - withErrorLogAt WARNING "Closing the communication pipe failed" > - (liftIO (closeClient master)) `orElse` return () > - killIfAlive [sigTERM, sigABRT, sigKILL] > - > - flip catchError (\e -> onError >> throwError e) > - $ do > + forkProcessCatchErrors (childMain . qjId $ job) logDebugJob > + parentMain > + where > + -- Retrieve secret parameters if present > + secretParams = encodeStrict . filterSecretParameters . qjOps $ job > + jidStr = show . fromJobId . qjId $ job > + jobLogPrefix pid = "[start:job-" ++ jidStr ++ ",pid=" ++ show pid ++ > "] " > + logDebugJob pid = logDebug . (jobLogPrefix pid ++) > + > + -- | Code performing communication with the child process. First, > receive > + -- the livelock, then send necessary parameters to the python child. 
> + parentMain pid master = do > let annotatedIO msg k = do > - logDebugJob msg > - liftIO $ rethrowAnnotateIOError (jobLogPrefix ++ msg) k > + logDebugJob pid msg > + liftIO $ rethrowAnnotateIOError (jobLogPrefix pid ++ msg) k > let recv msg = annotatedIO msg (recvMsg master) > send msg x = annotatedIO msg (sendMsg master x) > > lockfile <- recv "Getting the lockfile of the client" > > - logDebugJob $ "Setting the lockfile to the final " ++ lockfile > + logDebugJob pid ("Setting the lockfile to the final " ++ lockfile) > toErrorBase $ update lockfile > send "Confirming the client it can start" "" > > -- from now on, we communicate with the job's Python process > - > To be more explicit than in my previous review: is there a good reason to remove these blank lines? They partition the communication into logical components and make it more readable :) > _ <- recv "Waiting for the job to ask for the job id" > send "Writing job id to the client" jidStr > - > _ <- recv "Waiting for the job to ask for the lock file name" > send "Writing the lock file name to the client" lockfile > - > _ <- recv "Waiting for the job to ask for secret parameters" > send "Writing secret parameters to the client" secretParams > - > liftIO $ closeClient master > - > return (lockfile, pid) > + > + -- | Code performing communication with the parent process. During > + -- communication the livelock is created, locked and sent back > + -- to the parent.
> + childMain jid s = runProcess jid s P.jqueueExecutorPy commFn > + where > + commFn logFn jid' s' = do > + -- Create a livelock file for the job > + (TOD ts _) <- getClockTime > + lockfile <- P.livelockFile $ printf "job_%06d_%d" (fromJobId > jid') ts > + -- Lock the livelock file > + _ <- logFn $ "Locking livelock file " ++ show lockfile > + fd <- lockFile lockfile >>= annotateResult "Can't lock the > livelock" > + _ <- logFn "Sending the lockfile name to the master process" > + sendMsg s' lockfile > + _ <- logFn "Waiting for the master process to confirm the lock" > + _ <- recvMsg s' > + return fd > -- > 2.6.0.rc2.230.g3dd15c0 > > Hrvoje Ribicic Ganeti Engineering Google Germany GmbH Dienerstr. 12, 80331, München Geschäftsführer: Matthew Scott Sucherman, Paul Terence Manicle Registergericht und -nummer: Hamburg, HRB 86891 Sitz der Gesellschaft: Hamburg Diese E-Mail ist vertraulich. Wenn Sie nicht der richtige Adressat sind, leiten Sie diese bitte nicht weiter, informieren Sie den Absender und löschen Sie die E-Mail und alle Anhänge. Vielen Dank. This e-mail is confidential. If you are not the right addressee please do not forward it, please inform the sender, and please erase this e-mail including any attachments. Thanks.
