sunlc210 opened a new issue #76: Transfer data is blocked by flush
URL: https://github.com/apache/pulsar-client-go/issues/76

I am pumping data from Filebeat to a Pulsar cluster using the Go client. After messages are sent to Pulsar successfully, the producer blocks in the flush function. The code is below:

    func (p *partitionProducer) internalFlush(fr *flushRequest) {
        p.internalFlushCurrentBatch()

        pi, ok := p.pendingQueue.PeekLast().(*pendingItem)
        if !ok {
            p.log.Error("internalFlush block ,Flush error")
            fr.waitGroup.Done()
            return
        }
        p.log.Info("InternalFlush corrctly: ")
        pi.sendRequests = append(pi.sendRequests, &sendRequest{
            msg: nil,
            callback: func(id MessageID, message *ProducerMessage, e error) {
                fr.err = e
                fr.waitGroup.Done()
            },
        })
    }

    func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) error {
        wg := sync.WaitGroup{}
        wg.Add(1)

        var err error

        p.internalSendAsync(ctx, msg, func(ID MessageID, message *ProducerMessage, e error) {
            err = e
            wg.Done()
        }, true)

        // When sending synchronously we flush immediately to avoid
        // the increased latency and reduced throughput of batching
        p.log.Info("-------------------beginning flsuh ----------------")
        if err = p.Flush(); err != nil {
            return err
        }
        p.log.Info("-------------------end flsuh ----------------")
        wg.Wait()
        return err
    }

The log is as follows:

    time="2019-10-25T10:39:53+08:00" level=info msg="InternalFlush corrctly: " name=dc_jh1 topic="persistent://crossjhwx/jh2wx/20191024-partition-0"
    time="2019-10-25T10:39:53+08:00" level=info msg="-------------------end flsuh ----------------" name=dc_jh1 topic="persistent://crossjhwx/jh2wx/20191024-partition-0"
    time="2019-10-25T10:39:53+08:00" level=info msg="-------------------beginning flsuh ----------------" name=dc_jh1 topic="persistent://crossjhwx/jh2wx/20191024-partition-0"
    time="2019-10-25T10:39:53+08:00" level=info msg="InternalFlush corrctly: " name=dc_jh1 topic="persistent://crossjhwx/jh2wx/20191024-partition-0"
    time="2019-10-25T10:39:53+08:00" level=info msg="-------------------end flsuh ----------------" name=dc_jh1 topic="persistent://crossjhwx/jh2wx/20191024-partition-0"
    time="2019-10-25T10:39:53+08:00" level=info msg="-------------------beginning flsuh ----------------" name=dc_jh1 topic="persistent://crossjhwx/jh2wx/20191024-partition-0"
    time="2019-10-25T10:39:53+08:00" level=info msg="InternalFlush corrctly: " name=dc_jh1 topic="persistent://crossjhwx/jh2wx/20191024-partition-0"
    time="2019-10-25T10:39:53+08:00" level=info msg="-------------------end flsuh ----------------" name=dc_jh1 topic="persistent://crossjhwx/jh2wx/20191024-partition-0"
    time="2019-10-25T10:39:53+08:00" level=info msg="-------------------beginning flsuh ----------------" name=dc_jh1 topic="persistent://crossjhwx/jh2wx/20191024-partition-0"
    time="2019-10-25T10:39:53+08:00" level=info msg="InternalFlush corrctly: " name=dc_jh1 topic="persistent://crossjhwx/jh2wx/20191024-partition-0"

The problem occurs in the following section:

    p.log.Info("InternalFlush corrctly: ")
    pi.sendRequests = append(pi.sendRequests, &sendRequest{
        msg: nil,
        callback: func(id MessageID, message *ProducerMessage, e error) {
            fr.err = e
            fr.waitGroup.Done()
        },
    })

I do not understand the logic here, but I am sure the error occurs in this section.