I think you are overcomplicating this a bit. It looks like a simple pattern 
of broadcasting a change to multiple agents: an agent sends the change over 
a REQ-REP pair, and you broadcast it to the others over a PUB-SUB pair.

Why do you need to copy the struct again? Just take the struct you got from 
the REP socket and push it to the PUB socket.

Here is what I would do:

Use protobuf to marshal/unmarshal the config struct when sending/receiving 
it through the socket. That gives you one schema-defined wire format that 
both the Go and the Python side can read and write.

Then use a for-select pattern to do the job of receiving and broadcasting. 
Rough sketch as follows:

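package main

import (
    "os"
    "os/signal"
)

// Config is a placeholder for your config/state struct.
type Config struct{}

var (
    updateChan = make(chan Config)
    done       = make(chan struct{})
)

func main() {
    // Set up the REP socket in a goroutine: unmarshal each message
    // and send the resulting struct to updateChan.
    // Create the PUB socket and keep a reference to it somewhere.

    go run()

    // Wait for Ctrl-C, then tell run() to stop.
    sig := make(chan os.Signal, 1)
    signal.Notify(sig, os.Interrupt)
    <-sig
    close(done)
}

func run() {
    for {
        select {
        case msg := <-updateChan:
            _ = msg // push msg to the PUB socket here
        case <-done:
            // closed by main() on Ctrl-C
            return
        }
    }
}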

Isn't this what you are trying to do?
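
To make the for-select idea concrete without pulling in ZeroMQ, here is a 
small sketch that runs as-is. It is only an illustration under my own 
assumptions: plain Go channels stand in for the REP and PUB sockets, and the 
names Config, run and broadcast are made up for the example, not taken from 
your code.

```go
package main

import (
	"fmt"
	"sync"
)

// Config is a hypothetical stand-in for the application's state struct.
type Config struct {
	A int
	B string
}

// run receives updates and rebroadcasts each one to every subscriber
// until done is closed. The subscriber channels stand in for a PUB socket.
func run(updates <-chan Config, subs []chan Config, done <-chan struct{}) {
	for {
		select {
		case cfg := <-updates:
			for _, s := range subs {
				s <- cfg // blocks if a subscriber's buffer is full
			}
		case <-done:
			return
		}
	}
}

// broadcast pushes cfgs through run and returns what each subscriber saw.
func broadcast(cfgs []Config, nSubs int) [][]Config {
	updates := make(chan Config) // stands in for the REP side
	done := make(chan struct{})
	subs := make([]chan Config, nSubs)
	for i := range subs {
		// Buffered so run never blocks in this demo.
		subs[i] = make(chan Config, len(cfgs))
	}

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		run(updates, subs, done)
	}()

	for _, c := range cfgs {
		updates <- c // returns once run has received c
	}
	close(done)
	wg.Wait() // run has finished fanning out the last update

	got := make([][]Config, nSubs)
	for i, s := range subs {
		close(s)
		for c := range s {
			got[i] = append(got[i], c)
		}
	}
	return got
}

func main() {
	got := broadcast([]Config{{1, "foo"}, {2, "bar"}}, 3)
	for i, g := range got {
		fmt.Println(i, g)
	}
}
```

Since only run ever touches the subscriber side, no locking is needed. With 
real sockets, the only parts that change are where the updates come from and 
where they are pushed to.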

 

On Wednesday, 26 September 2018 00:09:07 UTC+5:30, Michael Ellis wrote:
>
> Hi, new gopher here. 
> I considered asking this on SO, but they (rightly, IMO) discourage "Is 
> this a good way to do it?" questions.  Hope that's ok here.
>
> By way of background, I'm porting a largish industrial control application 
> from Python to Go.  The Python version uses multiple processes (about a 
> dozen in all) communicating over ZeroMQ.  One process, called the 
> statehouse,  controls access to the application state.  The others obtain 
> copies and send updates over REQ sockets.  The data are serialized as JSON 
> objects that map nicely to Python dicts.
>
> Since there's no direct equivalent in Go to a Python dict that can hold a 
> mixture of arbitrary types,  I need to use a struct to represent the state. 
> No problem with that but I've been struggling with how to allow the 
> goroutines that will replace the Python processes to read and write to the 
> state struct with concurrency safety.  
>
> This morning I came up with an idea to send functions over a channel to 
> the main routine.  I put together a little test program and after some 
> refinements it looks promising.  Some rough benchmarking shows I can get a 
> million updates in under 1 second on a 2012 vintage Mac Mini.  That's more 
> than good enough for this application where the time between events is 
> usually more than 100 milliseconds.
>
> Here's the link to my test on the Go Playground: 
> https://play.golang.org/p/8iWvwnqBNYl . It runs there except that the 
> elapsed time comes back 0 and the prints from the second goroutine don't 
> show up. I think that's got something to do with the artificial clock in 
> the playground.  It works fine when I run it locally.  I've pasted the code 
> at the bottom of this message.
>
> So my big questions are:
>
>    - Is this actually concurrency safe as long as all goroutines only use 
>    the update mechanism to read and write?
>    - Is there a more idiomatic way to do it that performs as well or 
>    better?
>    - What are the potential problems if this is scaled to a couple dozen 
>    goroutines?
>    - Does it sacrifice clarity for cleverness? (not that it's all that 
>    clever, mind you, but I need to think about handing this off to my 
>    client's staff.)
>
>
> Thanks very much,
> Mike Ellis
>
> code follows ... 
>
> package main
>
> import (
>     "fmt"
>     "time"
> )
>
>
> // Big defines the application's state variables
> type Big struct {
>     A int
>     B string
>     /* and hundreds more */
> }
>
>
> // update is a struct that contains a function that updates a Big and
> // a signal channel to be closed when the update is complete. An update
> // may also be used to obtain a current copy of a Big by coding f to
> // do so.  (See gopher2 below.)
> type update struct {
>     done chan struct{}
>     f    func(*Big)
> }
>
>
> // upch is a channel from which main receives updates.
> var upch = make(chan update)
>
>
> // gopher defines a function that updates a member of a Big and
> // sends updates via upch. After each send it waits for main to
> // close the update's done channel.
> func gopher() {
>     var newA int
>     f := func(b *Big) {
>         b.A = newA
>     }
>     for i := 0; i < n; i++ {
>         newA = i
>         u := update{make(chan struct{}), f}
>         upch <- u
>         <-u.done
>     }
> }
>
>
> // gopher2 uses an update struct to obtain a current copy of a Big
> // every 100 microseconds.
> func gopher2() {
>     var copied Big
>     f := func(b *Big) {
>         copied = *b
>     }
>     for {
>         time.Sleep(100 * time.Microsecond)
>         u := update{make(chan struct{}), f}
>         upch <- u
>         <-u.done
>         fmt.Println(copied)
>     }
> }
>
>
> // main creates a Big, launches gopher and waits on the update channel.
> // When an update, u, arrives it runs u.f and then closes u.done.
> func main() {
>     var state = Big{-1, "foo"}
>     fmt.Println(state) // --> {-1, "foo"}
>     go gopher()
>     go gopher2()
>     start := time.Now()
>     for i := 0; i < n; i++ {
>         u := <-upch
>         u.f(&state)
>         close(u.done)
>     }
>     // Note: always 0 in playground
>     perUpdate := time.Since(start).Nanoseconds() / int64(n)
>     fmt.Printf("%d updates, %d ns per update.\n", n, perUpdate)
>     fmt.Println(state) // --> {n-1, "foo"}
> }
>
>
> var n = 1000 // number of updates to send and receive
>
>
>
>  
>
