Re: [go-nuts] Re: go scheduler tracing

2020-06-17 Thread Ian Lance Taylor
On Wed, Jun 17, 2020 at 4:40 PM envee  wrote:
>
> Hi Robert, it is in my first post in this thread. Basically, I want to know
> why not all of my logical processors are being used in my program. Thanks.

New goroutines are added to the run queue of the P that creates them.
When a P has nothing to do, it will steal goroutines from the run
queues of other P's.  The run queue lengths don't necessarily tell you
whether the P's are running goroutines; they just tell you something
about which P's are creating new goroutines.

Ian
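
As an illustration of reading the scheduler trace along these lines, here is a minimal sketch (not from the thread) that parses one schedtrace line. The type SchedSample and the function parseSchedLine are hypothetical names. The field meanings assumed here are that idleprocs counts P's with nothing to run, runqueue is the length of the global run queue, and the bracketed numbers are the lengths of the per-P local run queues.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// SchedSample holds the schedtrace fields discussed in this thread.
// The type and its field names are hypothetical, chosen for this sketch.
type SchedSample struct {
	GoMaxProcs int   // gomaxprocs: number of P's
	IdleProcs  int   // idleprocs: P's with nothing to run (assumed meaning)
	RunQueue   int   // runqueue: length of the global run queue (assumed meaning)
	LocalQueue []int // bracketed values: local run queue length per P
}

// parseSchedLine extracts the fields above from a single "SCHED ..." line.
func parseSchedLine(line string) (SchedSample, error) {
	var s SchedSample
	// Peel off the bracketed per-P queue lengths first, if present.
	if open := strings.Index(line, "["); open >= 0 {
		end := strings.Index(line, "]")
		if end < open {
			return s, fmt.Errorf("unbalanced brackets in %q", line)
		}
		for _, f := range strings.Fields(line[open+1 : end]) {
			n, err := strconv.Atoi(f)
			if err != nil {
				return s, err
			}
			s.LocalQueue = append(s.LocalQueue, n)
		}
		line = line[:open]
	}
	// The remainder is a sequence of key=value pairs.
	for _, f := range strings.Fields(line) {
		kv := strings.SplitN(f, "=", 2)
		if len(kv) != 2 {
			continue // "SCHED" and the timestamp carry no key=value pair
		}
		n, err := strconv.Atoi(kv[1])
		if err != nil {
			return s, err
		}
		switch kv[0] {
		case "gomaxprocs":
			s.GoMaxProcs = n
		case "idleprocs":
			s.IdleProcs = n
		case "runqueue":
			s.RunQueue = n
		}
	}
	return s, nil
}

func main() {
	// One of the trace lines posted in this thread, joined onto a single line.
	line := "SCHED 2008ms: gomaxprocs=4 idleprocs=0 threads=8 spinningthreads=0 idlethreads=1 runqueue=0 [1 0 5 0]"
	s, err := parseSchedLine(line)
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Printf("busy P's: %d of %d\n", s.GoMaxProcs-s.IdleProcs, s.GoMaxProcs)
	fmt.Printf("global run queue: %d, local run queues: %v\n", s.RunQueue, s.LocalQueue)
}
```

In the trace lines posted in this thread idleprocs is 0, which under these assumptions means all four P's had work, regardless of how many goroutines were waiting in each local queue.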



Re: [go-nuts] Re: go scheduler tracing

2020-06-17 Thread envee
Hi Robert, it is in my first post in this thread. Basically, I want to know
why not all of my logical processors are being used in my program. Thanks.

On Thursday, 18 June 2020 07:24:40 UTC+10, Robert Engels wrote:
>
> What is the question?

Re: [go-nuts] Re: go scheduler tracing

2020-06-17 Thread Robert Engels
What is the question?

> On Jun 17, 2020, at 4:06 PM, envee  wrote:
> 
> Hi, Is anyone able to help me here ?

[go-nuts] Re: go scheduler tracing

2020-06-17 Thread envee
Hi, is anyone able to help me here?
Here is a (simplified) snippet of the code, in case it helps answer my
query. I create a goroutine for every input file (assume a maximum of 8)
and then wait for processing of all files to finish. Each goroutine
processes its file line by line and appends any records that match certain
criteria to a slice. After all lines in a file have been processed, the
slice is sent to a channel. Finally, the closer goroutine waits for all
worker goroutines to finish and then closes the channel:

package main

import (
	"bufio"
	"compress/gzip"
	"flag"
	"fmt"
	"log"
	"os"
	"path/filepath"
	"strings"
	"sync"

	"github.com/en-vee/alog"
)

const (
	inputFilePrefix = "subscriber_db_"
)

var (
	inputDir string
)

type QuarantineObject struct {
	objectType string
	id         string
}

func init() {
	flag.StringVar(&inputDir, "d", "", "Path to the Input folder which is to be analysed")
}

func main() {

	var err error
	alog.SetLogLevel(alog.TRACE)
	flag.Parse()

	// Validation of input parameters
	if inputDir == "" {
		fmt.Fprintf(os.Stderr, "No Input Directory Specified\n")
		flag.Usage()
		os.Exit(1)
	}

	// Is the input directory valid ?
	if _, err := os.Stat(inputDir); os.IsNotExist(err) {
		fmt.Fprintf(os.Stderr, "Input Directory %s is Invalid\n", inputDir)
		flag.Usage()
		os.Exit(1)
	}

	// Determine all subscriber files by matching on the subscriber files prefix

	inputFileNames, err := filepath.Glob(fmt.Sprintf("%s/%s*.log.gz", inputDir, inputFilePrefix))
	if err != nil {
		fmt.Fprintf(os.Stderr, "Error listing files : %v\n", err)
		os.Exit(1)
	}

	// Loop through all subscriber files
	// Make a goroutine for processing each file
	// Create a channel to receive the quarantined objects
	qObjChannel := make(chan []QuarantineObject, len(inputFileNames))

	//runtime.GOMAXPROCS(len(inputFileNames))
	var wg sync.WaitGroup
	for _, inputFileGz := range inputFileNames {
		wg.Add(1)
		go func(inputFileGz string) {
			nRecords := 0

			qObjList := make([]QuarantineObject, 0, 0)
			defer wg.Done()
			defer func() {
				alog.Trace("Finished Processing File : %s. Total Records Analysed : %d\n", inputFileGz, nRecords)
			}()
			// Open the file as a GZIP stream
			alog.Trace("==")
			alog.Trace("Processing Input File : %s", inputFileGz)
			alog.Trace("==")

			f, err := os.Open(inputFileGz)
			if err != nil {
				fmt.Fprintf(os.Stderr, "Error opening file : %v\n", err)
				return
			}
			defer f.Close()

			fgz, err := gzip.NewReader(f)
			if err != nil {
				fmt.Fprintf(os.Stderr, "Error creating GZIP reader : %v\n", err)
				return
			}
			defer fgz.Close()

			scanner := bufio.NewScanner(fgz)

			// Iterate over all lines of the file and decode

			for scanner.Scan() {
				qObject := decodeLine()
				if qObject.IsQuarantined() {
					qObjList = append(qObjList, qObject)
				}
			}
			///
			// After all lines have been processed, Send to Channel
			///
			qObjChannel <- qObjList
		}(inputFileGz)

	}

	fmt.Println("Waiting for processing of all files to finish")
	///
	// Closer GoRoutine
	///
	go func() {
		wg.Wait()
		close(qObjChannel)
		fmt.Println("Quarantined Objects List")
		fmt.Println("")
	}()

	qFound := false

	for qObjList := range qObjChannel {
		for _, qObj := range qObjList {
			fmt.Println(qObj.id, "--->", qObj.objectType)
			qFound = true
		}
	}

}



On Monday, 15 June 2020 23:29:06 UTC+10, envee wrote:
>
> I am running a program which reads multiple gzipped input files and
> performs some processing on each line of each file.
> It creates 8 goroutines, one per input file to be processed (the number
> of such files can be assumed to be at most 8).
> Each goroutine sends to a buffered channel after it finishes processing
> its file.
> After creating the goroutines, the program waits (using a WaitGroup) for
> all of them to finish and also drains the channel of all the values they
> sent.
>
> I have a 4-core CPU with 2 threads per core = 8 logical cores.
>
> But I set GOMAXPROCS=4
>
> When I run the program with the scheduler trace interval set to 1000ms, I
> can see the following:
>
> SCHED 1001ms: gomaxprocs=4 idleprocs=0 threads=8 spinningthreads=0 idlethreads=0 runqueue=0 [0 0 0 1]
> SCHED 2008ms: gomaxprocs=4 idleprocs=0 threads=8 spinningthreads=0 idlethreads=1 runqueue=0 [1 0 5 0]
> SCHED 3015ms: gomaxprocs=4 idleprocs=0 threads=8 spinningthreads=0 idlethreads=1 runqueue=1 [0 0 1 0]
> SCHED 4022ms: gomaxprocs=4 idleprocs=0 threads=9 spinningthreads=0 idlethreads=2 runqueue=0 [0 0 0 0]
> SCHED 5029ms: gomaxprocs=4 idleprocs=0 threads=9 spinningthreads=0 
>