Yes, the code doesn’t work :) It will only ever produce 2 items, unless that was what you expected. Even so, you want the N workers doing the work, and probably a constant number sending to Kafka, though a lot depends on your “serial needs”. In your case you have only 2 workers producing work and N senders, which seems backwards to me.
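
To make that concrete, here is a rough sketch of the arrangement I have in mind: many producers doing the work, and a small fixed number of senders draining one channel. It is only an illustration under assumptions, not a drop-in replacement. The names runPipeline, producerCount, senderCount and tasksPerProducer are made up for this example, and publish is a stand-in for whatever your Kafka client call actually is.

```go
package main

import (
	"fmt"
	"sync"
)

// runPipeline is a hypothetical helper: producerCount goroutines each generate
// tasksPerProducer tasks, and senderCount goroutines pass them to publish.
// publish stands in for the real Kafka client call (an assumption here).
func runPipeline(producerCount, senderCount, tasksPerProducer int, publish func(task string)) {
	// Buffered so producers are not blocked by a briefly slow sender.
	tasks := make(chan string, senderCount*3)

	// Start the senders. Add is called before the goroutines start,
	// so the Wait below cannot return early.
	var senders sync.WaitGroup
	senders.Add(senderCount)
	for i := 0; i < senderCount; i++ {
		go func() {
			defer senders.Done()
			// The loop ends once tasks is closed and drained.
			for task := range tasks {
				publish(task)
			}
		}()
	}

	// Start the producers that do the actual work.
	var producers sync.WaitGroup
	producers.Add(producerCount)
	for p := 0; p < producerCount; p++ {
		go func(id int) {
			defer producers.Done()
			for n := 0; n < tasksPerProducer; n++ {
				tasks <- fmt.Sprintf("producer %d task %d", id, n)
			}
		}(p)
	}

	// Close the channel only after every producer has finished,
	// then wait for the senders to drain what is left.
	producers.Wait()
	close(tasks)
	senders.Wait()
}
```

You would call it with something like runPipeline(runtime.NumCPU(), 2, 100, send), where send wraps your own publish call. The ratio of producers to senders is a guess on my part and should follow from your ordering and throughput needs.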

I would also say that your variable names could be improved: “autoCancelChan” isn’t really meaningful here, since it is simply a chan used to send items to the Kafka senders (at least I think).

> On Dec 26, 2019, at 10:12 PM, Amarjeet Anand <amarjeetanandsi...@gmail.com> wrote:
>
> Hi
>
> I have to produce some tasks to Kafka in parallel, so I want to implement a
> simple worker group pattern in Go.
>
> Is the code below decent enough to take to production?
>
> var workerCount = runtime.NumCPU()*7 + 1
>
> func WorkerPattern() {
>     taskWg := &sync.WaitGroup{}
>     taskWg.Add(2)
>
>     // *3, just to make enough room. workers will be slower anyways
>     autoCancelChan := make(chan string, workerCount*3)
>     go produceTaskOfType1ToChan(taskWg, autoCancelChan)
>     go produceTaskOfType2ToChan(taskWg, autoCancelChan)
>
>     // start workers to push autoCancel to kafka
>     workerWg := &sync.WaitGroup{}
>     go kafkaProducerWorkers(autoCancelChan, workerWg)
>
>     // wait to close autoCancelChan channel till all the task is written
>     taskWg.Wait()
>     close(autoCancelChan)
>
>     // wait till all workers finish their task
>     workerWg.Wait()
>
>     fmt.Println("Done!!!")
> }
>
> func produceTaskOfType1ToChan(wg *sync.WaitGroup, autoCancelChan chan string) {
>     defer wg.Done()
>     // can produce random number of task on autoCancelChan
>     autoCancelChan <- "task of type of 1"
> }
>
> func produceTaskOfType2ToChan(wg *sync.WaitGroup, autoCancelChan chan string) {
>     defer wg.Done()
>     // can produce random number of task on autoCancelChan
>     autoCancelChan <- "task of type of 2"
> }
>
> func kafkaProducerWorkers(autoCancelChan chan string, workerWg *sync.WaitGroup) {
>     workerWg.Add(workerCount)
>     for i := 0; i < workerCount; i++ {
>         go produceToKafka(autoCancelChan, workerWg)
>     }
> }
>
> func produceToKafka(autoCancelChan chan string, workerWg *sync.WaitGroup) {
>     defer workerWg.Done()
>
>     // for loop will terminate once autoCancelChan is closed
>     for autoCancel := range autoCancelChan {
>         KafkaClient.PublishToKafkaTopic(autoCancel)
>     }
> }
>
> Any improvement you can suggest to this code?
>
> --
> You received this message because you are subscribed to the Google Groups "golang-nuts" group.
> To unsubscribe from this group and stop receiving emails from it, send an email to golang-nuts+unsubscr...@googlegroups.com.
> To view this discussion on the web visit https://groups.google.com/d/msgid/golang-nuts/CANFuhy8qBjooo_tB_gT0f%3DTE4DaOFqWL5SWwNghy%2BL-eV82KdA%40mail.gmail.com.

--
To view this discussion on the web visit https://groups.google.com/d/msgid/golang-nuts/4BD6D8C1-FC0F-4B65-B696-146E3B9E4EFA%40ix.netcom.com.