Re: [go-nuts] Re: go scheduler tracing
On Wed, Jun 17, 2020 at 4:40 PM envee wrote:
>
> Hi Robert, It is in my first post in this thread. Basically, I want to know
> why all my logical processors are not being used in my program. Thanks.

New goroutines are added to the run queue of the P that creates them. When a P has nothing to do, it steals goroutines from the run queues of other Ps. The run queue length doesn't necessarily indicate whether the Ps are running goroutines; it just tells you something about which Ps are creating new goroutines.

Ian
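A minimal Go sketch of the point made in this reply, under stated assumptions: `spin` and `runWorkers` are hypothetical names, and the CPU loop merely stands in for the per-file work in the thread. All eight goroutines are created by a single goroutine, so they start out queued on the P that creates them, yet nothing pins them there; idle Ps are free to steal and run them.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// spin is a hypothetical stand-in for per-file work: a pure CPU loop.
func spin(iters int) uint64 {
	var sum uint64
	for i := 0; i < iters; i++ {
		sum += uint64(i)
	}
	return sum
}

// runWorkers starts n goroutines from the calling goroutine and waits for
// them. The new goroutines are initially queued on the creating P; other Ps
// that have nothing to do may steal them from that queue.
func runWorkers(n, iters int) []uint64 {
	results := make([]uint64, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			results[i] = spin(iters) // each goroutine writes its own slot
		}(i)
	}
	wg.Wait()
	return results
}

func main() {
	runtime.GOMAXPROCS(4) // same setting as in the thread
	res := runWorkers(8, 200000000)
	fmt.Println("workers finished:", len(res))
}
```

Running the built binary with `GODEBUG=schedtrace=1000` set in the environment prints SCHED lines like those in the original post; uneven per-P run queue lengths alongside `idleprocs=0` would be consistent with the explanation above.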
Re: [go-nuts] Re: go scheduler tracing
Hi Robert,

It is in my first post in this thread. Basically, I want to know why all my logical processors are not being used in my program.

Thanks.

On Thursday, 18 June 2020 07:24:40 UTC+10, Robert Engels wrote:
>
> What is the question?
Re: [go-nuts] Re: go scheduler tracing
What is the question?

> On Jun 17, 2020, at 4:06 PM, envee wrote:
>
> Hi, Is anyone able to help me here ?
[go-nuts] Re: go scheduler tracing
Hi,

Is anyone able to help me here?

Here is a (simplified) snippet of the code, in case it helps answer my query. I create a goroutine for every input file (assume max 8) and then wait for processing of all files to finish. Each goroutine processes its file line by line, and any records which match certain criteria are appended to a slice. After all lines in a file have been processed, the slice is sent to a channel. Finally, in the closer goroutine, I wait for all goroutines to finish and then close the channel:

    package main

    import (
        "bufio"
        "compress/gzip"
        "flag"
        "fmt"
        "log"
        "os"
        "path/filepath"
        "strings"
        "sync"

        "github.com/en-vee/alog"
    )

    const (
        inputFilePrefix = "subscriber_db_"
    )

    var (
        inputDir string
    )

    type QuarantineObject struct {
        objectType string
        id         string
    }

    func init() {
        flag.StringVar(&inputDir, "d", "", "Path to the Input folder which is to be analysed")
    }

    func main() {

        var err error
        alog.SetLogLevel(alog.TRACE)
        flag.Parse()

        // Validation of input parameters
        if inputDir == "" {
            fmt.Fprintf(os.Stderr, "No Input Directory Specified\n")
            flag.Usage()
            os.Exit(1)
        }

        // Is the input directory valid ?
        if _, err := os.Stat(inputDir); os.IsNotExist(err) {
            fmt.Fprintf(os.Stderr, "Input Directory %s is Invalid\n", inputDir)
            flag.Usage()
            os.Exit(1)
        }

        // Determine all subscriber files by matching on the subscriber files prefix
        inputFileNames, err := filepath.Glob(fmt.Sprintf("%s/%s*.log.gz", inputDir, inputFilePrefix))
        if err != nil {
            fmt.Fprintf(os.Stderr, "Error listing files : %v\n", err)
            os.Exit(1)
        }

        // Loop through all subscriber files
        // Make a goroutine for processing each file
        // Create a channel to receive the quarantined objects
        qObjChannel := make(chan []QuarantineObject, len(inputFileNames))

        //runtime.GOMAXPROCS(len(inputFileNames))
        var wg sync.WaitGroup
        for _, inputFileGz := range inputFileNames {
            wg.Add(1)
            go func(inputFileGz string) {
                nRecords := 0

                qObjList := make([]QuarantineObject, 0, 0)
                defer wg.Done()
                defer func() {
                    alog.Trace("Finished Processing File : %s. Total Records Analysed : %d\n", inputFileGz, nRecords)
                }()
                // Open the file as a GZIP stream
                alog.Trace("==")
                alog.Trace("Processing Input File : %s", inputFileGz)
                alog.Trace("==")

                f, err := os.Open(inputFileGz)
                if err != nil {
                    fmt.Fprintf(os.Stderr, "Error opening file : %v\n", err)
                    return
                }
                defer f.Close()

                fgz, err := gzip.NewReader(f)
                if err != nil {
                    fmt.Fprintf(os.Stderr, "Error creating GZIP reader : %v\n", err)
                    return
                }
                defer fgz.Close()

                scanner := bufio.NewScanner(fgz)

                // Iterate over all lines of the file and decode
                for scanner.Scan() {
                    qObject := decodeLine()
                    if qObject.IsQuarantined() {
                        qObjList = append(qObjList, qObject)
                    }
                }
                // After all lines have been processed, send to channel
                qObjChannel <- qObjList
            }(inputFileGz)
        }

        fmt.Println("Waiting for processing of all files to finish")

        // Closer goroutine
        go func() {
            wg.Wait()
            close(qObjChannel)
            fmt.Println("Quarantined Objects List")
            fmt.Println("")
        }()

        qFound := false

        for qObjList := range qObjChannel {
            for _, qObj := range qObjList {
                fmt.Println(qObj.id, "--->", qObj.objectType)
                qFound = true
            }
        }

    }

On Monday, 15 June 2020 23:29:06 UTC+10, envee wrote:
>
> I am running a program which reads multiple gzipped input files and
> performs some processing on each line of the file.
> It creates 8 goroutines (1 per input file to be processed; the number of
> such files can be assumed to be at most 8).
> Each goroutine sends to a buffered channel after it finishes processing
> its file.
> After creating the goroutines, the program waits (using a WaitGroup) for
> all goroutines to finish and also drains the channel of all the values
> sent by the goroutines.
>
> I have a 4-core CPU with 2 threads per core = 8 logical cores.
>
> But I set GOMAXPROCS=4
>
> When I run the program with the scheduler trace interval set to 1000ms, I
> can see the following:
>
> SCHED 1001ms: gomaxprocs=4 idleprocs=0 threads=8 spinningthreads=0 idlethreads=0 runqueue=0 [0 0 0 1]
> SCHED 2008ms: gomaxprocs=4 idleprocs=0 threads=8 spinningthreads=0 idlethreads=1 runqueue=0 [1 0 5 0]
> SCHED 3015ms: gomaxprocs=4 idleprocs=0 threads=8 spinningthreads=0 idlethreads=1 runqueue=1 [0 0 1 0]
> SCHED 4022ms: gomaxprocs=4 idleprocs=0 threads=9 spinningthreads=0 idlethreads=2 runqueue=0 [0 0 0 0]
> SCHED 5029ms: gomaxprocs=4 idleprocs=0 threads=9 spinningthreads=0
>
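For reading SCHED lines like the ones quoted above, here is a small hedged sketch in Go. The names `schedLine` and `parseSchedLine` are hypothetical, not part of the Go runtime or any library. It relies on the usual reading of the fields: `idleprocs` is the number of Ps with nothing to run, `runqueue` is the global run queue length, and the bracketed numbers are the per-P local run queue lengths. With that reading, `gomaxprocs - idleprocs` is the number of busy Ps, whatever the queue lengths happen to be.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// schedLine holds the fields of one SCHED trace line that matter here.
type schedLine struct {
	GoMaxProcs int
	IdleProcs  int
	GlobalRunQ int
	LocalRunQ  []int // one entry per P
}

// parseSchedLine parses a complete line such as
// "SCHED 1001ms: gomaxprocs=4 idleprocs=0 ... runqueue=0 [0 0 0 1]".
func parseSchedLine(line string) (schedLine, error) {
	var s schedLine
	open := strings.Index(line, "[")
	end := strings.LastIndex(line, "]")
	if open < 0 || end < open {
		return s, fmt.Errorf("no per-P run queue list in %q", line)
	}
	// Per-P local run queue lengths inside the brackets.
	for _, f := range strings.Fields(line[open+1 : end]) {
		n, err := strconv.Atoi(f)
		if err != nil {
			return s, fmt.Errorf("bad run queue length %q: %v", f, err)
		}
		s.LocalRunQ = append(s.LocalRunQ, n)
	}
	// key=value fields before the brackets.
	for _, f := range strings.Fields(line[:open]) {
		kv := strings.SplitN(f, "=", 2)
		if len(kv) != 2 {
			continue // "SCHED" and the timestamp have no '='
		}
		n, err := strconv.Atoi(kv[1])
		if err != nil {
			return s, fmt.Errorf("bad value in %q: %v", f, err)
		}
		switch kv[0] {
		case "gomaxprocs":
			s.GoMaxProcs = n
		case "idleprocs":
			s.IdleProcs = n
		case "runqueue":
			s.GlobalRunQ = n
		}
	}
	return s, nil
}

func main() {
	s, err := parseSchedLine("SCHED 2008ms: gomaxprocs=4 idleprocs=0 threads=8 spinningthreads=0 idlethreads=1 runqueue=0 [1 0 5 0]")
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	queued := s.GlobalRunQ
	for _, n := range s.LocalRunQ {
		queued += n
	}
	fmt.Printf("busy Ps: %d of %d, goroutines waiting in run queues: %d\n",
		s.GoMaxProcs-s.IdleProcs, s.GoMaxProcs, queued)
}
```

Applied to the complete lines in the post, every sample has `idleprocs=0`, which under this reading means all four Ps were occupied at each sampling point.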