sunlc210 opened a new issue #76: Transfer data is blocked by flush
URL: https://github.com/apache/pulsar-client-go/issues/76
 
 
   To pump data buy filebeat to pulsar cluster , by using go client .
   
   Found that ,after sending message to pulsar successfully, there will be 
blocked by using flush function.
   
   the code is below :
   
   func (p *partitionProducer) internalFlush(fr *flushRequest) {
        p.internalFlushCurrentBatch()
   
        pi, ok := p.pendingQueue.PeekLast().(*pendingItem)
        if !ok {
                p.log.Error("internalFlush block ,Flush error")
                fr.waitGroup.Done()
                return
        }
        p.log.Info("InternalFlush corrctly: ")  
        pi.sendRequests = append(pi.sendRequests, &sendRequest{
                msg: nil,
                callback: func(id MessageID, message *ProducerMessage, e error) 
{
                        fr.err = e
                        fr.waitGroup.Done()
                },
        })
   }
   
   func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) 
error {
        wg := sync.WaitGroup{}
        wg.Add(1)
   
        var err error
   
        p.internalSendAsync(ctx, msg, func(ID MessageID, message 
*ProducerMessage, e error) {
                err = e
                wg.Done()
        }, true)
   
        // When sending synchronously we flush immediately to avoid
        // the increased latency and reduced throughput of batching
        p.log.Info("-------------------beginning flsuh ----------------")
        if err = p.Flush(); err != nil {
                return err
        }
        p.log.Info("-------------------end flsuh ----------------")
        wg.Wait()
        return err
   }
   ’
   
   log as below
   
   time="2019-10-25T10:39:53+08:00" level=info msg="InternalFlush corrctly: " 
name=dc_jh1 topic="persistent://crossjhwx/jh2wx/20191024-partition-0"
   time="2019-10-25T10:39:53+08:00" level=info msg="-------------------end 
flsuh ----------------" name=dc_jh1 
topic="persistent://crossjhwx/jh2wx/20191024-partition-0"
   time="2019-10-25T10:39:53+08:00" level=info 
msg="-------------------beginning flsuh ----------------" name=dc_jh1 
topic="persistent://crossjhwx/jh2wx/20191024-partition-0"
   time="2019-10-25T10:39:53+08:00" level=info msg="InternalFlush corrctly: " 
name=dc_jh1 topic="persistent://crossjhwx/jh2wx/20191024-partition-0"
   time="2019-10-25T10:39:53+08:00" level=info msg="-------------------end 
flsuh ----------------" name=dc_jh1 
topic="persistent://crossjhwx/jh2wx/20191024-partition-0"
   time="2019-10-25T10:39:53+08:00" level=info 
msg="-------------------beginning flsuh ----------------" name=dc_jh1 
topic="persistent://crossjhwx/jh2wx/20191024-partition-0"
   time="2019-10-25T10:39:53+08:00" level=info msg="InternalFlush corrctly: " 
name=dc_jh1 topic="persistent://crossjhwx/jh2wx/20191024-partition-0"
   time="2019-10-25T10:39:53+08:00" level=info msg="-------------------end 
flsuh ----------------" name=dc_jh1 
topic="persistent://crossjhwx/jh2wx/20191024-partition-0"
   time="2019-10-25T10:39:53+08:00" level=info 
msg="-------------------beginning flsuh ----------------" name=dc_jh1 
topic="persistent://crossjhwx/jh2wx/20191024-partition-0"
   time="2019-10-25T10:39:53+08:00" level=info msg="InternalFlush corrctly: " 
name=dc_jh1 topic="persistent://crossjhwx/jh2wx/20191024-partition-0"
   
   the problem is occured on below section
   
   p.log.Info("InternalFlush corrctly: ")  
        pi.sendRequests = append(pi.sendRequests, &sendRequest{
                msg: nil,
                callback: func(id MessageID, message *ProducerMessage, e error) 
{
                        fr.err = e
                        fr.waitGroup.Done()
                },
        })
   
   no idea the logic ,but make sure the error is occur on it. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to