What do you think about this interdiff? (The commit message is also fixed.)
--- a/src/Ganeti/Query/Exec.hs
+++ b/src/Ganeti/Query/Exec.hs
@@ -136,7 +136,7 @@ runProcess :: JobId -- ^ a job to process
-> IO FilePath -- ^ path to the python executable
-> ((String -> IO ()) -> JobId -> Client -> IO Fd)
-- ^ pre-execution function communicating with the parent. The
- -- function returns the file descriptor which should be
+ -- function returns the file descriptor which should
-- remain open
-> IO ()
runProcess jid s pyExecIO commFn = withErrorLogAt CRITICAL (show jid) $
@@ -244,7 +244,7 @@ killProcessOnError pid master logFn = do
threadDelay 100000 -- wait for 0.1s and check again
killIfAlive sigs
--- | Forks current process and running runFn in the child and commFn in the
+-- | Forks current process and runs runFn in the child and commFn in the
-- parent. Due to a bug in GHC forking process, we want to retry if the forked
-- process fails to start. If it fails later on, the failure is handled by
-- 'ResultT' and no retry is performed.
@@ -311,6 +311,7 @@ forkJobProcess job luxiLivelock update = do
send "Confirming the client it can start" ""
-- from now on, we communicate with the job's Python process
+
_ <- recv "Waiting for the job to ask for the job id"
send "Writing job id to the client" jidStr
_ <- recv "Waiting for the job to ask for the lock file name"
On 11/24/2015 08:24 PM, Hrvoje Ribicic wrote:
On Fri, Nov 20, 2015 at 5:11 PM, 'Oleg Ponomarev' via ganeti-devel
<[email protected]> wrote:
forkJobProcess implementation consists of several steps. Move each
logically consistent step into a generalized function in order to reuse
these code fragments in forkPostHooksProcess (will be the next commit).

Fix up nits pointed out in previous review.
Signed-off-by: Oleg Ponomarev <[email protected]>
---
src/Ganeti/Query/Exec.hs | 184 +++++++++++++++++++++++++++--------------------
1 file changed, 106 insertions(+), 78 deletions(-)
diff --git a/src/Ganeti/Query/Exec.hs b/src/Ganeti/Query/Exec.hs
index 79889ff..8a4b13f 100644
--- a/src/Ganeti/Query/Exec.hs
+++ b/src/Ganeti/Query/Exec.hs
@@ -120,7 +120,6 @@ listOpenFds = liftM filterReadable
filterReadable :: (Read a) => [String] -> [a]
filterReadable = mapMaybe (fmap fst . listToMaybe . reads)
-
-- | Catches a potential `IOError` and sets its description via
-- `annotateIOError`. This makes exceptions more informative when they
-- are thrown from an unnamed `Handle`.
@@ -128,10 +127,19 @@ rethrowAnnotateIOError :: String -> IO a -> IO a
rethrowAnnotateIOError desc =
modifyIOError (\e -> annotateIOError e desc Nothing Nothing)
--- Code that is executed in a @fork@-ed process and that the replaces iteself
--- with the actual job process
-runJobProcess :: JobId -> Client -> IO ()
-runJobProcess jid s = withErrorLogAt CRITICAL (show jid) $
+
+-- | Code that is executed in a @fork@-ed process. Performs communication with
+-- the parent process by calling commFn and then runs pyExecIO python
+-- executable.
+runProcess :: JobId -- ^ a job to process
+ -> Client -- ^ UDS transport
+ -> IO FilePath -- ^ path to the python executable
+ -> ((String -> IO ()) -> JobId -> Client -> IO Fd)
+ -- ^ pre-execution function communicating with the parent. The
+ -- function returns the file descriptor which should be
s/be//
+ -- remain open
+ -> IO ()
+runProcess jid s pyExecIO commFn = withErrorLogAt CRITICAL (show jid) $
do
-- Close the standard error to prevent anything being written there
-- (for example by exceptions when closing unneeded FDs).
@@ -144,20 +152,7 @@ runJobProcess jid s = withErrorLogAt CRITICAL (show jid) $
let logLater _ = return ()
logLater $ "Forking a new process for job " ++ show
(fromJobId jid)
-
- -- Create a livelock file for the job
- (TOD ts _) <- getClockTime
- lockfile <- P.livelockFile $ printf "job_%06d_%d" (fromJobId jid) ts
-
- -- Lock the livelock file
- logLater $ "Locking livelock file " ++ show lockfile
- fd <- lockFile lockfile >>= annotateResult "Can't lock the livelock file"
- logLater "Sending the lockfile name to the master process"
- sendMsg s lockfile
-
- logLater "Waiting for the master process to confirm the lock"
- _ <- recvMsg s
-
+ preserve_fd <- commFn logLater jid s
-- close the client
logLater "Closing the client"
(clFdR, clFdW) <- clientToFd s
@@ -171,21 +166,20 @@ runJobProcess jid s = withErrorLogAt CRITICAL (show jid) $
closeFd clFdR
closeFd clFdW
- fds <- (filter (> 2) . filter (/= fd)) <$> toErrorBase listOpenFds
+ fds <- (filter (> 2) . filter (/= preserve_fd)) <$> toErrorBase listOpenFds
logLater $ "Closing every superfluous file descriptor: " ++
show fds
mapM_ (tryIOError . closeFd) fds
- -- the master process will send the job id and the livelock file name
- -- using the same protocol to the job process
- -- we pass the job id as the first argument to the process;
- -- while the process never uses it, it's very convenient when listing
- -- job processes
+ -- The master process will send the job id and the livelock file name
+ -- using the same protocol. We pass the job id as the first argument
+ -- to the process. While the process never uses it, it's very convenient
+ -- when listing job processes.
use_debug <- isDebugMode
env <- (M.insert "GNT_DEBUG" (if use_debug then "1" else "0")
. M.insert "PYTHONPATH" AC.versionedsharedir
. M.fromList)
`liftM` getEnvironment
- execPy <- P.jqueueExecutorPy
+ execPy <- pyExecIO
logLater $ "Executing " ++ AC.pythonPath ++ " " ++ execPy
++ " with PYTHONPATH=" ++ AC.versionedsharedir
() <- executeFile AC.pythonPath True [execPy, show (fromJobId jid)]
@@ -224,6 +218,59 @@ forkWithPipe conf childAction = do
$ closeClient child
return (pid, master)
+-- | Kill the process with the id provided.
+killProcessOnError :: (FromString e, Show e)
+ => ProcessID -- ^ job process pid
+ -> Client -- ^ UDS client connected to the master node
+ -> (String -> ResultT e (WriterLogT IO) ())
+ -- ^ log function
+ -> ResultT e (WriterLogT IO) ()
+killProcessOnError pid master logFn = do
+ logFn "Closing the pipe to the client"
+ withErrorLogAt WARNING "Closing the communication pipe failed"
+ (liftIO (closeClient master)) `orElse` return ()
+ killIfAlive [sigTERM, sigABRT, sigKILL]
+ where killIfAlive [] = return ()
+ killIfAlive (sig : sigs) = do
+ logFn "Getting the status of the process"
+ status <- tryError . liftIO $ getProcessStatus False True pid
+ case status of
+ Left e -> logFn $ "Job process already gone: " ++ show e
+ Right (Just s) -> logFn $ "Child process status: " ++ show s
+ Right Nothing -> do
+ logFn $ "Child process running, killing by " ++
show sig
+ liftIO $ signalProcess sig pid
+ unless (null sigs) $ do
+ threadDelay 100000 -- wait for 0.1s and check again
+ killIfAlive sigs
+
+-- | Forks current process and running runFn in the child and commFn in the
Noticed this just now: s/running/runs/
+-- parent. Due to a bug in GHC forking process, we want to retry if the forked
+-- process fails to start. If it fails later on, the failure is handled by
+-- 'ResultT' and no retry is performed.
+forkProcessCatchErrors :: (Show e, FromString e)
+ => (Client -> IO ())
+ -> (ProcessID -> String -> ResultT e (WriterLogT IO) ())
+ -> (ProcessID -> Client
+ -> ResultT e (WriterLogT IO) (FilePath, ProcessID))
+ -> ResultT e IO (FilePath, ProcessID)
+forkProcessCatchErrors runFn logFn commFn = do
+ -- Due to a bug in GHC forking process, we want to retry
+ -- if the forked process fails to start.
+ -- If it fails later on, the failure is handled by 'ResultT'
+ -- and no retry is performed.
+ let execWriterLogInside = ResultT . execWriterLogT . runResultT
+ retryErrorN C.luxidRetryForkCount
+ $ \tryNo -> execWriterLogInside $ do
+ let maxWaitUS = 2^(tryNo - 1) * C.luxidRetryForkStepUS
+ when (tryNo >= 2) . liftIO $ delayRandom (0, maxWaitUS)
+
+ (pid, master) <- liftIO $ forkWithPipe connectConfig runFn
+
+ logFn pid "Forked a new process"
+ flip catchError (\e -> killProcessOnError pid master (logFn pid)
+ >> throwError e) $ commFn pid master
+
-- | Forks the job process and starts processing of the given job.
-- Returns the livelock of the job and its process ID.
forkJobProcess :: (FromString e, Show e)
@@ -234,78 +281,59 @@ forkJobProcess :: (FromString e, Show e)
-- and process id in the job file
-> ResultT e IO (FilePath, ProcessID)
forkJobProcess job luxiLivelock update = do
- let jidStr = show . fromJobId . qjId $ job
-
- -- Retrieve secret parameters if present
- let secretParams = encodeStrict . filterSecretParameters . qjOps $ job
logDebug $ "Setting the lockfile temporarily to " ++ luxiLivelock
++ " for job " ++ jidStr
update luxiLivelock
- -- Due to a bug in GHC forking process, we want to retry,
- -- if the forked process fails to start.
- -- If it fails later on, the failure is handled by 'ResultT'
- -- and no retry is performed.
- let execWriterLogInside = ResultT . execWriterLogT . runResultT
- retryErrorN C.luxidRetryForkCount
- $ \tryNo -> execWriterLogInside $ do
- let maxWaitUS = 2^(tryNo - 1) * C.luxidRetryForkStepUS
- when (tryNo >= 2) . liftIO $ delayRandom (0, maxWaitUS)
-
- (pid, master) <- liftIO $ forkWithPipe connectConfig (runJobProcess
- . qjId $ job)
-
- let jobLogPrefix = "[start:job-" ++ jidStr ++ ",pid=" ++ show pid ++ "] "
- logDebugJob = logDebug . (jobLogPrefix ++)
-
- logDebugJob "Forked a new process"
-
- let killIfAlive [] = return ()
- killIfAlive (sig : sigs) = do
- logDebugJob "Getting the status of the process"
- status <- tryError . liftIO $ getProcessStatus False True pid
- case status of
- Left e -> logDebugJob $ "Job process already gone: " ++ show e
- Right (Just s) -> logDebugJob $ "Child process status: " ++ show s
- Right Nothing -> do
- logDebugJob $ "Child process running, killing by
" ++ show sig
- liftIO $ signalProcess sig pid
- unless (null sigs) $ do
- threadDelay 100000 -- wait for 0.1s and check again
- killIfAlive sigs
-
- let onError = do
- logDebugJob "Closing the pipe to the client"
- withErrorLogAt WARNING "Closing the communication pipe failed"
- (liftIO (closeClient master)) `orElse` return ()
- killIfAlive [sigTERM, sigABRT, sigKILL]
-
- flip catchError (\e -> onError >> throwError e)
- $ do
+ forkProcessCatchErrors (childMain . qjId $ job) logDebugJob
+ parentMain
+ where
+ -- Retrieve secret parameters if present
+ secretParams = encodeStrict . filterSecretParameters . qjOps $ job
+ jidStr = show . fromJobId . qjId $ job
+ jobLogPrefix pid = "[start:job-" ++ jidStr ++ ",pid=" ++ show pid ++ "] "
+ logDebugJob pid = logDebug . (jobLogPrefix pid ++)
+
+ -- | Code performing communication with the child process. First, receive
+ -- the livelock, then send necessary parameters to the python child.
+ parentMain pid master = do
let annotatedIO msg k = do
- logDebugJob msg
- liftIO $ rethrowAnnotateIOError (jobLogPrefix ++ msg) k
+ logDebugJob pid msg
+ liftIO $ rethrowAnnotateIOError (jobLogPrefix pid ++ msg) k
let recv msg = annotatedIO msg (recvMsg master)
send msg x = annotatedIO msg (sendMsg master x)
lockfile <- recv "Getting the lockfile of the client"
- logDebugJob $ "Setting the lockfile to the final " ++ lockfile
+ logDebugJob pid ("Setting the lockfile to the final " ++
lockfile)
toErrorBase $ update lockfile
send "Confirming the client it can start" ""
-- from now on, we communicate with the job's Python process
-
To make myself more explicit since the previous review: is there a
good reason to get rid of this whitespace?
It partitions the communication into logical components and makes it
more readable :)
_ <- recv "Waiting for the job to ask for the job id"
send "Writing job id to the client" jidStr
-
_ <- recv "Waiting for the job to ask for the lock file name"
send "Writing the lock file name to the client" lockfile
-
_ <- recv "Waiting for the job to ask for secret parameters"
send "Writing secret parameters to the client" secretParams
-
liftIO $ closeClient master
-
return (lockfile, pid)
+
+ -- | Code performing communication with the parent process. During
+ -- communication the livelock is created, locked and sent back
+ -- to the parent.
+ childMain jid s = runProcess jid s P.jqueueExecutorPy commFn
+ where
+ commFn logFn jid' s' = do
+ -- Create a livelock file for the job
+ (TOD ts _) <- getClockTime
+ lockfile <- P.livelockFile $ printf "job_%06d_%d" (fromJobId jid') ts
+ -- Lock the livelock file
+ _ <- logFn $ "Locking livelock file " ++ show lockfile
+ fd <- lockFile lockfile >>= annotateResult "Can't lock the livelock"
+ _ <- logFn "Sending the lockfile name to the master
process"
+ sendMsg s' lockfile
+ _ <- logFn "Waiting for the master process to confirm
the lock"
+ _ <- recvMsg s'
+ return fd
--
2.6.0.rc2.230.g3dd15c0
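One more note, since the retry logic just moved verbatim into
forkProcessCatchErrors: for anyone who has not looked at retryErrorN and
delayRandom before, the schedule they implement boils down to the plain-IO
loop below. This is a standalone illustration only; retryWithBackoff is a
made-up name and Either String merely stands in for the real error type.

import Control.Concurrent (threadDelay)
import Control.Monad (when)
import System.Random (randomRIO)

-- Try 1 runs immediately; every later try n first sleeps a random time in
-- [0, 2^(n-1) * stepUS] microseconds; a Left result is retried until the
-- attempt budget is exhausted.
retryWithBackoff :: Int                           -- ^ maximum number of tries
                 -> Int                           -- ^ backoff step in microseconds
                 -> (Int -> IO (Either String a)) -- ^ action, given the try number
                 -> IO (Either String a)
retryWithBackoff maxTries stepUS action = go 1
  where
    go tryNo = do
      let maxWaitUS = 2 ^ (tryNo - 1) * stepUS
      when (tryNo >= 2) $ randomRIO (0, maxWaitUS) >>= threadDelay
      res <- action tryNo
      case res of
        Right x -> return (Right x)
        Left err
          | tryNo < maxTries -> go (tryNo + 1)
          | otherwise        -> return (Left err)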