On Thu, Sep 10, 2020 at 4:43 PM Alexander Mills
<alexander.d.mi...@gmail.com> wrote:
>
> It was behaving strangely before adding the waitgroup, my script was
> getting stuck

Waitgroup is unnecessary. There's a few problems: when you detect an
error you should stop writing, but the program "continue"s. You set
the content-type to json, but this is technically not valid json.

If it was getting stuck, then it may help to find out where it is getting stuck.

>
> On Thu, Sep 10, 2020 at 11:14 AM burak serdar <bser...@computer.org> wrote:
> >
> > On Thu, Sep 10, 2020 at 11:46 AM Alexander Mills
> > <alexander.d.mi...@gmail.com> wrote:
> > >
> > > I tried writing this http post request which streams the request body 
> > > line-by-line as JSON.
> > > (This is for an ElasticSearch bulk api request, newline separate JSON in 
> > > the request body). I didn't think the waitgroup was necessary, but 
> > > apparently the waitgroup is necessary. Does anyone know why? I assume the 
> > > need for the waitgroup might represent a bug in my code.
> >
> >
> > What makes you think waitgroup is necessary?
> >
> >
> >
> > >
> > > Here is the same code in a github gist: 
> > > https://gist.github.com/ORESoftware/06341057ca39e1b2a47dc8a8e4e4b55f
> > >
> > >
> > > func doBulkUpsertWithResponse(baseUrl string, z []UpsertDoc) 
> > > (*map[string]interface{}, error) {
> > >
> > > var m *map[string]interface{}
> > >
> > > if len(z) < 1 {
> > > return m, nil
> > > }
> > >
> > > rd, wr := io.Pipe() // we stream json lines to post request
> > >
> > > wg := sync.WaitGroup{}
> > >
> > > wg.Add(1)
> > > go func() {
> > >
> > > defer func() {
> > >     // close the stream after writing is finished
> > >     wg.Done()
> > >     wr.Close();
> > > }()
> > >
> > > for _, v := range z {
> > >
> > > var doc = ElasticBulkUpsertable{
> > > &v.Doc, true,
> > > }
> > >
> > > jsonStr, err := json.Marshal(&doc)
> > >
> > > if err != nil {
> > > continue
> > > }
> > >
> > > updateLine := fmt.Sprintf(`{"update":{"_id":"%s"}}`, v.Id) + "\n"
> > > if _, err := wr.Write([]byte(updateLine)); err != nil {
> > >   continue
> > > }
> > >
> > > docLine := string(jsonStr) + "\n"
> > > if _, err := wr.Write([]byte(docLine)); err != nil {
> > >     continue
> > > }
> > > }
> > >
> > > }()
> > >
> > > fullUrl := fmt.Sprintf("%s/%s", baseUrl, "_bulk")
> > > req, err := http.NewRequest("POST", fullUrl, rd)
> > >
> > > if err != nil {
> > > log.Println(fmt.Errorf("cm:3ec045d3-4790-4d7a-aab8-65949933d263: '%v'", 
> > > err))
> > > return nil, err
> > > }
> > >
> > > req.Header.Set("Content-Type", "application/json")
> > >
> > > resp, err := client.Do(req)
> > >
> > > wg.Wait()
> > >
> > > if err != nil {
> > > return m, err
> > > }
> > >
> > > defer func() {
> > >     resp.Body.Close()
> > > }()
> > >
> > > if resp.StatusCode != 200 {
> > >   return m, errors.New("request failed with status code: " + resp.Status)
> > > }
> > >
> > > if err := json.NewDecoder(resp.Body).Decode(&m); err != nil {
> > >     return nil, err
> > > }
> > >
> > > return m, nil
> > >
> > > }
> > >
> > > --
> > > You received this message because you are subscribed to the Google Groups 
> > > "golang-nuts" group.
> > > To unsubscribe from this group and stop receiving emails from it, send an 
> > > email to golang-nuts+unsubscr...@googlegroups.com.
> > > To view this discussion on the web visit 
> > > https://groups.google.com/d/msgid/golang-nuts/4d6b60cd-4b9e-4bce-aa6f-2d2bfbefbd98n%40googlegroups.com.
>
>
>
> --
> Alexander D. Mills
> New cell phone # (415)730-1805
> linkedin.com/in/alexanderdmills

-- 
You received this message because you are subscribed to the Google Groups 
"golang-nuts" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to golang-nuts+unsubscr...@googlegroups.com.
To view this discussion on the web visit 
https://groups.google.com/d/msgid/golang-nuts/CAMV2Rqo5ZJZOz32m%3DJ0iHdWLM4kHhqc0ECwZ6hXV1EzT9%3DgqZg%40mail.gmail.com.

Reply via email to