Tidy solution, removing the songch, doh https://play.golang.org/p/2VppDS0U27
On Saturday, October 29, 2016 at 8:42:02 PM UTC+1, Florian Weimer wrote: > > I'm trying to parse OpenPGP key server dumps. Throughput is not too > bad so far, but I'd like to speed things up by introducing > parallelism. > > The dumps are split into several files (each about 40 MB large, with a > few thousand keys). The toy version I have so far does not need any > coordination between the processing of individual files. > > This works quite well when running the sequential version in parallel, > using GNU parallel. With GOMAXPROCS=1, serial execution time is > reduced from 141 seconds to 31 seconds. This is roughly what I would > expect from six cores with hyperthreading and use of GNU parallel > (which has some overhead of its own in this scenario). > > However, the version with built-in parallelism runs in 55 seconds, so > only half as fast as the GNU parallel approach. I would have expected > it to fare better compared to that. The parallel version is my second > one which has decent performance. I also tried a variant which has a > per-thread buffer which is occasionally written to standard output, > synchronized by a sync.Mutex. (It would likely have benefited from a > sync.Mutex.TryLock() function, delaying the buffer flush if there was > contention.) This was still significantly slower than the external > parallelization, but perhaps a little faster than the parallel version > attached below. I think both parallel approaches produce more garbage > than the sequential version, but likely not enough to explain the > speed difference compared to the sequential version with external > parallelization. > > “perf top” suggests that a lot of time is spent in GC-related Go > functions (which is expected, considering what the program does). But > vmstat shows a large number of context switches, which I find > surprising. It is much higher than the number of context switches > during a GNU parallel run. > > Most tests were run with the current master branch. 
> > Is there anything else I could try to make the internally parallelized > version as fast as the externally parallelized one? > > package main > > import ( > "bytes" > "bufio" > "flag" > "fmt" > "io" > openpgp "golang.org/x/crypto/openpgp/packet" > "os" > "reflect" > "runtime" > "sync" > ) > > var verbose bool > > var outputLock sync.Mutex > > // Print the string to standard output, with optional synchronization. > type Output interface { > Print(format string, args ...interface{}) > } > > func packetError(path string, packet openpgp.OpaquePacket, err error) { > outputLock.Lock() > defer outputLock.Unlock() > } > > func printPacketType(packet openpgp.Packet) { > outputLock.Lock() > defer outputLock.Unlock() > } > > // A printer for OpenPGP user IDs which writes multiple user IDs en > // bloc, to avoid lock contention. > type uidPrinter struct { > buffer bytes.Buffer > count int > } > > // Print one user ID. The user ID might not be printed until Flush() > // is called. > func (p *uidPrinter) Print(uid *openpgp.UserId) { > fmt.Fprintf(&p.buffer, "uid: %#v\n", uid.Id) > p.count++ > // Prevent the buffer from becoming too large. > if p.count > 1000 { > p.Flush() > } > } > > // Print all the staged user IDs. > func (p *uidPrinter) Flush() { > outputLock.Lock() > defer outputLock.Unlock() > os.Stdout.Write(p.buffer.Bytes()) > p.buffer.Truncate(0) > p.count = 0 > } > > func printError(format string, args ...interface{}) { > outputLock.Lock() > defer outputLock.Unlock() > fmt.Fprintf(os.Stderr, format, args...) 
> } > > func readFile(path string, output Output) error { > file, err := os.Open(path) > if err != nil { > return err > } > defer file.Close() > buf := bufio.NewReader(file) > packets := openpgp.NewOpaqueReader(buf) > for { > op, err := packets.Next() > if err != nil { > if (err == io.EOF) { > break > } > return err > } > p, err := op.Parse() > if err != nil { > continue > } > if verbose { > output.Print("%s\n", reflect.TypeOf(p).String()) > } > if uid, ok := p.(*openpgp.UserId); ok { > output.Print("uid: %#v\n", uid.Id) > } > } > return nil > } > > // Buffered output to standard output, without synchronization. > type sequentialOutput struct { > buffer *bufio.Writer > } > > func newSequentialOutput() *sequentialOutput { > return &sequentialOutput{bufio.NewWriter(os.Stdout)} > } > > func (p *sequentialOutput) Print(format string, args ...interface{}) { > fmt.Fprintf(p.buffer, format, args...) > } > > func (p *sequentialOutput) Stop() { > p.buffer.Flush() > } > > func processSequential(files []string) { > output := newSequentialOutput() > for _, path := range files { > err := readFile(path, output) > if err != nil { > printError("%s: error: %s\n", path, err.Error()) > } > } > output.Stop() > } > > // Return a closed channel which contains the given path strings. > func channelOfPaths(paths []string) chan string { > ch := make(chan string, len(paths)) > for _, path := range paths { > ch <- path > } > close(ch) > return ch > } > > // Spawned as a goroutine to process files from a channel. > func processFromChannel(filesChannel chan string, ackChannel chan > struct{}, > output Output) { > for path := range filesChannel { > err := readFile(path, output) > if err != nil { > printError("%s: error: %s\n", path, err.Error()) > } > ackChannel <- struct{}{} > } > } > > // Parallel output to standard output, with synchronization to prevent > // interleaving. > type parallelOutput struct { > dataChannel chan string // Strings to print, "" means > termination. 
> ackChannel chan struct{} // Signals completed termination request. > } > > func newParallelOutput() *parallelOutput { > p := &parallelOutput{ > dataChannel: make(chan string, 5000), > ackChannel: make(chan struct{})} > go parallelOutputGoroutine(p) > return p > } > > > func parallelOutputGoroutine(p *parallelOutput) { > var buf = bufio.NewWriter(os.Stdout) > for s := range p.dataChannel { > if s == "" { > break > } > buf.WriteString(s) > } > buf.Flush() > // Tell Stop() that processing is complete. > close(p.ackChannel) > } > > func (p *parallelOutput) Print(format string, args ...interface{}) { > s := fmt.Sprintf(format, args...) > if s != "" { > p.dataChannel <- s > } > } > > func (p *parallelOutput) Stop() { > // Request termination. > p.dataChannel <- "" > // Wait for completion of all pending requests. > _, _ = <- p.ackChannel > } > > // Number of parallel threads, controlled by the -threads flag. > var threads int > > // Process the files in parallel. > func processParallel(files []string) { > if len(files) == 0 { > return > } > > filesChannel := channelOfPaths(files) > > // This channel is used to detect termination. > ackChannel := make(chan struct{}) > > fmt.Fprintf(os.Stderr, "info: threads: %d\n", threads) > > output := newParallelOutput() > for i := 0; i < threads; i++ { > go processFromChannel(filesChannel, ackChannel, output) > } > > // Wait for termination of the conversion. > acks := 0 > for _ = range ackChannel { > acks++ > if acks == len(files) { > break > } > } > > // Wait until all pending entries have been printed. 
> output.Stop() > } > > func main() { > var sequential bool > flag.BoolVar(&verbose, "verbose", false, "more verbose output") > flag.BoolVar(&sequential, "sequential", false, "disable > concurrency") > flag.IntVar(&threads, "threads", runtime.GOMAXPROCS(0), > "number of processing threads") > flag.Parse() > > files := flag.Args() > if (sequential) { > processSequential(files) > } else { > processParallel(files) > } > } > -- You received this message because you are subscribed to the Google Groups "golang-nuts" group. To unsubscribe from this group and stop receiving emails from it, send an email to golang-nuts+unsubscr...@googlegroups.com. For more options, visit https://groups.google.com/d/optout.