Hi, Is anyone able to help me here ? Here is a (simplified) snippet of the code, in case it helps answering my query. I basically create a goroutine for every input file (assume max 8) and then wait for processing of all files to finish. Each goroutine processes a line within the file and then any records which match a certain criteria are appended to a slice. After all lines have been processed in a file, the list is Sent to a channel. Finally, in the Closer goroutine, I wait for all goroutines to finish and close the channel once all goroutines have finished :
package main import ( "bufio" "compress/gzip" "flag" "fmt" "log" "os" "path/filepath" "strings" "sync" "github.com/en-vee/alog" ) const ( inputFilePrefix = "subscriber_db_" ) var ( inputDir string ) type QuarantineObject struct { objectType string id string } func init() { flag.StringVar(&inputDir, "d", "", "Path to the Input folder which is to be analysed") } func main() { var err error alog.SetLogLevel(alog.TRACE) flag.Parse() // Validation of input parameters if inputDir == "" { fmt.Fprintf(os.Stderr, "No Input Directory Specified\n") flag.Usage() os.Exit(1) } // Is the input directory valid ? if _, err := os.Stat(inputDir); os.IsNotExist(err) { fmt.Fprintf(os.Stderr, "Input Directory %s is Invalid\n", inputDir) flag.Usage() os.Exit(1) } // Determine all subscriber files by matching on the subscriber files prefix inputFileNames, err := filepath.Glob(fmt.Sprintf("%s/%s*.log.gz", inputDir, inputFilePrefix)) if err != nil { fmt.Fprintf(os.Stderr, "Error listing files : %v\n", err) os.Exit(1) } // Loop through all subscriber files // Make a goroutine for processing each file // Create a channel to receive the quarantined objects qObjChannel := make(chan []QuarantineObject, len(inputFileNames)) //runtime.GOMAXPROCS(len(inputFileNames)) var wg sync.WaitGroup for _, inputFileGz := range inputFileNames { wg.Add(1) go func(inputFileGz string) { nRecords := 0 qObjList := make([]QuarantineObject, 0, 0) defer wg.Done() defer func() { alog.Trace("Finished Processing File : %s. Total Records Analysed : %d\n", inputFileGz, nRecords) }() // Open the file as a GZIP stream alog.Trace("==================================================================================================================================") alog.Trace("Processing Input File : %s", inputFileGz) alog.Trace("==================================================================================================================================") f, err := os.Open(inputFileGz) if err != nil { fmt.Fprintf(os.Stderr, "Error opening file : %v\n", err) return } defer f.Close() fgz, err := gzip.NewReader(f) if err != nil { fmt.Fprintf(os.Stderr, "Error creating GZIP reader : %v\n", err) return } defer fgz.Close() scanner := bufio.NewScanner(fgz) // Iterate over all lines of the file and decode for scanner.Scan() { qObject := decodeLine() if qObject.IsQuarantined() { qObjList = append(qObjList, qObject) } } /////////////////////////////////////////////////////// // After all lines have been processed, Send to Channel /////////////////////////////////////////////////////// qObjChannel <- qObjList }(inputFileGz) } fmt.Println("Waiting for processing of all files to finish") /////////////////////////////////////////////////////// // Closer GoRoutine /////////////////////////////////////////////////////// go func() { wg.Wait() close(qObjChannel) fmt.Println("Quarantined Objects List") fmt.Println("------------------------") }() qFound := false for qObjList := range qObjChannel { for _, qObj := range qObjList { fmt.Println(qObj.id, "--->", qObj.objectType) qFound = true } } } On Monday, 15 June 2020 23:29:06 UTC+10, envee wrote: > > I am running a program which reads multiple gzipped input files and > performs some processing on each line of the file. > It creates 8 goroutines (1 per input file which is to be processed. the > number of such files can be thought to remain 8 at the max). > Each of the go routines send to a buffered channel after finishing > processing of their respective file. > After creating the go routines, the program waits (using WaitGroup) for > all go routines to finish and also drain the channel for all the values > sent by the go routines. > > I have an 4 core CPU with 2 threads per core = 8 logical cores. > > But I set GOMAXPROCS=4 > > When I run the program with scheduler trace interval set to 1000ms, I can > see the following : > > SCHED 1001ms: gomaxprocs=4 idleprocs=0 threads=8 spinningthreads=0 > idlethreads=0 runqueue=0 [0 0 0 1] > SCHED 2008ms: gomaxprocs=4 idleprocs=0 threads=8 spinningthreads=0 > idlethreads=1 runqueue=0 [1 0 5 0] > SCHED 3015ms: gomaxprocs=4 idleprocs=0 threads=8 spinningthreads=0 > idlethreads=1 runqueue=1 [0 0 1 0] > SCHED 4022ms: gomaxprocs=4 idleprocs=0 threads=9 spinningthreads=0 > idlethreads=2 runqueue=0 [0 0 0 0] > SCHED 5029ms: gomaxprocs=4 idleprocs=0 threads=9 spinningthreads=0 > idlethreads=2 runqueue=1 [0 0 0 4] > > > If I create 8 go routines, shouldn't they all be distributed equally among > the 4 logical cores ? > > Why do some runqueues of the logical cores show values of 4 or 5 and some > have values of 0 ? > > I was hoping to see something like which I according to my understanding > means that all 4 processors have 1 go routine each waiting in the local > runqueue and at the same time has 1 go routine running on the assigned OS > Thread : > > SCHED 1001ms: gomaxprocs=4 idleprocs=0 threads=8 spinningthreads=0 > idlethreads=0 runqueue=0 [1 1 1 1] > > Thanks. > -- You received this message because you are subscribed to the Google Groups "golang-nuts" group. To unsubscribe from this group and stop receiving emails from it, send an email to golang-nuts+unsubscr...@googlegroups.com. To view this discussion on the web visit https://groups.google.com/d/msgid/golang-nuts/36efa087-d66c-4d7e-b5b2-de1d4d3ea339o%40googlegroups.com.