Michael,

On Tue, 2012-11-27 at 17:14 +0200, Michael Snoyman wrote:
> I think the stm-conduit package[1] may be helpful for this use case.
> Each time you get a new command, you can fork a thread and give it the
> TBMChan to write to, and you can use sourceTBMChan to get a source to
> send to the client.

That's more or less what I had in mind. I did find stm-conduit before and
tried to get this working with it, but those attempts failed.

I attached an example which might clarify what I intend to do. I'm aware
it contains several potential bugs (leaking threads etc.), but that's
beside the question ;-)

If only I could figure out what to put on the 3 lines of comment I left
in there...

Thanks for your help,

Nicolas

{-# LANGUAGE Rank2Types #-}
module Main where

import Data.Conduit
import qualified Data.Conduit.List as CL
import Data.Conduit.TMChan

import Control.Applicative
import Control.Concurrent (forkIO)
import Control.Concurrent.STM (atomically)
import Control.Monad (forM_)
import Control.Monad.IO.Class (MonadIO, liftIO)

data Command = Add Int Int
             | Disconnect
  deriving (Show)

data Reply = Result Int
  deriving (Show)

application :: MonadIO m => GConduit Int m String
application = do
    -- Create input and output channels to/from worker threads
    (chanIn, chanOut) <- liftIO $ (,) <$> newTBMChanIO 10 <*> newTBMChanIO 10

    -- Spawn some worker threads
    liftIO $ forM_ [0..5] $ \i -> forkIO $ processCommands i chanIn chanOut

    -- How to make
    --     sourceTBMChan chanOut
    -- something of which all produced values are yield'ed by this Conduit?

    loop chanIn
  where
    -- Loop retrieves one command from our source and pushes it to the
    -- worker threads input channel, then loops
    loop :: MonadIO m => TBMChan Command -> GConduit Int m String
    loop chan = do
        liftIO $ putStrLn "Enter loop"
        cmd <- getCommand
        liftIO $ do
            putStrLn $ "Got command: " ++ show cmd
            atomically $ writeTBMChan chan cmd
        case cmd of
            Disconnect -> return ()
            _ -> loop chan

    -- getCommand fetches and parses a single command from our source
    getCommand :: Monad m => GSink Int m Command
    getCommand = do
        v <- await
        case v of
            Nothing -> return Disconnect
            Just i -> return $ Add i 1

-- processCommands reads commands from a given input channel, processes
-- them, and pushes the result to a given output channel
processCommands :: Int -> TBMChan Command -> TBMChan Reply -> IO ()
processCommands i chanIn chanOut = do
    putStrLn $ "Enter processCommands " ++ show i
    cmd <- atomically $ readTBMChan chanIn
    putStrLn $ show i ++ " read command: " ++ show cmd
    case cmd of
        Nothing -> return ()
        Just (Add a b) -> do
            atomically $ writeTBMChan chanOut (Result (a + b))
            putStrLn $ show i ++ " pushed result"
            processCommands i chanIn chanOut
        Just Disconnect -> return ()

main :: IO ()
main = do
    res <- CL.sourceList [1..20] $= application $$ CL.consume
    putStrLn $ "Result: " ++ show res

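For comparison, here is a minimal sketch of only the output half of the approach in the quoted message: a forked thread writes replies to a TBMChan, and sourceTBMChan drains that channel as a conduit source. It does not answer the open question of how to interleave that source with the command loop inside one conduit. It assumes a recent conduit (the 1.3-style `runConduit` and `.|`) rather than the `GConduit` API used in the attachment, and it assumes `closeTBMChan` is re-exported by Data.Conduit.TMChan in the same way as `newTBMChanIO` and `writeTBMChan`. The name `collectReplies` is hypothetical and exists only for this illustration.

```haskell
module Main where

import Control.Concurrent (forkIO)
import Control.Concurrent.STM (atomically)
import Control.Monad (forM_)
import Data.Conduit (runConduit, (.|))
import qualified Data.Conduit.List as CL
import Data.Conduit.TMChan
    (closeTBMChan, newTBMChanIO, sourceTBMChan, writeTBMChan)

data Reply = Result Int
  deriving (Show)

-- Hypothetical helper: fork one producer that writes n replies to a
-- bounded channel, then collect everything the channel yields.
collectReplies :: Int -> IO [String]
collectReplies n = do
    chanOut <- newTBMChanIO 10

    -- The producer closes the channel when it is done; without this,
    -- sourceTBMChan would block forever waiting for more values.
    _ <- forkIO $ do
        forM_ [1 .. n] $ \i ->
            atomically $ writeTBMChan chanOut (Result (i + 1))
        atomically $ closeTBMChan chanOut

    -- sourceTBMChan yields each value read from the channel and
    -- terminates once the channel is closed and empty.
    runConduit $ sourceTBMChan chanOut .| CL.map show .| CL.consume

main :: IO ()
main = do
    res <- collectReplies 20
    putStrLn $ "Result: " ++ show res
```

With a single producer the replies arrive in the order they were written. With several workers writing to the same channel, as in the attachment, the order would depend on scheduling, and the channel should be closed only after the last worker has finished.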
_______________________________________________
Haskell-Cafe mailing list
Haskell-Cafe@haskell.org
http://www.haskell.org/mailman/listinfo/haskell-cafe