I think it is more complex - or simpler :) - than that. A lot depends on the 
Kafka client - for example the sarama client recommends one client per 
producer/consumer, other clients may multiplex on the same client so having 
more than one consumer (sender) may not be beneficial if the IO is fully async 
acknowledged.

In general, you want to parallelize (add go routines) to the the portions that 
can be parallized (either because they benefit from additional cpu, 
scatter/gather IO (network requests or disk), or have independent/multiple 
destination output stages) - but you have to pay special attention to any 
“ordering” of events that may be required on the consumer side, and the 
acknowledgements required on the Kafka side.

In your example, you are still creating 7xNCPU senders, and only 2 producers - 
which would mean that each send is completely independent and is a minimum 25x 
slower than the producing (given 8 cores x 7 / 2 producers). This could be the 
case, but seems unlikely.

> On Dec 26, 2019, at 11:51 PM, Amarjeet Anand <amarjeetanandsi...@gmail.com> 
> wrote:
> 
> Hi Robert
> 
> Actually the code above is simplified to make it easy to understand.
> 
> Thanks for the suggestion on variable namings... Will improve that.
> 
> The scenario is like the producer functions(produceTaskOfType1ToChan() and 
> produceTaskOfType2ToChan()) will produce a list of strings to the channel... 
> like...
> 
> func produceTaskOfType2ToChan(wg *sync.WaitGroup, autoCancelChan chan string) 
> {
>     defer wg.Done()
>     autoCancelIds := getAutoCancelIdsFromSource2()
>     for autoCancelId := range autoCancelIds {
>         autoCancelChan <- autoCancelId
>     }
> }
> 
> Now does this code makes some sense?
> 
> 
> 
> On Fri, 27 Dec, 2019, 10:10 AM robert engels, <reng...@ix.netcom.com 
> <mailto:reng...@ix.netcom.com>> wrote:
> Yes, the code doesn’t work :) - it will only ever produce 2 items - unless 
> that was expected - even so, you want the N workers doing work, and probably 
> a constant number sending to Kafka - but a lot depends on your “serial 
> needs”. In your case you only have 2 workers producing work, and N senders - 
> which is backwards to me.
> 
> I would also say that your variable names could be improved - as 
> “autoCancelChan” isn’t really meaningful here, it is simple a chan used to 
> send items to the Kafka senders (at least I think).
> 
>> On Dec 26, 2019, at 10:12 PM, Amarjeet Anand <amarjeetanandsi...@gmail.com 
>> <mailto:amarjeetanandsi...@gmail.com>> wrote:
>> 
>> Hi
>> 
>> I have to produce some task to kafka parallely. So I want to implement a 
>> simple worker group pattern in go.
>> 
>> Does the below code decent enough to take it to production?
>> 
>> 
>> var workerCount = runtime.NumCPU()*7 + 1
>> 
>> func WorkerPattern() {
>>     taskWg := &sync.WaitGroup{}
>>     taskWg.Add(2)
>> 
>>     autoCancelChan := make(chan string, workerCount*3) // *3, just to make 
>> enough room. workers will be slower anyways
>>     go produceTaskOfType1ToChan(taskWg, autoCancelChan)
>>     go produceTaskOfType2ToChan(taskWg, autoCancelChan)
>> 
>>     // start workers to push autoCancel to kafka
>>     workerWg := &sync.WaitGroup{}
>>     go kafkaProducerWorkers(autoCancelChan, workerWg)
>> 
>>     // wait to close autoCancelChan channel till all the task is written
>>     taskWg.Wait()
>>     close(autoCancelChan)
>> 
>>     // wait till all workers finish their task
>>     workerWg.Wait()
>> 
>>     fmt.Println("Done!!!")
>> }
>> 
>> func produceTaskOfType1ToChan(wg *sync.WaitGroup, autoCancelChan chan 
>> string) {
>>     defer wg.Done()
>>     // can produce random number of task on autoCancelChan
>>     autoCancelChan <- "task of type of 1"
>> }
>> 
>> func produceTaskOfType2ToChan(wg *sync.WaitGroup, autoCancelChan chan 
>> string) {
>>     defer wg.Done()
>>     // can produce random number of task on autoCancelChan
>>     autoCancelChan <- "task of type of 2"
>> }
>> 
>> func kafkaProducerWorkers(autoCancelChan chan string, workerWg 
>> *sync.WaitGroup) {
>>     workerWg.Add(workerCount)
>>     for i := 0; i < workerCount; i++ {
>>         go produceToKafka(autoCancelChan, workerWg)
>>     }
>> }
>> 
>> func produceToKafka(autoCancelChan chan string, workerWg *sync.WaitGroup) {
>>     defer workerWg.Done()
>> 
>>     // for loop will terminate once autoCancelChan is closed
>>     for autoCancel := range autoCancelChan {
>>         KafkaClient.PublishToKafkaTopic(autoCancel)
>>     }
>> }
>> Any improvement you can suggest to this code?
>> 
>> 
>> 
>> 
>> 
>> 
>> -- 
>> You received this message because you are subscribed to the Google Groups 
>> "golang-nuts" group.
>> To unsubscribe from this group and stop receiving emails from it, send an 
>> email to golang-nuts+unsubscr...@googlegroups.com 
>> <mailto:golang-nuts+unsubscr...@googlegroups.com>.
>> To view this discussion on the web visit 
>> https://groups.google.com/d/msgid/golang-nuts/CANFuhy8qBjooo_tB_gT0f%3DTE4DaOFqWL5SWwNghy%2BL-eV82KdA%40mail.gmail.com
>>  
>> <https://groups.google.com/d/msgid/golang-nuts/CANFuhy8qBjooo_tB_gT0f%3DTE4DaOFqWL5SWwNghy%2BL-eV82KdA%40mail.gmail.com?utm_medium=email&utm_source=footer>.
> 
> 
> -- 
> You received this message because you are subscribed to the Google Groups 
> "golang-nuts" group.
> To unsubscribe from this group and stop receiving emails from it, send an 
> email to golang-nuts+unsubscr...@googlegroups.com 
> <mailto:golang-nuts+unsubscr...@googlegroups.com>.
> To view this discussion on the web visit 
> https://groups.google.com/d/msgid/golang-nuts/CANFuhy_AOfx9Vpq67tbwUJQhgfCi6Wmnsfu%2BFJ1yS%3DyyBFptkQ%40mail.gmail.com
>  
> <https://groups.google.com/d/msgid/golang-nuts/CANFuhy_AOfx9Vpq67tbwUJQhgfCi6Wmnsfu%2BFJ1yS%3DyyBFptkQ%40mail.gmail.com?utm_medium=email&utm_source=footer>.

-- 
You received this message because you are subscribed to the Google Groups 
"golang-nuts" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to golang-nuts+unsubscr...@googlegroups.com.
To view this discussion on the web visit 
https://groups.google.com/d/msgid/golang-nuts/B4B02FF3-FC60-496F-A4B2-9CC436C068C7%40ix.netcom.com.

Reply via email to