Re: [Haskell-cafe] [Very long] (CHP?) Compressing, MD5 and big files

2010-01-05 Thread Neil Brown

Hi,

Sorry for the slightly delayed reply -- I didn't have time to look 
through all your code and understand it until just now.  Your code has 
one (no doubt frustratingly!) small problem, which is in the deadlocking 
pipeline3:


Maciej Piechotka wrote:

pipeline3 :: CHP ()
pipeline3 = enrolling $ do
  file <- oneToManyChannel' $ chanLabel "File"
  fileGZ <- oneToOneChannel' $ chanLabel "File GZ"
  data_ <- oneToManyChannel' $ chanLabel "Data"
  compressed <- oneToManyChannel' $ chanLabel "Data Compressed"
  md5 <- oneToOneChannel' $ chanLabel "MD5"
  md5Compressed <- oneToOneChannel' $ chanLabel "MD5 Compressed"
  fileGZ' <- Enroll (reader file)
  fileData <- Enroll (reader file)
  dataMD5 <- Enroll (reader data_)
  dataCompress <- Enroll (reader data_)
  compressedFile <- Enroll (reader compressed)
  compressedMD5 <- Enroll (reader compressed)
  liftCHP $ runParallel_ [getFiles (writer file),
                          (forever $ readChannel fileGZ' >>=
                                     writeChannel (writer fileGZ) .
                                     (++ ".gz"))
                            `onPoisonRethrow`
                            (poison fileGZ' >> poison (writer fileGZ)),
                          readFromFile fileData (writer data_),
                          calculateMD5 dataMD5 (writer md5),
                          compressCHP dataCompress
                                      (writer compressed),
                          writeToFile (reader fileGZ) compressedFile,
                          calculateMD5 compressedMD5
                                       (writer md5Compressed),
                          forever $ readChannel dataMD5 >>=
                                    liftIO . print >>
                                    readChannel compressedMD5 >>=
                                    liftIO . print]



Problems:

(CHP) Thread terminated with: thread blocked indefinitely in an STM transaction
 _b3, _b4, File GZ.test1.gz

Where you have readChannel dataMD5 and readChannel compressedMD5 in
the last few lines, you actually meant to have readChannel (reader
md5) and readChannel (reader md5Compressed).  Your mistake meant that
the former two channels were being used more times in parallel than you
had enrolled, and that the latter two channels were being written to but
never read from.  Either of these mistakes can cause deadlock, which is
why you were getting a strange deadlock.  Unfortunately, the type system
didn't save you this time, because the channel types happened to be the
same.  It took me a while to find it, too!
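A minimal sketch of the second mistake (a channel written to but never read), using plain MVars from base rather than CHP. SyncChan, newSyncChan, writeSync and readSync are hypothetical names for a one-to-one synchronous channel; GHC's runtime reports the stuck writer as BlockedIndefinitelyOnMVar, the MVar counterpart of the STM error in the trace above.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar
import Control.Exception (BlockedIndefinitelyOnMVar (..), try)

-- Hypothetical synchronous (unbuffered) channel: a write only completes
-- once a reader has taken the value, much like a CHP one-to-one channel.
data SyncChan a = SyncChan (MVar a) (MVar ())

newSyncChan :: IO (SyncChan a)
newSyncChan = SyncChan <$> newEmptyMVar <*> newEmptyMVar

writeSync :: SyncChan a -> a -> IO ()
writeSync (SyncChan v ack) x = do
  putMVar v x
  takeMVar ack            -- wait for a reader to acknowledge

readSync :: SyncChan a -> IO a
readSync (SyncChan v ack) = do
  x <- takeMVar v
  putMVar ack ()
  return x

-- Bug pattern: write to a channel whose reading end nobody uses.
-- The writer waits forever; the RTS notices and throws
-- BlockedIndefinitelyOnMVar instead of hanging silently.
writeWithoutReader :: IO (Either BlockedIndefinitelyOnMVar ())
writeWithoutReader = do
  c <- newSyncChan
  try (writeSync c "test1.gz")

-- Fixed pattern: some process actually reads the other end.
writeWithReader :: IO String
writeWithReader = do
  c <- newSyncChan
  _ <- forkIO (writeSync c "test1.gz")
  readSync c
```

This only illustrates the unread-channel case; the over-enrolled reader case depends on CHP's enrollment machinery.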


On a side note, it would be good to have a static check for these
mistakes (using a channel in parallel unsafely, and only using one end
of a channel), but the only way I found to use Haskell's type system for
this is a rather nasty type-indexed monad.  I guess if you use
newChannelRW and name both of the results, you would get an unused-variable
warning if you left either end of the channel unused.  This would fix one
issue, but not the other.
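A rough sketch of the newChannelRW idea, assuming a hypothetical newChanRW built on base's Chan rather than CHP's actual newChannelRW. Each end gets its own type, so a binding such as (md5R, md5W) <- newChanRW forces both ends to be named; with -Wall, leaving md5R unused should then trigger GHC's unused-binding warning.

```haskell
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)

-- Hypothetical end types: a ReadEnd can only be read from,
-- a WriteEnd can only be written to.
newtype ReadEnd a = ReadEnd (Chan a)
newtype WriteEnd a = WriteEnd (Chan a)

-- Create one channel and return its two ends separately, so each end
-- has to be bound to its own variable at the use site.
newChanRW :: IO (ReadEnd a, WriteEnd a)
newChanRW = do
  c <- newChan
  return (ReadEnd c, WriteEnd c)

readEnd :: ReadEnd a -> IO a
readEnd (ReadEnd c) = readChan c

writeEnd :: WriteEnd a -> a -> IO ()
writeEnd (WriteEnd c) = writeChan c
```

As the text says, this catches only the unused-end mistake, not unsafe parallel use of one end.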


Hope that helps,

Neil.
___
Haskell-Cafe mailing list
Haskell-Cafe@haskell.org
http://www.haskell.org/mailman/listinfo/haskell-cafe


[Haskell-cafe] [Very long] (CHP?) Compressing, MD5 and big files

2010-01-03 Thread Maciej Piechotka
I have the following problem: I'd like to work with big files, so I'd
prefer to operate on a stream instead of the whole file at once, to avoid
keeping too much in memory. I need to calculate the MD5 hash of a file and
compress it.

I tried something like the following, but I'm afraid that I'd need to patch
the zlib package, as it results in a deadlock:

 {-# LANGUAGE GADTs #-}
 import Codec.Compression.GZip
 import Control.Applicative
 import Control.Concurrent.CHP
 import qualified Control.Concurrent.CHP.Common as CHP
 import Control.Concurrent.CHP.Enroll
 import Control.Concurrent.CHP.Utils
 import Control.Monad.State.Strict
 import Data.Digest.Pure.MD5
 import Data.Maybe
 import qualified Data.ByteString.Char8 as BS
 import qualified Data.ByteString.Lazy.Char8 as LBS
 import qualified Data.ByteString.Lazy.Internal as LBS
 import System.Environment
 import System.IO
 import System.IO.Unsafe
 
 
 calculateMD5 :: (ReadableChannel r,
                  Poisonable (r (Maybe BS.ByteString)),
                  WriteableChannel w,
                  Poisonable (w MD5Digest))
              => r (Maybe BS.ByteString)
              -> w MD5Digest
              -> CHP ()
 calculateMD5 in_ out = evalStateT (forever loop) md5InitialContext
                          `onPoisonRethrow` (poison in_ >> poison out)
   where loop = liftCHP (readChannel in_) >>= calc'
         calc' Nothing  = gets md5Finalize >>=
                          liftCHP . writeChannel out >>
                          put md5InitialContext
         calc' (Just b) = modify (flip md5Update $ LBS.fromChunks [b])

Calculate MD5 hash of input stream. Nothing indicates EOF.
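The control flow of calculateMD5 can be tried without CHP or pureMD5. In this sketch a toy checksum (Ctx, initialCtx, updateCtx and finalizeCtx are hypothetical stand-ins, not MD5) plays the role of the MD5 context, and a plain list replaces the input channel: each Just chunk updates the context, and each Nothing emits a digest and resets it.

```haskell
import Data.Char (ord)
import Data.List (foldl')

-- Hypothetical stand-in for MD5Context: a simple polynomial checksum.
-- This is NOT MD5; it only shares the init/update/finalize shape.
newtype Ctx = Ctx Int

initialCtx :: Ctx
initialCtx = Ctx 0

-- Fold one chunk into the running context.
updateCtx :: Ctx -> String -> Ctx
updateCtx (Ctx s) chunk = Ctx (foldl' step s chunk)
  where step acc c = (acc * 31 + ord c) `mod` 1000003

finalizeCtx :: Ctx -> Int
finalizeCtx (Ctx s) = s

-- Same control flow as calculateMD5's loop: Just chunk updates the
-- context, Nothing emits a digest and resets to the initial context.
digests :: [Maybe String] -> [Int]
digests = go initialCtx
  where
    go _   []             = []
    go ctx (Just c : xs)  = go (updateCtx ctx c) xs
    go ctx (Nothing : xs) = finalizeCtx ctx : go initialCtx xs
```

Because updateCtx is a left fold over characters, splitting the same input at different chunk boundaries gives the same digest; a streaming hash needs that property.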

 unsafeInterleaveCHP :: CHP a -> CHP a
 unsafeInterleaveCHP m = fromJust <$> (liftIO . unsafeInterleaveIO =<< embedCHP m)

Helper function. It is supposed to defer execution, just as
unsafeInterleaveIO does. I believe that the main problem lives here,
especially since the error is Maybe.fromJust: Nothing.
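To see what unsafeInterleaveIO changes, and why a fromJust on Nothing can surface far from its cause, here is a small base-only sketch (deferredRun and deferredFromJust are illustrative names). If the embedded CHP action is poisoned, embedCHP's IO action presumably yields Nothing, and with the interleaving the fromJust error would only appear when some consumer forces the list element.

```haskell
import Control.Exception (ErrorCall, evaluate, try)
import Data.IORef (modifyIORef, newIORef, readIORef)
import Data.Maybe (fromJust)
import System.IO.Unsafe (unsafeInterleaveIO)

-- The wrapped action does not run when it is bound, only when its
-- result is forced. Returns the log before and after forcing.
deferredRun :: IO ([String], [String])
deferredRun = do
  logRef <- newIORef []
  x <- unsafeInterleaveIO (modifyIORef logRef ("ran" :) >> return (42 :: Int))
  before <- readIORef logRef   -- the action has not run yet
  _ <- evaluate x              -- forcing x runs the deferred action now
  after <- readIORef logRef
  return (before, after)

-- fromJust over a deferred Nothing: binding succeeds, and the
-- "Maybe.fromJust: Nothing" error only appears when the value is forced.
deferredFromJust :: IO (Either ErrorCall Int)
deferredFromJust = do
  y <- fromJust <$> unsafeInterleaveIO (return Nothing)
  try (evaluate y)
```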

 chan2List :: (ReadableChannel r, Poisonable (r a))
           => r a -> CHP [a]
 chan2List in_ = unsafeInterleaveCHP ((liftM2 (:) (readChannel in_)
                                                  (chan2List in_))
                                      `onPoisonTrap` return [])

Turns a channel into a lazily read list.
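For comparison, the same lazy-list idea over base's Chan (getChanContents in Control.Concurrent.Chan does this for unterminated streams). In this sketch lazyChanList is a hypothetical name, and a Nothing message stands in for the poison that ends chan2List; each element is read only when its list cell is demanded.

```haskell
import Control.Concurrent.Chan
import System.IO.Unsafe (unsafeInterleaveIO)

-- Lazily turn a channel into a list. Nothing plays the role of poison
-- here (an assumption for the sketch) and ends the list.
lazyChanList :: Chan (Maybe a) -> IO [a]
lazyChanList c = unsafeInterleaveIO $ do
  m <- readChan c
  case m of
    Nothing -> return []
    Just x  -> (x :) <$> lazyChanList c
```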

 chanMaybe2List :: (ReadableChannel r,
                    Poisonable (r (Maybe a)))
                => r (Maybe a)
                -> CHP [[a]]
 chanMaybe2List in_ = splitByMaybe <$> chan2List in_
   where splitByMaybe [] = []
         splitByMaybe (Nothing:xs) =
           []:splitByMaybe xs
         splitByMaybe (Just v :[]) = [[v]]
         splitByMaybe (Just v :xs) =
           let (y:ys) = splitByMaybe xs
           in (v:y):ys

Reads lazily from the channel into a list of lists, starting a new inner list at each Nothing.
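splitByMaybe does not depend on CHP, so it can be checked on ordinary lists. Here it is lifted out of the where clause, unchanged apart from layout; the second check in the usage shows it producing output incrementally on an infinite input, which is what lets chanMaybe2List work on a stream.

```haskell
-- The splitByMaybe from chanMaybe2List as a top-level pure function.
splitByMaybe :: [Maybe a] -> [[a]]
splitByMaybe [] = []
splitByMaybe (Nothing : xs) = [] : splitByMaybe xs
splitByMaybe (Just v : []) = [[v]]
splitByMaybe (Just v : xs) =
  -- The let pattern is lazy, so output is produced incrementally
  -- and the function also works on an unbounded, lazily read input.
  let (y : ys) = splitByMaybe xs
  in (v : y) : ys
```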

 compressCHP :: (ReadableChannel r,
                 Poisonable (r (Maybe BS.ByteString)),
                 WriteableChannel w,
                 Poisonable (w (Maybe BS.ByteString)))
             => r (Maybe BS.ByteString)
             -> w (Maybe BS.ByteString)
             -> CHP ()
 compressCHP in_ out = toOut >>= mapM_ sendBS
   where in_' :: CHP [LBS.ByteString]
         in_' = fmap LBS.fromChunks <$> chanMaybe2List in_
         toOut :: CHP [LBS.ByteString]
         toOut = fmap compress <$> in_'
         sendBS :: LBS.ByteString -> CHP ()
         sendBS LBS.Empty       = writeChannel out Nothing
         sendBS (LBS.Chunk c r) = writeChannel out (Just c) >>
                                  sendBS r

Compression process.
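sendBS frames each compressed lazy ByteString as a run of Just chunks followed by Nothing. A sketch of the same framing using the public LBS.toChunks and LBS.fromChunks instead of the Empty and Chunk constructors from Data.ByteString.Lazy.Internal (toMessages and fromMessages are hypothetical helper names):

```haskell
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as LBS

-- Frame one lazy ByteString (e.g. the output of GZip.compress) as the
-- Just-chunk ... Nothing message sequence that sendBS writes.
toMessages :: LBS.ByteString -> [Maybe BS.ByteString]
toMessages lbs = map Just (LBS.toChunks lbs) ++ [Nothing]

-- Inverse direction: rebuild the lazy ByteString from one framed file,
-- stopping at the first Nothing.
fromMessages :: [Maybe BS.ByteString] -> LBS.ByteString
fromMessages msgs = LBS.fromChunks [c | Just c <- takeWhile (/= Nothing) msgs]
```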

 readFromFile :: (ReadableChannel r,
                  Poisonable (r String),
                  WriteableChannel w,
                  Poisonable (w (Maybe BS.ByteString)))
              => r String
              -> w (Maybe BS.ByteString)
              -> CHP ()
 readFromFile file data_ =
     forever (do path <- readChannel file
                 hnd <- liftIO $ openFile path ReadMode
                 let copy = liftIO (BS.hGet hnd LBS.defaultChunkSize) >>=
                            writeChannel data_ . Just
                 copy `onPoisonRethrow` liftIO (hClose hnd)
                 writeChannel data_ Nothing
                 liftIO $ hClose hnd)
       `onPoisonRethrow` (poison file >> poison data_)

Process that reads from a file.
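As quoted, copy runs BS.hGet only once, so each file contributes at most one chunk (LBS.defaultChunkSize bytes) before the Nothing marker. A sketch of a loop that keeps reading until hGet returns an empty string, which is how Data.ByteString signals end of file; streamChunks and chunkLengths are hypothetical names, and the emit callback stands in for writeChannel data_.

```haskell
import qualified Data.ByteString as BS
import Data.IORef (modifyIORef, newIORef, readIORef)
import System.IO

-- Read a handle in chunks of at most `size` bytes, passing each chunk to
-- `emit` as Just, then Nothing once hGet returns an empty string (EOF).
streamChunks :: Int -> Handle -> (Maybe BS.ByteString -> IO ()) -> IO ()
streamChunks size hnd emit = loop
  where
    loop = do
      chunk <- BS.hGet hnd size
      if BS.null chunk
        then emit Nothing
        else emit (Just chunk) >> loop

-- Helper for checking: the length of each message, in order.
chunkLengths :: Int -> Handle -> IO [Maybe Int]
chunkLengths size hnd = do
  ref <- newIORef []
  streamChunks size hnd (\m -> modifyIORef ref (fmap BS.length m :))
  reverse <$> readIORef ref
```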

 writeToFile :: (ReadableChannel r,
 Poisonable (r String),
 ReadableChannel r',
 Poisonable