[Haskell-cafe] [Very long] (CHP?) Compressing, MD5 and big files

2010-01-03 Thread Maciej Piechotka
I have the following problem: I'd like to operate on big files, so I'd
prefer to work on a 'stream' rather than on the whole file at once, to
avoid keeping too much in memory. I need to calculate the MD5 and
compress the file.
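
To make the streaming requirement concrete, here is a minimal sketch of a
single-pass, constant-memory fold over a file's chunks. It is not the CHP
pipeline below and it does not compute MD5 or gzip anything: `foldChunks`,
`Stats` and `stepStats` are hypothetical names, and the byte count and
additive checksum are only stand-ins for the real consumers. It assumes
just the bytestring and directory packages that ship with GHC.

```haskell
{-# LANGUAGE BangPatterns #-}
import qualified Data.ByteString as BS
import System.Directory (removeFile)
import System.IO

-- | Strict left fold over a handle's contents, one chunk at a time.
-- Only the current chunk and the accumulator are alive at any moment.
foldChunks :: Int -> (s -> BS.ByteString -> s) -> s -> Handle -> IO s
foldChunks chunkSize step = go
  where
    go !acc hnd = do
      chunk <- BS.hGet hnd chunkSize
      if BS.null chunk              -- hGet yields an empty string at EOF
        then return acc
        else go (step acc chunk) hnd

-- | Stand-in consumers (NOT MD5, NOT gzip): two results from one pass.
data Stats = Stats { byteCount :: !Int, byteSum :: !Int }
  deriving (Eq, Show)

stepStats :: Stats -> BS.ByteString -> Stats
stepStats (Stats n s) chunk =
  Stats (n + BS.length chunk)
        (s + BS.foldl' (\a w -> a + fromIntegral w) 0 chunk)

main :: IO ()
main = do
  let path = "foldChunks-demo.bin"   -- hypothetical scratch file
  BS.writeFile path (BS.pack [1 .. 100])
  stats <- withBinaryFile path ReadMode (foldChunks 16 stepStats (Stats 0 0))
  print stats   -- Stats {byteCount = 100, byteSum = 5050}
  removeFile path
```

Pairing the two accumulators in one state is the simplest way to feed
several consumers from one read of the file; the channel-based design in
the rest of this message aims at the same thing with separate processes.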

I tried something like the code below, but it results in a deadlock, and
I'm afraid that I'd need to patch the zlib package:

> {-# LANGUAGE GADTs #-}
> import Codec.Compression.GZip
> import Control.Applicative
> import Control.Concurrent.CHP
> import qualified Control.Concurrent.CHP.Common as CHP
> import Control.Concurrent.CHP.Enroll
> import Control.Concurrent.CHP.Utils
> import Control.Monad.State.Strict
> import Data.Digest.Pure.MD5
> import Data.Maybe
> import qualified Data.ByteString.Char8 as BS
> import qualified Data.ByteString.Lazy.Char8 as LBS
> import qualified Data.ByteString.Lazy.Internal as LBS
> import System.Environment
> import System.IO
> import System.IO.Unsafe
> 
> 
> calculateMD5 :: (ReadableChannel r,
>  Poisonable (r (Maybe BS.ByteString)),
>  WriteableChannel w,
>  Poisonable (w MD5Digest))
>  => r (Maybe BS.ByteString)
>  -> w MD5Digest
>  -> CHP ()
> calculateMD5 in_ out = evalStateT (forever loop) md5InitialContext
>                        `onPoisonRethrow` (poison in_ >> poison out)
>     where loop = liftCHP (readChannel in_) >>= calc'
>           calc' Nothing  = gets md5Finalize >>=
>                            liftCHP . writeChannel out >>
>                            put md5InitialContext
>           calc' (Just b) = modify (flip md5Update
>                                    $ LBS.fromChunks [b])

Calculate MD5 hash of input stream. Nothing indicates EOF.

> unsafeInterleaveCHP :: CHP a -> CHP a
> unsafeInterleaveCHP = fromJust <.> liftIO <=<
>   unsafeInterleaveIO <.> embedCHP

Helper function. It is supposed to defer the execution until the result
is demanded, just as unsafeInterleaveIO does. I believe that the main
problem lives here.

Especially that Maybe.fromJust: a Nothing there is an error.
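
For comparison, this is a minimal sketch of the same deferral trick in
plain IO, with no CHP involved. An IORef counter stands in for the
channel; `lazyUnfold` and `counterSource` are made-up names for this
illustration only.

```haskell
import Data.IORef
import System.IO.Unsafe (unsafeInterleaveIO)

-- | Turn a repeatable action into a lazily produced list. The action
-- runs only when the corresponding list cell is demanded.
lazyUnfold :: IO (Maybe a) -> IO [a]
lazyUnfold next = unsafeInterleaveIO $ do
  r <- next
  case r of
    Nothing -> return []
    Just x  -> (x :) <$> lazyUnfold next

-- | Hypothetical source: hands out 0, 1, 2, ... and never ends.
counterSource :: IORef Int -> IO (Maybe Int)
counterSource ref = do
  n <- readIORef ref
  writeIORef ref (n + 1)
  return (Just n)

main :: IO ()
main = do
  ref <- newIORef 0
  xs <- lazyUnfold (counterSource ref)
  pulledBefore <- readIORef ref     -- nothing has been read yet
  print (take 3 xs)                 -- [0,1,2]
  pulledAfter <- readIORef ref      -- exactly three reads happened
  print (pulledBefore, pulledAfter) -- (0,3)
```

The reads happen while the list is being consumed, on whichever thread
forces it. That is worth keeping in mind for a channel-backed version,
where the forcing thread is then the one that blocks on the channel.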

> chan2List :: (ReadableChannel r, Poisonable (r a))
>   => r a -> CHP [a]
> chan2List in_ = unsafeInterleaveCHP ((liftM2 (:) (readChannel in_)
>  (chan2List in_))
>  `onPoisonTrap` return [])

Turns a channel into a lazily read list.

> chanMaybe2List :: (ReadableChannel r,
>                    Poisonable (r (Maybe a)))
>                => r (Maybe a)
>                -> CHP [[a]]
> chanMaybe2List in_ = splitByMaybe <$> chan2List in_
>   where splitByMaybe [] = []
>         splitByMaybe (Nothing:xs) =
>           []:splitByMaybe xs
>         splitByMaybe (Just v :[]) = [[v]]
>         splitByMaybe (Just v :xs) =
>           let (y:ys) = splitByMaybe xs
>           in (v:y):ys

Lazily reads from the channel into a list of lists.
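
The splitting step can be tried on its own, without any channels. The
sketch below lifts the same clauses into a standalone top-level function
(the type signature is added for the illustration) and shows that the
result is produced lazily, so it also works on an endless input.

```haskell
-- | Split a stream of Maybe values into groups, treating Nothing as the
-- end-of-group marker (EOF in the protocol used above).
splitByMaybe :: [Maybe a] -> [[a]]
splitByMaybe []             = []
splitByMaybe (Nothing : xs) = [] : splitByMaybe xs
splitByMaybe [Just v]       = [[v]]
splitByMaybe (Just v : xs)  =
  -- The let pattern is lazy, so the rest of the input is not
  -- inspected until the caller asks for it.
  let (y : ys) = splitByMaybe xs
  in (v : y) : ys

main :: IO ()
main = do
  -- Two "files", each terminated by Nothing.
  print (splitByMaybe [Just 'a', Just 'b', Nothing, Just 'c', Nothing])
  -- prints ["ab","c"]

  -- Laziness: only as much of the infinite input as needed is consumed.
  print (take 2 (splitByMaybe (cycle [Just (1 :: Int), Just 2, Nothing])))
  -- prints [[1,2],[1,2]]
```

Note that an input of just [Nothing] (an empty file) yields [[]], while a
trailing Nothing after real data does not add an extra empty group.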

> compressCHP :: (ReadableChannel r,
> Poisonable (r (Maybe BS.ByteString)),
> WriteableChannel w,
> Poisonable (w (Maybe BS.ByteString)))
> => r (Maybe BS.ByteString)
> -> w (Maybe BS.ByteString)
> -> CHP ()
> compressCHP in_ out = toOut >>= mapM_ sendBS
>   where in_' :: CHP [LBS.ByteString]
>         in_' = fmap LBS.fromChunks <$>
>                chanMaybe2List in_
>         toOut :: CHP [LBS.ByteString]
>         toOut = fmap compress <$> in_'
>         sendBS :: LBS.ByteString -> CHP ()
>         sendBS LBS.Empty       = writeChannel out
>                                  Nothing
>         sendBS (LBS.Chunk c r) = writeChannel out
>                                  (Just c)
>                                  >> sendBS r

The compression process.

> readFromFile :: (ReadableChannel r,
>  Poisonable (r String),
>  WriteableChannel w,
>  Poisonable (w (Maybe BS.ByteString)))
>  => r String
>  -> w (Maybe BS.ByteString)
>  -> CHP ()
> readFromFile file data_ =
>   forever (do path <- readChannel file
>               hnd <- liftIO $ openFile path ReadMode
>               let copy = liftIO (BS.hGet hnd LBS.defaultChunkSize) >>=
>                          writeChannel data_ . Just
>               copy `onPoisonRethrow` liftIO (hClose hnd)
>               writeChannel data_ Nothing
>               liftIO $ hClose hnd)
>   `onPoisonRethrow` (poison file >> poison data_)

A process that reads from a file.

> write

Re: [Haskell-cafe] [Very long] (CHP?) Compressing, MD5 and big files

2010-01-05 Thread Neil Brown

Hi,

Sorry for the slightly delayed reply -- I didn't have time to look 
through all your code and understand it until just now.  Your code has 
one (no doubt frustratingly!) small problem, which is in the deadlocking 
pipeline3:


Maciej Piechotka wrote:

pipeline3 :: CHP ()
pipeline3 = enrolling $ do
  file <- oneToManyChannel' $ chanLabel "File"
  fileGZ <- oneToOneChannel' $ chanLabel "File GZ"
  data_ <- oneToManyChannel' $ chanLabel "Data"
  compressed <- oneToManyChannel' $ chanLabel "Data Compressed"
  md5 <- oneToOneChannel' $ chanLabel "MD5"
  md5Compressed <- oneToOneChannel' $ chanLabel "MD5 Compressed"
  fileGZ' <- Enroll (reader file)
  fileData <- Enroll (reader file)
  dataMD5 <- Enroll (reader data_)
  dataCompress <- Enroll (reader data_)
  compressedFile <- Enroll (reader compressed)
  compressedMD5 <- Enroll (reader compressed)
  liftCHP $ runParallel_ [getFiles (writer file),
                          (forever $ readChannel fileGZ' >>=
                                     writeChannel (writer fileGZ) .
                                     (++".gz"))
                          `onPoisonRethrow`
                          (poison fileGZ' >> poison (writer fileGZ)),
                          readFromFile fileData (writer data_),
                          calculateMD5 dataMD5 (writer md5),
                          compressCHP dataCompress
                                      (writer compressed),
                          writeToFile (reader fileGZ) compressedFile,
                          calculateMD5 compressedMD5
                                       (writer md5Compressed),
                          forever $ readChannel dataMD5 >>=
                                    liftIO . print >>
                                    readChannel compressedMD5 >>=
                                    liftIO . print]



Problems:

(CHP) Thread terminated with: thread blocked indefinitely in an STM
transaction
< _b3, _b4, File GZ."test1.gz" >
  
Where you have "readChannel dataMD5" and "readChannel compressedMD5" in 
the last few lines, you actually meant to have "readChannel (reader 
md5)" and "readChannel (reader md5Compressed)".  Your mistake meant that 
the former two channels were being used more times in parallel than you 
had enrolled and that the latter two channels were being written to but 
not read from.  Either of these mistakes could cause deadlock, which is 
why you were getting a strange deadlock.  Unfortunately, the type system 
didn't save you this time, because the channel types happened to be the 
same.  It took me a while to find it, too!


On a side note, it would be good to have a static check for these 
mistakes (using a channel in parallel unsafely, and only using one end 
of a channel), but the only way I found to use Haskell's type-system for 
this is a rather nasty type-indexed monad.   I guess if you use 
newChannelRW and name both the results, you would get an unused variable 
warning if you didn't use either end of the channel.  This would fix one 
issue, but not the other.
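
A rough sketch of that last idea, using an MVar-backed stand-in rather 
than CHP itself: `InEnd`, `OutEnd`, `newChanRW`, `readC` and `writeC` are 
hypothetical names for this illustration and are not part of the CHP API.  
The point is only that binding both ends by name lets GHC's unused-variable 
warnings (enabled by -Wall) flag an end that is never used.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar

-- Hypothetical stand-ins for the two ends of a channel.
newtype InEnd a  = InEnd (MVar a)
newtype OutEnd a = OutEnd (MVar a)

-- | Create a channel and hand back both ends as separately named values.
newChanRW :: IO (InEnd a, OutEnd a)
newChanRW = do
  v <- newEmptyMVar
  return (InEnd v, OutEnd v)

readC :: InEnd a -> IO a
readC (InEnd v) = takeMVar v

writeC :: OutEnd a -> a -> IO ()
writeC (OutEnd v) = putMVar v

main :: IO ()
main = do
  -- Both ends are bound by name.  If the line using digestIn were
  -- deleted, compiling with -Wall would warn that digestIn is defined
  -- but not used, pointing at the half-connected channel.
  (digestIn, digestOut) <- newChanRW
  _ <- forkIO (writeC digestOut "digest placeholder")
  readC digestIn >>= putStrLn
```

As said above, this only catches an end that is never used at all.  It 
does nothing about one end being used from more places than intended.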


Hope that helps,

Neil.