Re: [Haskell-cafe] [Very long] (CHP?) Compressing, MD5 and big files
Hi,

Sorry for the slightly delayed reply -- I didn't have time to look through all your code and understand it until just now. Your code has one (no doubt frustratingly!) small problem, which is in the deadlocking pipeline3:

Maciej Piechotka wrote:
> pipeline3 :: CHP ()
> pipeline3 = enrolling $ do
>   file <- oneToManyChannel' $ chanLabel "File"
>   fileGZ <- oneToOneChannel' $ chanLabel "File GZ"
>   data_ <- oneToManyChannel' $ chanLabel "Data"
>   compressed <- oneToManyChannel' $ chanLabel "Data Compressed"
>   md5 <- oneToOneChannel' $ chanLabel "MD5"
>   md5Compressed <- oneToOneChannel' $ chanLabel "MD5 Compressed"
>   fileGZ' <- Enroll (reader file)
>   fileData <- Enroll (reader file)
>   dataMD5 <- Enroll (reader data_)
>   dataCompress <- Enroll (reader data_)
>   compressedFile <- Enroll (reader compressed)
>   compressedMD5 <- Enroll (reader compressed)
>   liftCHP $ runParallel_
>     [getFiles (writer file),
>      (forever $ readChannel fileGZ' >>= writeChannel (writer fileGZ) . (++".gz"))
>        `onPoisonRethrow` (poison fileGZ' >> poison (writer fileGZ)),
>      readFromFile fileData (writer data_),
>      calculateMD5 dataMD5 (writer md5),
>      compressCHP dataCompress (writer compressed),
>      writeToFile (reader fileGZ) compressedFile,
>      calculateMD5 compressedMD5 (writer md5Compressed),
>      forever $ readChannel dataMD5 >>= liftIO . print
>                >> readChannel compressedMD5 >>= liftIO . print]
>
> Problems:
> (CHP) Thread terminated with: thread blocked indefinitely in an STM transaction
> _b3, _b4, File GZ.test1.gz

Where you have readChannel dataMD5 and readChannel compressedMD5 in the last few lines, you actually meant to have readChannel (reader md5) and readChannel (reader md5Compressed). Your mistake meant that the former two channels were being used more times in parallel than you had enrolled, and that the latter two channels were being written to but not read from. Either of these mistakes could cause deadlock, which is why you were getting a strange deadlock. Unfortunately, the type system didn't save you this time, because the channel types happened to be the same.
It took me a while to find it, too!

On a side note, it would be good to have a static check for these mistakes (using a channel in parallel unsafely, and only using one end of a channel), but the only way I found to use Haskell's type system for this is a rather nasty type-indexed monad. I guess if you use newChannelRW and name both the results, you would get an unused variable warning if you left either end of the channel unused. This would fix one issue, but not the other.

Hope that helps,

Neil.
___
Haskell-Cafe mailing list
Haskell-Cafe@haskell.org
http://www.haskell.org/mailman/listinfo/haskell-cafe
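One lightweight way to guard against the specific mix-up discussed in the reply above (passing one channel where another was meant, with the consumer accepting both) is to give each channel role its own type. The following is a minimal sketch and not CHP code: it uses Control.Concurrent.Chan from base as a stand-in for CHP channels, and DataChan, DigestChan and the helper functions are hypothetical names chosen for illustration. It does not address the other issue, namely how many processes use an enrolled channel in parallel.

```haskell
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)

-- Hypothetical wrappers: both channels carry String, but each role
-- gets its own type, so they can no longer be swapped silently.
newtype DataChan   = DataChan   (Chan String)
newtype DigestChan = DigestChan (Chan String)

newDataChan :: IO DataChan
newDataChan = DataChan <$> newChan

newDigestChan :: IO DigestChan
newDigestChan = DigestChan <$> newChan

sendDigest :: DigestChan -> String -> IO ()
sendDigest (DigestChan c) = writeChan c

-- Accepts only the digest channel; passing a DataChan is a type error.
recvDigest :: DigestChan -> IO String
recvDigest (DigestChan c) = readChan c

-- Small demonstration: write one digest and read it back.
demo :: IO String
demo = do
  _payload <- newDataChan
  digests  <- newDigestChan
  sendDigest digests "d41d8cd9"
  recvDigest digests   -- `recvDigest _payload` would not compile
```

With these wrappers, a consumer whose signature names DigestChan cannot be handed the data channel by accident. A polymorphic consumer such as `readChannel c >>= liftIO . print` accepts any printable channel, which is why the original mistake type-checked.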
[Haskell-cafe] [Very long] (CHP?) Compressing, MD5 and big files
I have the following problem: I'd like to operate on big files, so I'd prefer to operate on a 'stream' instead of the whole file at a time, to avoid keeping too much in memory. I need to calculate the MD5 and compress the file. I tried to use something like the following, but I'm afraid that I'd need to patch the zlib package, as it results in deadlock:

    {-# LANGUAGE GADTs #-}
    import Codec.Compression.GZip
    import Control.Applicative
    import Control.Concurrent.CHP
    import qualified Control.Concurrent.CHP.Common as CHP
    import Control.Concurrent.CHP.Enroll
    import Control.Concurrent.CHP.Utils
    import Control.Monad.State.Strict
    import Data.Digest.Pure.MD5
    import Data.Maybe
    import qualified Data.ByteString.Char8 as BS
    import qualified Data.ByteString.Lazy.Char8 as LBS
    import qualified Data.ByteString.Lazy.Internal as LBS
    import System.Environment
    import System.IO
    import System.IO.Unsafe

    calculateMD5 :: (ReadableChannel r, Poisonable (r (Maybe BS.ByteString)),
                     WriteableChannel w, Poisonable (w MD5Digest))
                 => r (Maybe BS.ByteString) -> w MD5Digest -> CHP ()
    calculateMD5 in_ out = evalStateT (forever loop) md5InitialContext
                           `onPoisonRethrow` (poison in_ >> poison out)
      where loop = liftCHP (readChannel in_) >>= calc'
            calc' Nothing  = gets md5Finalize >>= liftCHP . writeChannel out
                             >> put md5InitialContext
            calc' (Just b) = modify (flip md5Update $ LBS.fromChunks [b])

Calculates the MD5 hash of the input stream. Nothing indicates EOF.

    unsafeInterleaveCHP :: CHP a -> CHP a
    unsafeInterleaveCHP = fromJust . liftIO = unsafeInterleaveIO . embedCHP

Helper function. It is supposed to defer the execution, just as unsafeInterleaveIO does. I believe that the main problem lives here, especially in the Maybe.fromJust: Nothing is the error case.

    chan2List :: (ReadableChannel r, Poisonable (r a)) => r a -> CHP [a]
    chan2List in_ = unsafeInterleaveCHP ((liftM2 (:) (readChannel in_) (chan2List in_))
                                         `onPoisonTrap` return [])

Turns a channel into a lazily read list.
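The lazy-list idea behind chan2List can be tried out in plain IO, without CHP. The following is a minimal sketch under stated assumptions: it uses unsafeInterleaveIO directly rather than the unsafeInterleaveCHP helper above, and lazyList, countingSource and demo are hypothetical names introduced only for this illustration. The source here is a counter, not a channel, and poison handling is not modelled.

```haskell
import Control.Exception (evaluate)
import Data.IORef (atomicModifyIORef', newIORef, readIORef)
import System.IO.Unsafe (unsafeInterleaveIO)

-- Build a list whose cells are produced on demand: each call to `next`
-- is deferred until the corresponding part of the list is forced.
lazyList :: IO (Maybe a) -> IO [a]
lazyList next = unsafeInterleaveIO $ do
  r <- next
  case r of
    Nothing -> return []
    Just x  -> (x :) <$> lazyList next

-- Hypothetical source: yields 1..limit, then Nothing. The second
-- action reports how many items have been produced so far.
countingSource :: Int -> IO (IO (Maybe Int), IO Int)
countingSource limit = do
  ref <- newIORef 0
  let next = atomicModifyIORef' ref $ \n ->
        if n >= limit then (n, Nothing) else (n + 1, Just (n + 1))
  return (next, readIORef ref)

-- Force only the first three cells and report how many items the
-- source had to produce to satisfy that demand.
demo :: IO ([Int], Int)
demo = do
  (next, produced) <- countingSource 10
  xs <- lazyList next
  let firstThree = take 3 xs
  _ <- evaluate (length firstThree)
  n <- produced
  return (firstThree, n)
```

Here demo yields ([1,2,3], 3): the source is polled only as far as the list is forced. This is also what makes the approach fragile, since the reads happen whenever the consumer forces the list rather than where the code textually appears.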
    chanMaybe2List :: (ReadableChannel r, Poisonable (r (Maybe a)))
                   => r (Maybe a) -> CHP [[a]]
    chanMaybe2List in_ = splitByMaybe <$> chan2List in_
      where splitByMaybe [] = []
            splitByMaybe (Nothing:xs) = []:splitByMaybe xs
            splitByMaybe (Just v :[]) = [[v]]
            splitByMaybe (Just v :xs) = let (y:ys) = splitByMaybe xs
                                        in (v:y):ys

Reads lazily from a channel into a list of lists.

    compressCHP :: (ReadableChannel r, Poisonable (r (Maybe BS.ByteString)),
                    WriteableChannel w, Poisonable (w (Maybe BS.ByteString)))
                => r (Maybe BS.ByteString) -> w (Maybe BS.ByteString) -> CHP ()
    compressCHP in_ out = toOut >>= mapM_ sendBS
      where in_' :: CHP [LBS.ByteString]
            in_' = fmap LBS.fromChunks <$> chanMaybe2List in_
            toOut :: CHP [LBS.ByteString]
            toOut = fmap compress <$> in_'
            sendBS :: LBS.ByteString -> CHP ()
            sendBS LBS.Empty = writeChannel out Nothing
            sendBS (LBS.Chunk c r) = writeChannel out (Just c) >> sendBS r

The compression process.

    readFromFile :: (ReadableChannel r, Poisonable (r String),
                     WriteableChannel w, Poisonable (w (Maybe BS.ByteString)))
                 => r String -> w (Maybe BS.ByteString) -> CHP ()
    readFromFile file data_ =
      forever (do path <- readChannel file
                  hnd <- liftIO $ openFile path ReadMode
                  let copy = liftIO (BS.hGet hnd LBS.defaultChunkSize)
                             >>= writeChannel data_ . Just
                  copy `onPoisonRethrow` liftIO (hClose hnd)
                  writeChannel data_ Nothing
                  liftIO $ hClose hnd)
      `onPoisonRethrow` (poison file >> poison data_)

The process that reads from a file.

    writeToFile :: (ReadableChannel r, Poisonable (r String), ReadableChannel r', Poisonable