The high-level design discussions are great, but I want to point out that 
the code, as written, has a race-related bug. 

When this line runs, workerWg still has a count of 0: 

go kafkaProducerWorkers(autoCancelChan, workerWg)

It is entirely possible for the workerWg.Wait() line to be called before 
the kafkaProducerWorkers() goroutine has started and called workerWg.Add(). 
In that case Wait() sees a zero count and returns immediately, so the 
WorkerPattern function can return without any work having been done. See 
https://play.golang.org/p/p2_A3lBW3Dc for a simulation of this case. The 
Sleep() on line 52 simulates the slow-starting goroutine. 


On Thursday, December 26, 2019 at 11:12:29 PM UTC-5, Amarjeet Anand wrote:
>
> Hi
>
> I have to produce some tasks to Kafka in parallel, so I want to implement a 
> simple worker group pattern in Go.
>
> Is the code below decent enough to take to production?
>
> var workerCount = runtime.NumCPU()*7 + 1
>
> func WorkerPattern() {
>     taskWg := &sync.WaitGroup{}
>     taskWg.Add(2)
>
>     // *3, just to make enough room; workers will be slower anyway
>     autoCancelChan := make(chan string, workerCount*3)
>     go produceTaskOfType1ToChan(taskWg, autoCancelChan)
>     go produceTaskOfType2ToChan(taskWg, autoCancelChan)
>
>     // start workers to push autoCancel to kafka
>     workerWg := &sync.WaitGroup{}
>     go kafkaProducerWorkers(autoCancelChan, workerWg)
>
>     // wait to close autoCancelChan channel till all the task is written
>     taskWg.Wait()
>     close(autoCancelChan)
>
>     // wait till all workers finish their task
>     workerWg.Wait()
>
>     fmt.Println("Done!!!")
> }
>
> func produceTaskOfType1ToChan(wg *sync.WaitGroup, autoCancelChan chan string) {
>     defer wg.Done()
>     // can produce random number of task on autoCancelChan
>     autoCancelChan <- "task of type of 1"
> }
>
> func produceTaskOfType2ToChan(wg *sync.WaitGroup, autoCancelChan chan string) {
>     defer wg.Done()
>     // can produce random number of task on autoCancelChan
>     autoCancelChan <- "task of type of 2"
> }
>
> func kafkaProducerWorkers(autoCancelChan chan string, workerWg *sync.WaitGroup) {
>     workerWg.Add(workerCount)
>     for i := 0; i < workerCount; i++ {
>         go produceToKafka(autoCancelChan, workerWg)
>     }
> }
>
> func produceToKafka(autoCancelChan chan string, workerWg *sync.WaitGroup) {
>     defer workerWg.Done()
>
>     // for loop will terminate once autoCancelChan is closed
>     for autoCancel := range autoCancelChan {
>         KafkaClient.PublishToKafkaTopic(autoCancel)
>     }
> }
>
> Any improvement you can suggest to this code?
>
>
>
>
