Your code has a couple of bugs and some funky structure, and I think this 
is why you are having problems.

The major problem is that you are not dispatching work across your 
goroutines properly.

Every one of your "processFromChannel" goroutines competes to read from a 
single "chan string", each using a for-range loop.  It's entirely possible 
for one of these goroutines to read every file path you send while the 
others read nothing.  No parallelism there!

Here's a new way of looking at the problem: a goroutine is very lightweight 
and you can create tens of thousands of them without much overhead. 
 Therefore, you do not need a one-to-one correspondence between threads and 
goroutines, as seen here:

for i := 0; i < threads; i++ {
        go processFromChannel(filesChannel, ackChannel, output)
}

Instead, create a goroutine for each input path.  Don't bother making the 
filesChannel: it's much more predictable to dedicate one goroutine to each 
file.  You also don't need the ackChannel; use a sync.WaitGroup instead.

// Code is untested but you get the gist.
wg := sync.WaitGroup{}

for _, f := range files {
   wg.Add(1)

   // Pass the loop variable as an argument so each goroutine gets its
   // own copy of the path rather than sharing the loop variable.
   go func(file string) {
       defer wg.Done()
       processThing(file)
   }(f)
}

wg.Wait()

Now that you have dispatched your work in a predictable manner, you can 
address the separate, orthogonal concern of limiting how many goroutines 
execute simultaneously (as with your threads variable).  This falls into 
the camp of "shared access to a limited resource", and thankfully there is 
a standard computer-science solution: use a semaphore.

The easiest way to use a semaphore in Go is via a buffered channel.  This 
is described in more depth than I could reasonably write in an email at 
http://www.golangpatterns.info/concurrency/semaphores
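
Here's a rough, untested sketch of that pattern combined with the 
WaitGroup above (reusing the hypothetical processThing from before and 
your threads variable as the limit):

// sem holds at most 'threads' tokens, so at most that many
// goroutines run processThing at the same time.
sem := make(chan struct{}, threads)
wg := sync.WaitGroup{}

for _, f := range files {
   wg.Add(1)

   go func(file string) {
       defer wg.Done()

       sem <- struct{}{}        // acquire a slot; blocks while full
       defer func() { <-sem }() // release the slot when done

       processThing(file)
   }(f)
}

wg.Wait()

The goroutines waiting on the semaphore cost almost nothing while they 
block, so spawning one per file stays cheap even for thousands of files.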

If you'd like me to look at some more Go code for you, send me an email and 
we can arrange something! :)

John Morrice
j...@functorama.com

On Saturday, October 29, 2016 at 8:42:02 PM UTC+1, Florian Weimer wrote:
>
> I'm trying to parse OpenPGP key server dumps.  Throughput is not too 
> bad so far, but I'd like to speed things up by introducing 
> parallelism. 
>
> The dumps are split into several files (each about 40 MB large, with a 
> few thousand keys).  The toy version I have so far does not need any 
> coordination between the processing of individual files. 
>
> This works quite well when running the sequential version in parallel, 
> using GNU parallel.  With GOMAXPROCS=1, serial execution time is 
> reduced from 141 seconds to 31 seconds.  This is roughly what I would 
> expect from six cores with hyperthreading and use of GNU parallel 
> (which has some overhead of its own in this scenario). 
>
> However, the version with built-in parallelism runs in 55 seconds, so 
> only half as fast as the GNU parallel approach.  I would have expected 
> it to fare better compared to that.  The parallel version is my second 
> one which has decent performance.  I also tried a variant which has a 
> per-thread buffer which is occasionally written to standard output, 
> synchronized by a sync.Mutex.  (It would likely have benefited from a 
> sync.Mutex.TryLock() function, delaying the buffer flush if there was 
> contention.)  This was still significantly slower than the external 
> parallelization, but perhaps a little faster than parallel version 
> attached below.  I think both parallel approaches produce more garbage 
> than the sequential version, but likely not enough to explain the 
> speed difference compared to the sequential version with external 
> parallelization. 
>
> “perf top” suggests that a lot of time is spent in GC-related Go 
> functions (which is expected, considering what the program does).  But 
> vmstat shows a large number of context switches, which I find 
> surprising.  It is much higher than the number of context switching 
> during a GNU parallel run. 
>
> Most tests were run with the current master branch. 
>
> Is there anything else I could try to make the internally parallelized 
> version as fast as the externally parallelized one? 
>
> package main 
>
> import ( 
>         "bytes" 
>         "bufio" 
>         "flag" 
>         "fmt" 
>         "io" 
>         openpgp "golang.org/x/crypto/openpgp/packet" 
>         "os" 
>         "reflect" 
>         "runtime" 
>         "sync" 
> ) 
>
> var verbose bool 
>
> var outputLock sync.Mutex 
>
> // Print the string to standard output, with optional synchronization. 
> type Output interface { 
>         Print(format string, args ...interface{}) 
> } 
>
> func packetError(path string, packet openpgp.OpaquePacket, err error) { 
>         outputLock.Lock() 
>         defer outputLock.Unlock() 
> } 
>
> func printPacketType(packet openpgp.Packet) { 
>         outputLock.Lock() 
>         defer outputLock.Unlock() 
> } 
>
> // A printer for OpenPGP user IDs which writes multiple user IDs en 
> // bloc, to avoid lock contention. 
> type uidPrinter struct { 
>         buffer bytes.Buffer 
>         count int 
> } 
>
> // Print one user ID.  The user ID might not be printed until Flush() 
> // is called. 
> func (p *uidPrinter) Print(uid *openpgp.UserId) { 
>         fmt.Fprintf(&p.buffer, "uid: %#v\n", uid.Id) 
>         p.count++ 
>         // Prevent the buffer from becoming too large. 
>         if p.count > 1000 { 
>                 p.Flush() 
>         } 
> } 
>
> // Print all the staged user IDs. 
> func (p *uidPrinter) Flush() { 
>         outputLock.Lock() 
>         defer outputLock.Unlock() 
>         os.Stdout.Write(p.buffer.Bytes()) 
>         p.buffer.Truncate(0) 
>         p.count = 0 
> } 
>
> func printError(format string, args ...interface{}) { 
>         outputLock.Lock() 
>         defer outputLock.Unlock() 
>         fmt.Fprintf(os.Stderr, format, args...) 
> } 
>
> func readFile(path string, output Output) error { 
>         file, err := os.Open(path) 
>         if err != nil { 
>                 return err 
>         } 
>         defer file.Close() 
>         buf := bufio.NewReader(file) 
>         packets := openpgp.NewOpaqueReader(buf) 
>         for { 
>                 op, err := packets.Next() 
>                 if err != nil { 
>                         if (err == io.EOF) { 
>                                 break 
>                         } 
>                         return err 
>                 } 
>                 p, err := op.Parse() 
>                 if err != nil { 
>                         continue 
>                 } 
>                 if verbose { 
>                         output.Print("%s\n", reflect.TypeOf(p).String()) 
>                 } 
>                 if uid, ok := p.(*openpgp.UserId); ok { 
>                         output.Print("uid: %#v\n", uid.Id) 
>                 } 
>         } 
>         return nil 
> } 
>
> // Buffered output to standard output, without synchronization. 
> type sequentialOutput struct { 
>         buffer *bufio.Writer 
> } 
>
> func newSequentialOutput() *sequentialOutput { 
>         return &sequentialOutput{bufio.NewWriter(os.Stdout)} 
> } 
>
> func (p *sequentialOutput) Print(format string, args ...interface{}) { 
>         fmt.Fprintf(p.buffer, format, args...) 
> } 
>
> func (p *sequentialOutput) Stop() { 
>         p.buffer.Flush() 
> } 
>
> func processSequential(files []string) { 
>         output := newSequentialOutput() 
>         for _, path := range files { 
>                 err := readFile(path, output) 
>                 if err != nil { 
>                         printError("%s: error: %s\n", path, err.Error()) 
>                 } 
>         } 
>         output.Stop() 
> } 
>
> // Return a closed channel which contains the given path strings. 
> func channelOfPaths(paths []string) chan string { 
>         ch := make(chan string, len(paths)) 
>         for _, path := range paths { 
>                 ch <- path 
>         } 
>         close(ch) 
>         return ch 
> } 
>
> // Spawned as a goroutine to process files from a channel. 
> func processFromChannel(filesChannel chan string, ackChannel chan 
> struct{}, 
>         output Output) { 
>         for path := range filesChannel { 
>                 err := readFile(path, output) 
>                 if err != nil { 
>                         printError("%s: error: %s\n", path, err.Error()) 
>                 } 
>                 ackChannel <- struct{}{} 
>         } 
> } 
>
> // Parallel output to standard output, with synchronization to prevent 
> // interleaving. 
> type parallelOutput struct { 
>         dataChannel chan string        // Strings to print, "" means 
> termination. 
>         ackChannel chan struct{} // Signals completed termination request. 
> } 
>
> func newParallelOutput() *parallelOutput { 
>         p := &parallelOutput{ 
>                 dataChannel: make(chan string, 5000), 
>                 ackChannel: make(chan struct{})} 
>         go parallelOutputGoroutine(p) 
>         return p 
> } 
>
>
> func parallelOutputGoroutine(p *parallelOutput) { 
>         var buf = bufio.NewWriter(os.Stdout) 
>         for s := range p.dataChannel { 
>                 if s == "" { 
>                         break 
>                 } 
>                 buf.WriteString(s) 
>         } 
>         buf.Flush() 
>         // Tell Stop() that processing is complete. 
>         close(p.ackChannel) 
> } 
>
> func (p *parallelOutput) Print(format string, args ...interface{}) { 
>         s := fmt.Sprintf(format, args...) 
>         if s != "" { 
>                 p.dataChannel <- s 
>         } 
> } 
>
> func (p *parallelOutput) Stop() { 
>         // Request termination. 
>         p.dataChannel <- "" 
>         // Wait for completion of all pending requests. 
>         _, _ = <- p.ackChannel 
> } 
>
> // Number of parallel threads, controlled by the -threads flag. 
> var threads int 
>
> // Process the files in parallel. 
> func processParallel(files []string) { 
>         if len(files) == 0 { 
>                 return 
>         } 
>
>         filesChannel := channelOfPaths(files) 
>
>         // This channel is used to detect termination. 
>         ackChannel := make(chan struct{}) 
>
>         fmt.Fprintf(os.Stderr, "info: threads: %d\n", threads) 
>
>         output := newParallelOutput() 
>         for i := 0; i < threads; i++  { 
>                 go processFromChannel(filesChannel, ackChannel, output) 
>         } 
>
>         // Wait for termination of the conversion. 
>         acks := 0 
>         for _ = range ackChannel { 
>                 acks++ 
>                 if acks == len(files) { 
>                         break 
>                 } 
>         } 
>
>         // Wait until all pending entries have been printed. 
>         output.Stop() 
> } 
>
> func main() { 
>         var sequential bool 
>         flag.BoolVar(&verbose, "verbose", false, "more verbose output") 
>         flag.BoolVar(&sequential, "sequential", false, "disable 
> concurrency") 
>         flag.IntVar(&threads, "threads", runtime.GOMAXPROCS(0), 
>                 "number of processing threads") 
>         flag.Parse() 
>          
>         files := flag.Args() 
>         if (sequential) { 
>                 processSequential(files) 
>         } else { 
>                 processParallel(files) 
>         } 
> } 
>
