This is an automated email from the ASF dual-hosted git repository.

liuhan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-go.git


The following commit(s) were added to refs/heads/main by this push:
     new aa94837  Support pprof profiling (#232)
aa94837 is described below

commit aa948377ecdb4724fad1cc365c13a1188021316f
Author: Jingyi Qu <[email protected]>
AuthorDate: Wed Sep 24 22:54:16 2025 +0800

    Support pprof profiling (#232)
---
 CHANGES.md                                        |   1 +
 agent/core/compile.go                             |   5 +
 agent/reporter/imports.go                         |   1 +
 plugins/core/pprof.go                             | 246 +++++++++++++++
 plugins/core/reporter/grpc/grpc.go                |  28 +-
 plugins/core/reporter/pprof_manager.go            | 349 ++++++++++++++++++++++
 test/plugins/scenarios/logrus/config/excepted.yml |  15 +-
 test/plugins/scenarios/zap/config/excepted.yml    |  15 +-
 tools/go-agent/config/agent.default.yaml          |   6 +
 tools/go-agent/config/loader.go                   |  16 +-
 tools/go-agent/instrument/reporter/instrument.go  |  28 +-
 11 files changed, 678 insertions(+), 32 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 10524a6..0825750 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -11,6 +11,7 @@ Release Notes.
 * Add recover to goroutine to prevent unexpected panics.
 * Add mutex to fix some data race. 
 * Replace external `goapi` dependency with in-repo generated protocols. 
+* Support pprof profiling. 
 #### Plugins
 
 #### Documentation
diff --git a/agent/core/compile.go b/agent/core/compile.go
index c511ac0..baefdb1 100644
--- a/agent/core/compile.go
+++ b/agent/core/compile.go
@@ -19,17 +19,21 @@ package core
 
 import (
        //go:nolint
+       _ "bytes"
        _ "encoding/base64"
        _ "fmt"
+       _ "io"
        _ "log"
        _ "math"
        _ "math/rand"
        _ "net"
        _ "os"
+       _ "path/filepath"
        _ "reflect"
        _ "runtime"
        _ "runtime/debug"
        _ "runtime/metrics"
+       _ "runtime/pprof"
        _ "sort"
        _ "strconv"
        _ "strings"
@@ -55,4 +59,5 @@ import (
        _ "github.com/apache/skywalking-go/protocols/collect/language/agent/v3"
        _ 
"github.com/apache/skywalking-go/protocols/collect/language/profile/v3"
        _ "github.com/apache/skywalking-go/protocols/collect/logging/v3"
+       _ "github.com/apache/skywalking-go/protocols/collect/pprof/v10"
 )
diff --git a/agent/reporter/imports.go b/agent/reporter/imports.go
index e1c6fd6..90c7e8a 100644
--- a/agent/reporter/imports.go
+++ b/agent/reporter/imports.go
@@ -71,5 +71,6 @@ import (
        _ 
"github.com/apache/skywalking-go/protocols/collect/language/profile/v3"
        _ "github.com/apache/skywalking-go/protocols/collect/logging/v3"
        _ "github.com/apache/skywalking-go/protocols/collect/management/v3"
+       _ "github.com/apache/skywalking-go/protocols/collect/pprof/v10"
        _ "github.com/apache/skywalking-go/protocols/collect/servicemesh/v3"
 )
diff --git a/plugins/core/pprof.go b/plugins/core/pprof.go
new file mode 100644
index 0000000..8c83689
--- /dev/null
+++ b/plugins/core/pprof.go
@@ -0,0 +1,246 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package core
+
+import (
+       "bytes"
+       "fmt"
+       "io"
+       "os"
+       "path/filepath"
+       "runtime"
+       "runtime/pprof"
+       "sync/atomic"
+       "time"
+
+       "github.com/apache/skywalking-go/plugins/core/operator"
+       "github.com/apache/skywalking-go/plugins/core/reporter"
+)
+
+// Pprof event types accepted in the "Events" argument of a pprof task.
+// Except for "cpu" (handled by pprof.StartCPUProfile/StopCPUProfile), every
+// value is also the runtime/pprof profile name that StopTask dumps.
+const (
+       // Pprof event types
+       PprofEventsTypeCPU       = "cpu"
+       PprofEventsTypeHeap      = "heap"
+       PprofEventsTypeAllocs    = "allocs"
+       PprofEventsTypeBlock     = "block"
+       PprofEventsTypeMutex     = "mutex"
+       PprofEventsTypeThread    = "threadcreate"
+       PprofEventsTypeGoroutine = "goroutine"
+)
+
+// profilingIsRunning ensures only one CPU profiling task runs at a time:
+// StartTask claims it with CompareAndSwap and StopTask clears it, so a second
+// CPU task is rejected while one is active.
+var profilingIsRunning atomic.Bool
+
+// init installs NewPprofTaskCommand as reporter.NewPprofTaskCommand, the
+// factory the reporter package uses to build commands from backend tasks.
+func init() {
+       reporter.NewPprofTaskCommand = NewPprofTaskCommand
+}
+
+// PprofTaskCommandImpl implements reporter.PprofTaskCommand. It runs a single
+// pprof task received from the backend and hands the collected profile to
+// its manager for upload.
+type PprofTaskCommandImpl struct {
+       // Pprof Task ID
+       taskID string
+       // Type of profiling, one of the PprofEventsType* values
+       // (cpu/heap/allocs/block/mutex/goroutine/threadcreate)
+       events string
+       // Sampling window for CPU, Block and Mutex events. The backend sends it
+       // in minutes; it is stored here as a time.Duration.
+       duration time.Duration
+       // Unix timestamp in milliseconds when the task was created
+       createTime int64
+       // Sampling rate for Block and Mutex events, passed to
+       // runtime.SetBlockProfileRate / runtime.SetMutexProfileFraction
+       dumpPeriod int
+
+       // for pprof task service: pprofFilePath is the directory for profile
+       // files (empty means the profile is buffered in memory)
+       pprofFilePath string
+       logger        operator.LogOperator
+       manager       reporter.PprofReporter
+}
+
+// NewPprofTaskCommand builds a PprofTaskCommandImpl from decoded task
+// arguments. init installs it as reporter.NewPprofTaskCommand.
+func NewPprofTaskCommand(taskID, events string, duration time.Duration,
+       createTime int64, dumpPeriod int, pprofFilePath string,
+       logger operator.LogOperator, manager reporter.PprofReporter) reporter.PprofTaskCommand {
+       cmd := new(PprofTaskCommandImpl)
+       cmd.taskID, cmd.events = taskID, events
+       cmd.duration, cmd.dumpPeriod = duration, dumpPeriod
+       cmd.createTime = createTime
+       cmd.pprofFilePath = pprofFilePath
+       cmd.logger, cmd.manager = logger, manager
+       return cmd
+}
+
+// GetTaskID returns the backend-assigned task ID.
+func (c *PprofTaskCommandImpl) GetTaskID() string { return c.taskID }
+
+// GetCreateTime returns the task creation time sent by the backend.
+func (c *PprofTaskCommandImpl) GetCreateTime() int64 { return c.createTime }
+
+// GetDuration returns how long CPU, Block and Mutex sampling runs.
+func (c *PprofTaskCommandImpl) GetDuration() time.Duration { return c.duration }
+
+// GetDumpPeriod returns the Block/Mutex sampling rate.
+func (c *PprofTaskCommandImpl) GetDumpPeriod() int { return c.dumpPeriod }
+
+// IsInvalidEvent reports whether the task's event is not one of the
+// supported PprofEventsType* values.
+func (c *PprofTaskCommandImpl) IsInvalidEvent() bool {
+       switch c.events {
+       case PprofEventsTypeCPU, PprofEventsTypeHeap, PprofEventsTypeAllocs,
+               PprofEventsTypeBlock, PprofEventsTypeMutex, PprofEventsTypeThread,
+               PprofEventsTypeGoroutine:
+               return false
+       default:
+               return true
+       }
+}
+
+// IsDirectSamplingType reports whether the event is a snapshot (heap, allocs,
+// goroutine, threadcreate) that is dumped right away instead of being sampled
+// over a duration.
+func (c *PprofTaskCommandImpl) IsDirectSamplingType() bool {
+       switch c.events {
+       case PprofEventsTypeHeap, PprofEventsTypeAllocs, PprofEventsTypeGoroutine, PprofEventsTypeThread:
+               return true
+       }
+       return false
+}
+
+// HasDumpPeriod reports whether the event (block or mutex) needs a
+// dumpPeriod, which StartTask uses as the runtime sampling rate.
+func (c *PprofTaskCommandImpl) HasDumpPeriod() bool {
+       switch c.events {
+       case PprofEventsTypeBlock, PprofEventsTypeMutex:
+               return true
+       default:
+               return false
+       }
+}
+
+// closeFileWriter closes writer if it is an *os.File and logs a close
+// failure; any other writer (e.g. an in-memory buffer) is left alone.
+func (c *PprofTaskCommandImpl) closeFileWriter(writer io.Writer) {
+       file, isFile := writer.(*os.File)
+       if !isFile {
+               return
+       }
+       if err := file.Close(); err != nil {
+               c.logger.Errorf("failed to close pprof file: %v", err)
+       }
+}
+
+// getWriter returns the destination for the profile data. With no
+// pprofFilePath configured, the profile is collected in a bytes.Buffer.
+// Otherwise it is written to <pprofFilePath>/<taskID>.pprof, and the
+// directory is created if needed.
+func (c *PprofTaskCommandImpl) getWriter() (io.Writer, error) {
+       // sample data to buffer
+       if c.pprofFilePath == "" {
+               return &bytes.Buffer{}, nil
+       }
+
+       // sample data to file. Concatenate rather than filepath.Join: Join would
+       // produce "<id>/.pprof", a hidden file inside a per-task directory that
+       // is never removed. The task ID comes from the backend, so only its last
+       // path element is used to keep the file inside pprofFilePath.
+       pprofFileName := filepath.Base(c.taskID) + ".pprof"
+       if err := os.MkdirAll(c.pprofFilePath, os.ModePerm); err != nil {
+               return nil, err
+       }
+
+       writer, err := os.Create(filepath.Join(c.pprofFilePath, pprofFileName))
+       if err != nil {
+               return nil, err
+       }
+
+       return writer, nil
+}
+
+// StartTask opens the output writer and switches on the requested profiling.
+// CPU profiling is exclusive, so a second CPU task is rejected while one is
+// running. Block and Mutex sampling use dumpPeriod as their rate. Snapshot
+// events need no setup here; StopTask captures them.
+func (c *PprofTaskCommandImpl) StartTask() (io.Writer, error) {
+       c.logger.Infof("start pprof task %s", c.taskID)
+       if c.events == PprofEventsTypeCPU {
+               return c.startCPUProfile()
+       }
+
+       w, err := c.getWriter()
+       if err != nil {
+               return nil, err
+       }
+       if c.events == PprofEventsTypeBlock {
+               runtime.SetBlockProfileRate(c.dumpPeriod)
+       } else if c.events == PprofEventsTypeMutex {
+               runtime.SetMutexProfileFraction(c.dumpPeriod)
+       }
+       return w, nil
+}
+
+// startCPUProfile claims the global CPU-profiling flag, opens the writer and
+// starts the CPU profile. If any step fails it releases the flag and closes
+// a file writer.
+func (c *PprofTaskCommandImpl) startCPUProfile() (io.Writer, error) {
+       if !profilingIsRunning.CompareAndSwap(false, true) {
+               return nil, fmt.Errorf("CPU profiling is already running")
+       }
+       w, err := c.getWriter()
+       if err == nil {
+               if err = pprof.StartCPUProfile(w); err != nil && c.pprofFilePath != "" {
+                       c.closeFileWriter(w)
+               }
+       }
+       if err != nil {
+               profilingIsRunning.Store(false)
+               return nil, err
+       }
+       return w, nil
+}
+
+// StopTask ends the task and reports the collected data. It either stops
+// the CPU profile or writes the named runtime/pprof profile to writer, and
+// turns Block/Mutex sampling back off. A file writer is closed before
+// readPprofData reads it back by name.
+func (c *PprofTaskCommandImpl) StopTask(writer io.Writer) {
+       c.logger.Infof("stop pprof task %s", c.taskID)
+
+       // Each non-CPU event value is also the runtime/pprof profile name.
+       errFormats := map[string]string{
+               PprofEventsTypeBlock:     "write Block profile error %v",
+               PprofEventsTypeMutex:     "write Mutex profile error %v",
+               PprofEventsTypeHeap:      "write Heap profile error %v",
+               PprofEventsTypeAllocs:    "write Alloc profile error %v",
+               PprofEventsTypeGoroutine: "write Goroutine profile error %v",
+               PprofEventsTypeThread:    "write Thread profile error %v",
+       }
+
+       if c.events == PprofEventsTypeCPU {
+               pprof.StopCPUProfile()
+               profilingIsRunning.Store(false)
+       } else if errFormat, ok := errFormats[c.events]; ok {
+               if err := pprof.Lookup(c.events).WriteTo(writer, 0); err != nil {
+                       c.logger.Errorf(errFormat, err)
+               }
+               // StartTask enabled Block/Mutex sampling; switch it off again.
+               if c.events == PprofEventsTypeBlock {
+                       runtime.SetBlockProfileRate(0)
+               } else if c.events == PprofEventsTypeMutex {
+                       runtime.SetMutexProfileFraction(0)
+               }
+       }
+
+       if c.pprofFilePath != "" {
+               c.closeFileWriter(writer)
+       }
+       c.readPprofData(c.taskID, writer)
+}
+
+// readPprofData gets the finished profile out of writer and passes it to the
+// manager for upload. In-memory buffers are read directly. On-disk files are
+// read back by name and then removed. Read and remove failures are logged,
+// and whatever data was obtained is still reported.
+func (c *PprofTaskCommandImpl) readPprofData(taskID string, writer io.Writer) {
+       var data []byte
+       toFile := c.pprofFilePath != ""
+       switch w := writer.(type) {
+       case *bytes.Buffer:
+               if !toFile {
+                       data = w.Bytes()
+               }
+       case *os.File:
+               if toFile {
+                       name := w.Name()
+                       var err error
+                       if data, err = os.ReadFile(name); err != nil {
+                               c.logger.Errorf("failed to read pprof file %s: %v", name, err)
+                       }
+                       if err = os.Remove(name); err != nil {
+                               c.logger.Errorf("failed to remove pprof file %s: %v", name, err)
+                       }
+               }
+       }
+       c.manager.ReportPprof(taskID, data)
+}
diff --git a/plugins/core/reporter/grpc/grpc.go 
b/plugins/core/reporter/grpc/grpc.go
index 6734ea7..9d2ba5d 100644
--- a/plugins/core/reporter/grpc/grpc.go
+++ b/plugins/core/reporter/grpc/grpc.go
@@ -42,17 +42,19 @@ func NewGRPCReporter(logger operator.LogOperator,
        checkInterval time.Duration,
        connManager *reporter.ConnectionManager,
        cdsManager *reporter.CDSManager,
+       pprofTaskManager *reporter.PprofTaskManager,
        opts ...ReporterOption,
 ) (reporter.Reporter, error) {
        r := &gRPCReporter{
-               logger:        logger,
-               serverAddr:    serverAddr,
-               tracingSendCh: make(chan *agentv3.SegmentObject, 
maxSendQueueSize),
-               metricsSendCh: make(chan []*agentv3.MeterData, 
maxSendQueueSize),
-               logSendCh:     make(chan *logv3.LogData, maxSendQueueSize),
-               checkInterval: checkInterval,
-               connManager:   connManager,
-               cdsManager:    cdsManager,
+               logger:           logger,
+               serverAddr:       serverAddr,
+               tracingSendCh:    make(chan *agentv3.SegmentObject, 
maxSendQueueSize),
+               metricsSendCh:    make(chan []*agentv3.MeterData, 
maxSendQueueSize),
+               logSendCh:        make(chan *logv3.LogData, maxSendQueueSize),
+               checkInterval:    checkInterval,
+               connManager:      connManager,
+               cdsManager:       cdsManager,
+               pprofTaskManager: pprofTaskManager,
        }
        for _, o := range opts {
                o(r)
@@ -83,10 +85,11 @@ type gRPCReporter struct {
        checkInterval    time.Duration
 
        // bootFlag is set if Boot be executed
-       bootFlag    bool
-       transform   *reporter.Transform
-       connManager *reporter.ConnectionManager
-       cdsManager  *reporter.CDSManager
+       bootFlag         bool
+       transform        *reporter.Transform
+       connManager      *reporter.ConnectionManager
+       cdsManager       *reporter.CDSManager
+       pprofTaskManager *reporter.PprofTaskManager
 }
 
 func (r *gRPCReporter) Boot(entity *reporter.Entity, cdsWatchers 
[]reporter.AgentConfigChangeWatcher) {
@@ -95,6 +98,7 @@ func (r *gRPCReporter) Boot(entity *reporter.Entity, 
cdsWatchers []reporter.Agen
        r.initSendPipeline()
        r.check()
        r.cdsManager.InitCDS(entity, cdsWatchers)
+       r.pprofTaskManager.InitPprofTask(entity)
        r.bootFlag = true
 }
 
diff --git a/plugins/core/reporter/pprof_manager.go 
b/plugins/core/reporter/pprof_manager.go
new file mode 100644
index 0000000..e3d5d9d
--- /dev/null
+++ b/plugins/core/reporter/pprof_manager.go
@@ -0,0 +1,349 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package reporter
+
+import (
+       "context"
+       "fmt"
+       "io"
+       "strconv"
+       "time"
+
+       "github.com/apache/skywalking-go/plugins/core/operator"
+       commonv3 "github.com/apache/skywalking-go/protocols/collect/common/v3"
+       pprofv10 "github.com/apache/skywalking-go/protocols/collect/pprof/v10"
+)
+
+// Limits used by the pprof task manager.
+const (
+       // max chunk size for pprof data: uploadPprofData streams the content to
+       // the backend in pieces of at most this many bytes (1 MiB)
+       maxChunkSize = 1 * 1024 * 1024
+       // max send queue size for pprof data: capacity of pprofSendCh; ReportPprof
+       // drops a profile (and logs an error) when the queue is full
+       maxPprofSendQueueSize = 30000
+       // max duration for pprof task: checkCommand rejects CPU/Block/Mutex tasks
+       // longer than this. Despite the name, the value is a time.Duration.
+       pprofTaskDurationMaxMinute = 15 * time.Minute
+)
+
+// PprofTaskCommand is one pprof task received from the backend. HandleCommand
+// validates it and calls StartTask. It then passes the returned writer to
+// StopTask, either immediately (IsDirectSamplingType) or after GetDuration.
+type PprofTaskCommand interface {
+       GetTaskID() string
+       // GetCreateTime is compared with PprofTaskManager.LastUpdateTime to skip
+       // commands that were already handled
+       GetCreateTime() int64
+       GetDuration() time.Duration
+       GetDumpPeriod() int
+       // StartTask begins collection and returns the writer StopTask must receive
+       StartTask() (io.Writer, error)
+       // StopTask finishes collection and hands the profile to the PprofReporter
+       StopTask(io.Writer)
+       // IsDirectSamplingType reports snapshot events that need no duration
+       IsDirectSamplingType() bool
+       IsInvalidEvent() bool
+       // HasDumpPeriod reports events that require a positive GetDumpPeriod
+       HasDumpPeriod() bool
+}
+// PprofReporter receives finished profiles for upload. PprofTaskManager
+// implements it and passes itself to every command it creates.
+type PprofReporter interface {
+       ReportPprof(taskID string, content []byte)
+}
+
+// NewPprofTaskCommand builds a PprofTaskCommand from decoded task arguments.
+// It is nil in this package and must be assigned elsewhere (the core plugin
+// package does so in its init) before any command is deserialized.
+var NewPprofTaskCommand func(taskID, events string, duration time.Duration,
+       createTime int64, dumpPeriod int, pprofFilePath string,
+       logger operator.LogOperator, manager PprofReporter) PprofTaskCommand
+
+// PprofTaskManager polls the backend for pprof task commands every
+// pprofInterval, runs them, and uploads the resulting profiles through
+// PprofClient. LastUpdateTime holds the creation time of the newest command
+// handled and is sent as LastCommandTime on each poll. pprofSendCh buffers
+// reported profiles until the upload goroutine sends them. entity is set by
+// InitPprofTask, and pprofFilePath is forwarded to every command created.
+// NOTE(review): commands is only ever assigned nil in this file and looks
+// unused.
+type PprofTaskManager struct {
+       logger         operator.LogOperator
+       serverAddr     string
+       pprofInterval  time.Duration
+       PprofClient    pprofv10.PprofTaskClient // for grpc
+       connManager    *ConnectionManager
+       entity         *Entity
+       pprofFilePath  string
+       LastUpdateTime int64
+       commands       PprofTaskCommand
+       pprofSendCh    chan *pprofv10.PprofData
+}
+
+func NewPprofTaskManager(logger operator.LogOperator, serverAddr string,
+       pprofInterval time.Duration, connManager *ConnectionManager,
+       pprofFilePath string) (*PprofTaskManager, error) {
+       if pprofInterval <= 0 {
+               logger.Errorf("pprof interval less than zero, pprof profiling 
is disabled")
+               return nil, fmt.Errorf("pprof interval less than zero, pprof 
profiling is disabled")
+       }
+       pprofManager := &PprofTaskManager{
+               logger:        logger,
+               serverAddr:    serverAddr,
+               pprofInterval: pprofInterval,
+               connManager:   connManager,
+               pprofFilePath: pprofFilePath,
+               pprofSendCh:   make(chan *pprofv10.PprofData, 
maxPprofSendQueueSize),
+       }
+       conn, err := connManager.GetConnection(serverAddr)
+       if err != nil {
+               return nil, err
+       }
+       pprofManager.PprofClient = pprofv10.NewPprofTaskClient(conn)
+       pprofManager.commands = nil
+       return pprofManager, nil
+}
+
+func (r *PprofTaskManager) InitPprofTask(entity *Entity) {
+       r.entity = entity
+       r.initPprofSendPipeline()
+       go func() {
+               for {
+                       switch r.connManager.GetConnectionStatus(r.serverAddr) {
+                       case ConnectionStatusShutdown:
+                               return
+                       case ConnectionStatusDisconnect:
+                               time.Sleep(r.pprofInterval)
+                               continue
+                       }
+                       pprofCommand, err := 
r.PprofClient.GetPprofTaskCommands(context.Background(), 
&pprofv10.PprofTaskCommandQuery{
+                               Service:         r.entity.ServiceName,
+                               ServiceInstance: r.entity.ServiceInstanceName,
+                               LastCommandTime: r.LastUpdateTime,
+                       })
+                       if err != nil {
+                               r.logger.Errorf("fetch pprof task commands 
error %v", err)
+                               time.Sleep(r.pprofInterval)
+                               continue
+                       }
+
+                       if len(pprofCommand.GetCommands()) > 0 && 
pprofCommand.GetCommands()[0].Command == "PprofTaskQuery" {
+                               rawCommand := pprofCommand.GetCommands()[0]
+                               r.HandleCommand(rawCommand)
+                       }
+
+                       time.Sleep(r.pprofInterval)
+               }
+       }()
+}
+
+func (r *PprofTaskManager) HandleCommand(rawCommand *commonv3.Command) {
+       command := r.deserializePprofTaskCommand(rawCommand)
+       if command.GetCreateTime() > r.LastUpdateTime {
+               r.LastUpdateTime = command.GetCreateTime()
+       } else {
+               return
+       }
+       if err := r.checkCommand(command); err != nil {
+               r.logger.Errorf("check command error, cannot process this pprof 
task. reason: %v", err)
+               return
+       }
+
+       if command.IsDirectSamplingType() {
+               // direct sampling of Heap, Allocs, Goroutine, Thread
+               writer, err := command.StartTask()
+               if err != nil {
+                       r.logger.Errorf("start %s pprof task error %v \n", 
command.GetTaskID(), err)
+                       return
+               }
+               command.StopTask(writer)
+       } else {
+               // The CPU, Block and Mutex sampling lasts for a duration and 
then stops
+               writer, err := command.StartTask()
+               if err != nil {
+                       r.logger.Errorf("start %s pprof task error %v \n", 
command.GetTaskID(), err)
+                       return
+               }
+               time.AfterFunc(command.GetDuration(), func() {
+                       command.StopTask(writer)
+               })
+       }
+}
+
+// deserializePprofTaskCommand decodes the key/value arguments of a backend
+// command into a PprofTaskCommand via NewPprofTaskCommand. Unknown keys are
+// ignored. Duration (minutes) and DumpPeriod stay 0 when missing or malformed,
+// which checkCommand rejects for the events that need them. An unparsable
+// CreateTime is logged and treated as 0, so HandleCommand skips the command.
+func (r *PprofTaskManager) deserializePprofTaskCommand(command *commonv3.Command) PprofTaskCommand {
+       var (
+               taskID, events       string
+               duration, dumpPeriod int // 0 when not provided or invalid
+               createTime           int64
+       )
+       for _, pair := range command.GetArgs() {
+               value := pair.GetValue()
+               switch pair.GetKey() {
+               case "TaskId":
+                       taskID = value
+               case "Events":
+                       events = value
+               case "Duration":
+                       if val, err := strconv.Atoi(value); err == nil && val > 0 {
+                               duration = val
+                       }
+               case "DumpPeriod":
+                       if val, err := strconv.Atoi(value); err == nil && val >= 0 {
+                               dumpPeriod = val
+                       }
+               case "CreateTime":
+                       val, err := strconv.ParseInt(value, 10, 64)
+                       if err != nil {
+                               // On overflow ParseInt returns math.MaxInt64. Kept, that would become
+                               // LastUpdateTime and make HandleCommand ignore every later command.
+                               r.logger.Errorf("invalid pprof task create time %q: %v", value, err)
+                               val = 0
+                       }
+                       createTime = val
+               }
+       }
+
+       return NewPprofTaskCommand(
+               taskID,
+               events,
+               time.Duration(duration)*time.Minute,
+               createTime,
+               dumpPeriod,
+               r.pprofFilePath,
+               r.logger,
+               r,
+       )
+}
+
+func (r *PprofTaskManager) checkCommand(command PprofTaskCommand) error {
+       if command.GetTaskID() == "" {
+               return fmt.Errorf("pprof task id cannot be empty, task id is 
%s", command.GetTaskID())
+       }
+       if command.IsInvalidEvent() {
+               return fmt.Errorf("pprof task event is invalid, task id is %s", 
command.GetTaskID())
+       }
+       if !command.IsDirectSamplingType() {
+               if command.GetDuration() <= 0 || command.GetDuration() > 
pprofTaskDurationMaxMinute {
+                       return fmt.Errorf("pprof task duration must be between 
0 and %v, task id is %s", pprofTaskDurationMaxMinute, command.GetTaskID())
+               }
+       }
+       if command.HasDumpPeriod() && command.GetDumpPeriod() <= 0 {
+               return fmt.Errorf("pprof task dumpperiod must be greater than 
0, task id is %s", command.GetTaskID())
+       }
+       return nil
+}
+
+// ReportPprof queues a finished profile for upload, tagged with this
+// service/instance and a success status. When the send queue is full, the
+// profile is dropped and an error is logged.
+func (r *PprofTaskManager) ReportPprof(taskID string, content []byte) {
+       data := &pprofv10.PprofData{
+               Metadata: &pprofv10.PprofMetaData{
+                       Service:         r.entity.ServiceName,
+                       ServiceInstance: r.entity.ServiceInstanceName,
+                       TaskId:          taskID,
+                       Type:            pprofv10.PprofProfilingStatus_PPROF_PROFILING_SUCCESS,
+                       ContentSize:     int32(len(content)),
+               },
+               Result: &pprofv10.PprofData_Content{Content: content},
+       }
+
+       select {
+       case r.pprofSendCh <- data:
+               return
+       default:
+       }
+       r.logger.Errorf("reach max pprof send buffer")
+}
+
+func (r *PprofTaskManager) initPprofSendPipeline() {
+       go func() {
+               defer func() {
+                       if err := recover(); err != nil {
+                               r.logger.Errorf("PprofTaskManager 
initPprofSendPipeline panic err %v", err)
+                       }
+               }()
+       StreamLoop:
+               for {
+                       switch r.connManager.GetConnectionStatus(r.serverAddr) {
+                       case ConnectionStatusShutdown:
+                               return
+                       case ConnectionStatusDisconnect:
+                               time.Sleep(5 * time.Second)
+                               continue StreamLoop
+                       }
+
+                       for pprofData := range r.pprofSendCh {
+                               r.uploadPprofData(pprofData)
+                       }
+                       break
+               }
+       }()
+}
+
+func (r *PprofTaskManager) uploadPprofData(pprofData *pprofv10.PprofData) {
+       ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
+       defer cancel()
+
+       stream, err := r.PprofClient.Collect(ctx)
+       if err != nil {
+               r.logger.Errorf("failed to start collect stream: %v", err)
+               return
+       }
+
+       // Send metadata first
+       metadataMsg := &pprofv10.PprofData{
+               Metadata: pprofData.Metadata,
+       }
+       if err = stream.Send(metadataMsg); err != nil {
+               r.logger.Errorf("failed to send metadata: %v", err)
+               return
+       }
+
+       resp, err := stream.Recv()
+       if err != nil {
+               r.logger.Errorf("failed to receive server response: %v", err)
+               return
+       }
+
+       switch resp.Status {
+       case pprofv10.PprofProfilingStatus_PPROF_TERMINATED_BY_OVERSIZE:
+               r.logger.Errorf("pprof is too large to be received by the oap 
server")
+               return
+       case pprofv10.PprofProfilingStatus_PPROF_EXECUTION_TASK_ERROR:
+               r.logger.Errorf("server rejected pprof upload due to execution 
task error")
+               return
+       }
+
+       // Upload content in chunks
+       content := pprofData.GetContent()
+       chunkCount := 0
+       contentSize := len(content)
+
+       for offset := 0; offset < contentSize; offset += maxChunkSize {
+               end := offset + maxChunkSize
+               if end > contentSize {
+                       end = contentSize
+               }
+
+               chunkData := &pprofv10.PprofData{
+                       Result: &pprofv10.PprofData_Content{
+                               Content: content[offset:end],
+                       },
+               }
+
+               if err := stream.Send(chunkData); err != nil {
+                       r.logger.Errorf("failed to send pprof chunk %d: %v", 
chunkCount, err)
+                       return
+               }
+               chunkCount++
+               // Check context timeout
+               select {
+               case <-ctx.Done():
+                       r.logger.Errorf("context timeout during chunk upload 
for task %s", pprofData.Metadata.TaskId)
+                       return
+               default:
+               }
+       }
+
+       r.closePprofStream(stream)
+}
+func (r *PprofTaskManager) closePprofStream(stream 
pprofv10.PprofTask_CollectClient) {
+       if err := stream.CloseSend(); err != nil {
+               r.logger.Errorf("failed to close send stream: %v", err)
+               return
+       }
+
+       for {
+               _, err := stream.Recv()
+               if err == io.EOF {
+                       break
+               }
+               if err != nil {
+                       r.logger.Errorf("error receiving final response %v", 
err)
+                       break
+               }
+       }
+}
diff --git a/test/plugins/scenarios/logrus/config/excepted.yml 
b/test/plugins/scenarios/logrus/config/excepted.yml
index 983c4aa..88bfca1 100644
--- a/test/plugins/scenarios/logrus/config/excepted.yml
+++ b/test/plugins/scenarios/logrus/config/excepted.yml
@@ -18,14 +18,23 @@ segmentItems: []
 meterItems: []
 logItems:
   - serviceName: logrus
-    logSize: ge 3
+    logSize: ge 4
     logs:
       - timestamp: nq 0
         endpoint: ''
         body:
           type: TEXT
-          content: { text: 'fetch dynamic configuration error rpc error: code 
= Unimplemented
-              desc = Method not found: 
skywalking.v3.ConfigurationDiscoveryService/fetchConfigurations' }
+          content: { text: not null }
+        traceContext: { traceId: N/A, traceSegmentId: N/A, spanId: -1 }
+        tags:
+          data:
+            - { key: LEVEL, value: error }
+        layer: GENERAL
+      - timestamp: nq 0
+        endpoint: ''
+        body:
+          type: TEXT
+          content: { text: not null }
         traceContext: { traceId: N/A, traceSegmentId: N/A, spanId: -1 }
         tags:
           data:
diff --git a/test/plugins/scenarios/zap/config/excepted.yml 
b/test/plugins/scenarios/zap/config/excepted.yml
index 0b927f9..dd1b2d8 100644
--- a/test/plugins/scenarios/zap/config/excepted.yml
+++ b/test/plugins/scenarios/zap/config/excepted.yml
@@ -18,14 +18,23 @@ segmentItems: []
 meterItems: []
 logItems:
   - serviceName: zap
-    logSize: ge 3
+    logSize: ge 4
     logs:
       - timestamp: nq 0
         endpoint: ''
         body:
           type: TEXT
-          content: { text: 'fetch dynamic configuration error rpc error: code 
= Unimplemented
-              desc = Method not found: 
skywalking.v3.ConfigurationDiscoveryService/fetchConfigurations' }
+          content: { text: not null }
+        traceContext: { traceId: N/A, traceSegmentId: N/A, spanId: -1 }
+        tags:
+          data:
+            - { key: LEVEL, value: error }
+        layer: GENERAL
+      - timestamp: nq 0
+        endpoint: ''
+        body:
+          type: TEXT
+          content: { text: not null }
         traceContext: { traceId: N/A, traceSegmentId: N/A, spanId: -1 }
         tags:
           data:
diff --git a/tools/go-agent/config/agent.default.yaml 
b/tools/go-agent/config/agent.default.yaml
index f07f22d..02d351f 100644
--- a/tools/go-agent/config/agent.default.yaml
+++ b/tools/go-agent/config/agent.default.yaml
@@ -54,6 +54,12 @@ reporter:
     authentication: ${SW_AGENT_REPORTER_GRPC_AUTHENTICATION:}
     # The interval(s) of fetching dynamic configuration from backend.
     cds_fetch_interval: ${SW_AGENT_REPORTER_GRPC_CDS_FETCH_INTERVAL:20}
+    pprof:
+      # The interval(s) of fetching pprof task from backend.
+      pprof_fetch_interval: 
${SW_AGENT_REPORTER_GRPC_PPROF_TASK_FETCH_INTERVAL:20}
+      # Directory for the pprof files written while a profiling task runs; each file is removed after it is read back. Leave empty to keep profile data in memory.
+      pprof_file_path: ${SW_AGENT_REPORTER_GRPC_PROFILE_PPROF_FILE_PATH:}
+
     tls:
       # Whether to enable TLS with backend.
       enable: ${SW_AGENT_REPORTER_GRPC_TLS_ENABLE:false}
diff --git a/tools/go-agent/config/loader.go b/tools/go-agent/config/loader.go
index adc2a11..8d843df 100644
--- a/tools/go-agent/config/loader.go
+++ b/tools/go-agent/config/loader.go
@@ -83,11 +83,17 @@ type Meter struct {
 }
 
 type GRPCReporter struct {
-       BackendService   StringValue     `yaml:"backend_service"`
-       MaxSendQueue     StringValue     `yaml:"max_send_queue"`
-       Authentication   StringValue     `yaml:"authentication"`
-       CDSFetchInterval StringValue     `yaml:"cds_fetch_interval"`
-       TLS              GRPCReporterTLS `yaml:"tls"`
+       BackendService   StringValue       `yaml:"backend_service"`
+       MaxSendQueue     StringValue       `yaml:"max_send_queue"`
+       Authentication   StringValue       `yaml:"authentication"`
+       CDSFetchInterval StringValue       `yaml:"cds_fetch_interval"`
+       TLS              GRPCReporterTLS   `yaml:"tls"`
+       Pprof            GRPCReporterPprof `yaml:"pprof"`
+}
+
+type GRPCReporterPprof struct {
+       PprofFetchInterval StringValue `yaml:"pprof_fetch_interval"`
+       PprofFilePath      StringValue `yaml:"pprof_file_path"`
 }
 
 type GRPCReporterTLS struct {
diff --git a/tools/go-agent/instrument/reporter/instrument.go b/tools/go-agent/instrument/reporter/instrument.go
index 68409e6..43bb06a 100644
--- a/tools/go-agent/instrument/reporter/instrument.go
+++ b/tools/go-agent/instrument/reporter/instrument.go
@@ -160,7 +160,7 @@ func (i *Instrument) generateReporterInitFile(dir, reporterType string) (string,
        reporterInitTemplate := baseReporterInitTemplate
        if reporterType == consts.KafkaReporter {
                reporterInitTemplate += `
-       _, cdsManager, err := initManager(logger, checkInterval)
+       _, cdsManager, _, err := initManager(logger, checkInterval)
        if err != nil {
                return nil, err
        }
@@ -169,11 +169,11 @@ func (i *Instrument) generateReporterInitFile(dir, reporterType string) (string,
                reporterInitTemplate += kafkaReporterInitFunc
        } else {
                reporterInitTemplate += `
-       connManager, cdsManager, err := initManager(logger, checkInterval)
+       connManager, cdsManager, pprofTaskManager, err := initManager(logger, checkInterval)
        if err != nil {
                return nil, err
        }
-       return initGRPCReporter(logger, checkInterval, connManager, cdsManager)
+       return initGRPCReporter(logger, checkInterval, connManager, cdsManager, pprofTaskManager)
 }`
                reporterInitTemplate += grpcReporterInitFunc
        }
@@ -208,7 +208,7 @@ func {{.InitFuncName}}(logger operator.LogOperator) (Reporter, error) {
 
 const initManagerFunc = `
 
-func initManager(logger operator.LogOperator, checkInterval time.Duration) (*ConnectionManager, *CDSManager, error) {
+func initManager(logger operator.LogOperator, checkInterval time.Duration) (*ConnectionManager, *CDSManager, *PprofTaskManager, error) {
        authenticationVal := {{.Config.Reporter.GRPC.Authentication.ToGoStringValue}}
        backendServiceVal := {{.Config.Reporter.GRPC.BackendService.ToGoStringValue}}
 
@@ -229,16 +229,25 @@ func initManager(logger operator.LogOperator, checkInterval time.Duration) (*Con
                connManager, err = NewConnectionManager(logger, checkInterval, backendServiceVal, authenticationVal, nil)
        }
        if err != nil {
-               return nil, nil, err
+               return nil, nil, nil, err
        }
 
        cdsFetchIntervalVal := {{.Config.Reporter.GRPC.CDSFetchInterval.ToGoIntValue "the cds fetch interval must be number"}}
        cdsFetchInterval := time.Second * time.Duration(cdsFetchIntervalVal)
        cdsManager, err := NewCDSManager(logger, backendServiceVal, cdsFetchInterval, connManager)
        if err != nil {
-               return nil, nil, err
+               return nil, nil, nil, err
        }
-       return connManager, cdsManager, nil
+
+       pprofFetchIntervalVal := {{.Config.Reporter.GRPC.Pprof.PprofFetchInterval.ToGoIntValue "the pprof fetch interval must be number"}}
+       pprofFetchInterval := time.Second * time.Duration(pprofFetchIntervalVal)
+       pprofFilePath := {{.Config.Reporter.GRPC.Pprof.PprofFilePath.ToGoStringValue}}
+       pprofTaskManager, err := NewPprofTaskManager(logger, backendServiceVal, pprofFetchInterval, connManager, pprofFilePath)
+       if err != nil {
+               return nil, nil, nil, err
+       }
+
+       return connManager, cdsManager, pprofTaskManager, nil
 }
 `
 
@@ -247,13 +256,14 @@ const grpcReporterInitFunc = `
 func initGRPCReporter(logger operator.LogOperator,
                                        checkInterval time.Duration,
                                        connManager *ConnectionManager,
-                                       cdsManager *CDSManager) (Reporter, error) {
+                                       cdsManager *CDSManager,
+                                       pprofTaskManager *PprofTaskManager) (Reporter, error) {
        var opts []ReporterOption
        maxSendQueueVal := {{.Config.Reporter.GRPC.MaxSendQueue.ToGoIntValue "the GRPC reporter max queue size must be number"}}
        opts = append(opts, WithMaxSendQueueSize(maxSendQueueVal))
 
        backendServiceVal := {{.Config.Reporter.GRPC.BackendService.ToGoStringValue}}
-       return NewGRPCReporter(logger, backendServiceVal, checkInterval, connManager, cdsManager, opts...)
+       return NewGRPCReporter(logger, backendServiceVal, checkInterval, connManager, cdsManager, pprofTaskManager, opts...)
 }
 `
 

Reply via email to