mrproliu commented on code in PR #229: URL: https://github.com/apache/skywalking-go/pull/229#discussion_r2366170211
########## .github/workflows/publish-docker.yaml: ########## @@ -17,7 +17,7 @@ name: publish-docker on: - push: + pull_request: Review Comment: Please roll back this change. ########## plugins/core/prof_labels.go: ########## @@ -0,0 +1,145 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package core + +import ( + "context" + "runtime" + "runtime/pprof" + "sort" + "unsafe" + + "github.com/apache/skywalking-go/plugins/core/profile" +) + +type label struct { + key string + value string +} + +type LabelSet struct { + list []label +} + +type labelMap struct { + LabelSet +} + +type labelMap19 map[string]string + +//go:linkname runtimeGetProfLabel runtime/pprof.runtime_getProfLabel +func runtimeGetProfLabel() unsafe.Pointer + +func (m *ProfileManager) GetPprofLabelSet(traceID, segmentID string, spanID int32) interface{} { + pl := LabelSet{ + list: make([]label, 0), + } + p := runtimeGetProfLabel() + if p != nil { + version := runtime.Version() + if version < "go1.20" { Review Comment: Why do you compare the Go version as a string? Shouldn't it be compared as an int type?
########## plugins/core/profile.go: ########## @@ -0,0 +1,341 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package core + +import ( + "runtime/pprof" + "strconv" + "sync" + "sync/atomic" + "time" + + "github.com/apache/skywalking-go/plugins/core/operator" + "github.com/apache/skywalking-go/plugins/core/reporter" + common "github.com/apache/skywalking-go/protocols/collect/common/v3" +) + +type profileLabels struct { + labels *LabelSet +} + +const ( + maxSendQueueSize int32 = 100 + ChunkSize = 1024 * 1024 + TraceLabel = "traceID" + SegmentLabel = "traceSegmentID" + MinDurationLabel = "minDurationThreshold" + SpanLabel = "spanID" +) + +type currentTask struct { + serialNumber string // uuid + taskID string + minDurationThreshold int64 + endpointName string + duration int +} + +type ProfileManager struct { + mu sync.Mutex + TraceProfileTasks map[string]*reporter.TraceProfileTask + rawCh chan profileRawData + FinalReportResults chan reporter.ProfileResult + profilingWriter *ProfilingWriter + profileEvents *TraceProfilingEventManager + currentTask *currentTask + Log operator.LogOperator + counter atomic.Int32 +} + +func (m *ProfileManager) initReportChannel() { + // Original channel for receiving raw data chunks 
sent by the Writer + rawCh := make(chan profileRawData, maxSendQueueSize) + m.rawCh = rawCh + var d []byte + // Start a goroutine to supplement each data chunk with business information + go func() { + for rawResult := range rawCh { + d = append(d, rawResult.data...) + m.mu.Lock() + // Get business information from currentTask + task := m.currentTask + m.mu.Unlock() + + if task == nil { + m.Log.Info("no task") + continue // Task has ended, ignore + } + + if rawResult.isLast { + m.FinalReportResults <- reporter.ProfileResult{ + TaskID: task.taskID, + Payload: rawResult.data, + IsLast: rawResult.isLast, + } + m.mu.Lock() + m.TraceProfileTasks[m.currentTask.taskID].Status = reporter.Finished + m.currentTask = nil + m.profileEvents.BaseEventStatus[CurTaskExist] = false + m.mu.Unlock() + } else { + m.FinalReportResults <- reporter.ProfileResult{ + TaskID: task.taskID, + Payload: rawResult.data, + IsLast: rawResult.isLast, + } + } + } + }() +} + +func NewProfileManager(log operator.LogOperator) *ProfileManager { + pm := &ProfileManager{ + TraceProfileTasks: make(map[string]*reporter.TraceProfileTask), + FinalReportResults: make(chan reporter.ProfileResult, maxSendQueueSize), + profileEvents: NewEventManager(), + } + pm.RegisterProfileEvents() + + if log == nil { + log = newDefaultLogger() + } + pm.Log = log + pm.initReportChannel() + pm.profilingWriter = NewProfilingWriter( + ChunkSize, + pm.rawCh, + ) + return pm +} + +func (m *ProfileManager) AddProfileTask(args []*common.KeyStringValuePair) { + m.mu.Lock() + defer m.mu.Unlock() + var task reporter.TraceProfileTask + for _, arg := range args { + switch arg.Key { + case "TaskID": + task.TaskID = arg.Value + case "EndpointName": + task.EndpointName = arg.Value + case "Duration": + // Duration min + task.Duration = parseInt(arg.Value) + case "MinDurationThreshold": + task.MinDurationThreshold = parseInt64(arg.Value) + case "DumpPeriod": + task.DumpPeriod = parseInt(arg.Value) + case "MaxSamplingCount": + 
task.MaxSamplingCount = parseInt(arg.Value) + case "StartTime": + task.StartTime = parseInt64(arg.Value) + case "CreateTime": + task.CreateTime = parseInt64(arg.Value) + case "SerialNumber": + task.SerialNumber = arg.Value + } + } + m.Log.Info("adding profile task:", task) + if _, exists := m.TraceProfileTasks[task.TaskID]; exists { + return + } + endTime := task.StartTime + int64(task.Duration)*60*1000 + task.EndTime = endTime + task.Status = reporter.Pending + m.TraceProfileTasks[task.TaskID] = &task + m.TrySetCurrentTask(&task) + m.tryStartCPUProfiling() +} + +func (m *ProfileManager) RemoveProfileTask() { + m.mu.Lock() + defer m.mu.Unlock() + for k, t := range m.TraceProfileTasks { + if t.Status == reporter.Reported || t.EndTime < time.Now().Unix() { + delete(m.TraceProfileTasks, k) + } + } +} + +func (m *ProfileManager) tryStartCPUProfiling() { + ok, err := m.profileEvents.ExecuteComplexEvent(CouldProfile) + if err != nil { + m.Log.Errorf("profile event error:%v", err) + return + } + t := m.TraceProfileTasks[m.currentTask.taskID] Review Comment: Actually, this map will only have one data, so there's no need to use the map, right? ########## plugins/core/profile.go: ########## @@ -0,0 +1,341 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package core + +import ( + "runtime/pprof" + "strconv" + "sync" + "sync/atomic" + "time" + + "github.com/apache/skywalking-go/plugins/core/operator" + "github.com/apache/skywalking-go/plugins/core/reporter" + common "github.com/apache/skywalking-go/protocols/collect/common/v3" +) + +type profileLabels struct { + labels *LabelSet +} + +const ( + maxSendQueueSize int32 = 100 + ChunkSize = 1024 * 1024 + TraceLabel = "traceID" + SegmentLabel = "traceSegmentID" + MinDurationLabel = "minDurationThreshold" + SpanLabel = "spanID" +) + +type currentTask struct { + serialNumber string // uuid + taskID string + minDurationThreshold int64 + endpointName string + duration int +} + +type ProfileManager struct { + mu sync.Mutex + TraceProfileTasks map[string]*reporter.TraceProfileTask + rawCh chan profileRawData + FinalReportResults chan reporter.ProfileResult + profilingWriter *ProfilingWriter + profileEvents *TraceProfilingEventManager + currentTask *currentTask + Log operator.LogOperator + counter atomic.Int32 +} + +func (m *ProfileManager) initReportChannel() { + // Original channel for receiving raw data chunks sent by the Writer + rawCh := make(chan profileRawData, maxSendQueueSize) + m.rawCh = rawCh + var d []byte + // Start a goroutine to supplement each data chunk with business information + go func() { + for rawResult := range rawCh { + d = append(d, rawResult.data...) 
+ m.mu.Lock() + // Get business information from currentTask + task := m.currentTask + m.mu.Unlock() + + if task == nil { + m.Log.Info("no task") + continue // Task has ended, ignore + } + + if rawResult.isLast { + m.FinalReportResults <- reporter.ProfileResult{ + TaskID: task.taskID, + Payload: rawResult.data, + IsLast: rawResult.isLast, + } + m.mu.Lock() + m.TraceProfileTasks[m.currentTask.taskID].Status = reporter.Finished + m.currentTask = nil + m.profileEvents.BaseEventStatus[CurTaskExist] = false + m.mu.Unlock() + } else { + m.FinalReportResults <- reporter.ProfileResult{ + TaskID: task.taskID, + Payload: rawResult.data, + IsLast: rawResult.isLast, + } + } + } + }() +} + +func NewProfileManager(log operator.LogOperator) *ProfileManager { + pm := &ProfileManager{ + TraceProfileTasks: make(map[string]*reporter.TraceProfileTask), + FinalReportResults: make(chan reporter.ProfileResult, maxSendQueueSize), + profileEvents: NewEventManager(), + } + pm.RegisterProfileEvents() + + if log == nil { + log = newDefaultLogger() + } + pm.Log = log + pm.initReportChannel() + pm.profilingWriter = NewProfilingWriter( + ChunkSize, + pm.rawCh, + ) + return pm +} + +func (m *ProfileManager) AddProfileTask(args []*common.KeyStringValuePair) { + m.mu.Lock() + defer m.mu.Unlock() + var task reporter.TraceProfileTask + for _, arg := range args { + switch arg.Key { + case "TaskID": + task.TaskID = arg.Value + case "EndpointName": + task.EndpointName = arg.Value + case "Duration": + // Duration min + task.Duration = parseInt(arg.Value) + case "MinDurationThreshold": + task.MinDurationThreshold = parseInt64(arg.Value) + case "DumpPeriod": + task.DumpPeriod = parseInt(arg.Value) + case "MaxSamplingCount": + task.MaxSamplingCount = parseInt(arg.Value) + case "StartTime": + task.StartTime = parseInt64(arg.Value) + case "CreateTime": + task.CreateTime = parseInt64(arg.Value) + case "SerialNumber": + task.SerialNumber = arg.Value + } + } + m.Log.Info("adding profile task:", task) + if _, 
exists := m.TraceProfileTasks[task.TaskID]; exists { + return + } + endTime := task.StartTime + int64(task.Duration)*60*1000 + task.EndTime = endTime + task.Status = reporter.Pending + m.TraceProfileTasks[task.TaskID] = &task + m.TrySetCurrentTask(&task) + m.tryStartCPUProfiling() +} + +func (m *ProfileManager) RemoveProfileTask() { + m.mu.Lock() + defer m.mu.Unlock() + for k, t := range m.TraceProfileTasks { + if t.Status == reporter.Reported || t.EndTime < time.Now().Unix() { + delete(m.TraceProfileTasks, k) + } + } +} + +func (m *ProfileManager) tryStartCPUProfiling() { + ok, err := m.profileEvents.ExecuteComplexEvent(CouldProfile) + if err != nil { + m.Log.Errorf("profile event error:%v", err) Review Comment: What will happen when the `TrySetCurrentTask` executes failure? Will the start profiling keep running? You can consolidate them all into a single method to simplify the logic of task execution. ########## plugins/core/operator/profiler.go: ########## @@ -0,0 +1,24 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package operator + +type ProfileOperator interface { + GetPprofLabelSet(traceID string, segmentID string, spanID int32) interface{} + TurnToPprofLabel(l interface{}) interface{} Review Comment: Can we simplify the profile operator? Setting and getting the trace info from the goroutine should be enough. ########## .github/workflows/plugin-tests.yaml: ########## @@ -21,7 +21,7 @@ concurrency: cancel-in-progress: true on: - pull_request: + push: Review Comment: Please keep this triggered only on pull requests to reduce CI resource usage. ########## plugins/core/profile.go: ########## @@ -0,0 +1,341 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +package core + +import ( + "runtime/pprof" + "strconv" + "sync" + "sync/atomic" + "time" + + "github.com/apache/skywalking-go/plugins/core/operator" + "github.com/apache/skywalking-go/plugins/core/reporter" + common "github.com/apache/skywalking-go/protocols/collect/common/v3" +) + +type profileLabels struct { + labels *LabelSet +} + +const ( + maxSendQueueSize int32 = 100 + ChunkSize = 1024 * 1024 + TraceLabel = "traceID" + SegmentLabel = "traceSegmentID" + MinDurationLabel = "minDurationThreshold" + SpanLabel = "spanID" +) + +type currentTask struct { + serialNumber string // uuid + taskID string + minDurationThreshold int64 + endpointName string + duration int +} + +type ProfileManager struct { + mu sync.Mutex + TraceProfileTasks map[string]*reporter.TraceProfileTask + rawCh chan profileRawData + FinalReportResults chan reporter.ProfileResult + profilingWriter *ProfilingWriter + profileEvents *TraceProfilingEventManager + currentTask *currentTask + Log operator.LogOperator + counter atomic.Int32 +} + +func (m *ProfileManager) initReportChannel() { + // Original channel for receiving raw data chunks sent by the Writer + rawCh := make(chan profileRawData, maxSendQueueSize) + m.rawCh = rawCh + var d []byte + // Start a goroutine to supplement each data chunk with business information + go func() { + for rawResult := range rawCh { + d = append(d, rawResult.data...) 
+ m.mu.Lock() + // Get business information from currentTask + task := m.currentTask + m.mu.Unlock() + + if task == nil { + m.Log.Info("no task") + continue // Task has ended, ignore + } + + if rawResult.isLast { + m.FinalReportResults <- reporter.ProfileResult{ + TaskID: task.taskID, + Payload: rawResult.data, + IsLast: rawResult.isLast, + } + m.mu.Lock() + m.TraceProfileTasks[m.currentTask.taskID].Status = reporter.Finished + m.currentTask = nil + m.profileEvents.BaseEventStatus[CurTaskExist] = false + m.mu.Unlock() + } else { + m.FinalReportResults <- reporter.ProfileResult{ + TaskID: task.taskID, + Payload: rawResult.data, + IsLast: rawResult.isLast, + } + } + } + }() +} + +func NewProfileManager(log operator.LogOperator) *ProfileManager { + pm := &ProfileManager{ + TraceProfileTasks: make(map[string]*reporter.TraceProfileTask), + FinalReportResults: make(chan reporter.ProfileResult, maxSendQueueSize), + profileEvents: NewEventManager(), + } + pm.RegisterProfileEvents() + + if log == nil { + log = newDefaultLogger() + } + pm.Log = log + pm.initReportChannel() + pm.profilingWriter = NewProfilingWriter( + ChunkSize, + pm.rawCh, + ) + return pm +} + +func (m *ProfileManager) AddProfileTask(args []*common.KeyStringValuePair) { + m.mu.Lock() + defer m.mu.Unlock() + var task reporter.TraceProfileTask + for _, arg := range args { + switch arg.Key { + case "TaskID": + task.TaskID = arg.Value + case "EndpointName": + task.EndpointName = arg.Value + case "Duration": + // Duration min + task.Duration = parseInt(arg.Value) + case "MinDurationThreshold": + task.MinDurationThreshold = parseInt64(arg.Value) + case "DumpPeriod": + task.DumpPeriod = parseInt(arg.Value) + case "MaxSamplingCount": + task.MaxSamplingCount = parseInt(arg.Value) + case "StartTime": + task.StartTime = parseInt64(arg.Value) + case "CreateTime": + task.CreateTime = parseInt64(arg.Value) + case "SerialNumber": + task.SerialNumber = arg.Value + } + } + m.Log.Info("adding profile task:", task) + if _, 
exists := m.TraceProfileTasks[task.TaskID]; exists { + return + } + endTime := task.StartTime + int64(task.Duration)*60*1000 + task.EndTime = endTime + task.Status = reporter.Pending + m.TraceProfileTasks[task.TaskID] = &task + m.TrySetCurrentTask(&task) + m.tryStartCPUProfiling() +} + +func (m *ProfileManager) RemoveProfileTask() { + m.mu.Lock() + defer m.mu.Unlock() + for k, t := range m.TraceProfileTasks { + if t.Status == reporter.Reported || t.EndTime < time.Now().Unix() { + delete(m.TraceProfileTasks, k) + } + } +} + +func (m *ProfileManager) tryStartCPUProfiling() { + ok, err := m.profileEvents.ExecuteComplexEvent(CouldProfile) + if err != nil { + m.Log.Errorf("profile event error:%v", err) + return + } + t := m.TraceProfileTasks[m.currentTask.taskID] + if ok && t.Status == reporter.Pending { + err := pprof.StartCPUProfile(m.profilingWriter) + if err != nil { + m.Log.Info("failed to start cpu profiling", err) + return + } + err = m.profileEvents.UpdateBaseEventStatus(IfProfiling, true) + if err != nil { + m.Log.Errorf("update profile event error:%v", err) + } + t.Status = reporter.Running + go m.monitor() + } +} + +func (m *ProfileManager) CheckIfProfileTarget(endpoint string) bool { + m.mu.Lock() + defer m.mu.Unlock() + if m.currentTask == nil { + return false + } + return m.currentTask.endpointName == endpoint +} + +func (m *ProfileManager) IfProfiling() bool { + return m.profileEvents.BaseEventStatus[IfProfiling] Review Comment: Is this will have a data race? ########## plugins/core/profile.go: ########## @@ -0,0 +1,341 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package core + +import ( + "runtime/pprof" + "strconv" + "sync" + "sync/atomic" + "time" + + "github.com/apache/skywalking-go/plugins/core/operator" + "github.com/apache/skywalking-go/plugins/core/reporter" + common "github.com/apache/skywalking-go/protocols/collect/common/v3" +) + +type profileLabels struct { + labels *LabelSet +} + +const ( + maxSendQueueSize int32 = 100 + ChunkSize = 1024 * 1024 + TraceLabel = "traceID" + SegmentLabel = "traceSegmentID" + MinDurationLabel = "minDurationThreshold" + SpanLabel = "spanID" +) + +type currentTask struct { + serialNumber string // uuid + taskID string + minDurationThreshold int64 + endpointName string + duration int +} + +type ProfileManager struct { + mu sync.Mutex + TraceProfileTasks map[string]*reporter.TraceProfileTask + rawCh chan profileRawData + FinalReportResults chan reporter.ProfileResult + profilingWriter *ProfilingWriter + profileEvents *TraceProfilingEventManager + currentTask *currentTask + Log operator.LogOperator + counter atomic.Int32 +} + +func (m *ProfileManager) initReportChannel() { + // Original channel for receiving raw data chunks sent by the Writer + rawCh := make(chan profileRawData, maxSendQueueSize) + m.rawCh = rawCh + var d []byte + // Start a goroutine to supplement each data chunk with business information + go func() { + for rawResult := range rawCh { + d = append(d, rawResult.data...) 
+ m.mu.Lock() + // Get business information from currentTask + task := m.currentTask + m.mu.Unlock() + + if task == nil { + m.Log.Info("no task") + continue // Task has ended, ignore + } + + if rawResult.isLast { + m.FinalReportResults <- reporter.ProfileResult{ + TaskID: task.taskID, + Payload: rawResult.data, + IsLast: rawResult.isLast, + } + m.mu.Lock() + m.TraceProfileTasks[m.currentTask.taskID].Status = reporter.Finished + m.currentTask = nil + m.profileEvents.BaseEventStatus[CurTaskExist] = false + m.mu.Unlock() + } else { + m.FinalReportResults <- reporter.ProfileResult{ + TaskID: task.taskID, + Payload: rawResult.data, + IsLast: rawResult.isLast, + } + } + } + }() +} + +func NewProfileManager(log operator.LogOperator) *ProfileManager { + pm := &ProfileManager{ + TraceProfileTasks: make(map[string]*reporter.TraceProfileTask), + FinalReportResults: make(chan reporter.ProfileResult, maxSendQueueSize), + profileEvents: NewEventManager(), + } + pm.RegisterProfileEvents() + + if log == nil { + log = newDefaultLogger() + } + pm.Log = log + pm.initReportChannel() + pm.profilingWriter = NewProfilingWriter( + ChunkSize, + pm.rawCh, + ) + return pm +} + +func (m *ProfileManager) AddProfileTask(args []*common.KeyStringValuePair) { + m.mu.Lock() + defer m.mu.Unlock() + var task reporter.TraceProfileTask + for _, arg := range args { + switch arg.Key { + case "TaskID": + task.TaskID = arg.Value + case "EndpointName": + task.EndpointName = arg.Value + case "Duration": + // Duration min + task.Duration = parseInt(arg.Value) + case "MinDurationThreshold": + task.MinDurationThreshold = parseInt64(arg.Value) + case "DumpPeriod": + task.DumpPeriod = parseInt(arg.Value) + case "MaxSamplingCount": + task.MaxSamplingCount = parseInt(arg.Value) + case "StartTime": + task.StartTime = parseInt64(arg.Value) + case "CreateTime": + task.CreateTime = parseInt64(arg.Value) + case "SerialNumber": + task.SerialNumber = arg.Value + } + } + m.Log.Info("adding profile task:", task) + if _, 
exists := m.TraceProfileTasks[task.TaskID]; exists { + return + } + endTime := task.StartTime + int64(task.Duration)*60*1000 Review Comment: Please use `time.Time#Add` to add the duration. ########## tools/go-agent/instrument/goroutine/instrument.go: ########## @@ -0,0 +1,134 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package goroutine + +import ( + "os" + "path/filepath" + runtimepkg "runtime" + "strings" + + "github.com/dave/dst" + "github.com/dave/dst/dstutil" + + "github.com/apache/skywalking-go/tools/go-agent/instrument/api" + "github.com/apache/skywalking-go/tools/go-agent/tools" +) + +const mainPackage = "main" + +type Instrument struct { + opts *api.CompileOptions + helperInjected bool +} + +func NewInstrument() *Instrument { return &Instrument{} } + +// CouldHandle checks whether the given package should be instrumented. +// Returns false if: +// - The package is not "main" and does not contain a dot (standard Go package). +// - The package is part of the SkyWalking-Go project itself.
+func (i *Instrument) CouldHandle(opts *api.CompileOptions) bool { + i.opts = opts + if opts.Package != mainPackage && !strings.Contains(opts.Package, ".") { + return false + } + if strings.HasPrefix(opts.Package, "github.com/apache/skywalking-go/") { + return false + } + return true +} + +func (i *Instrument) FilterAndEdit(path string, curFile *dst.File, cursor *dstutil.Cursor, allFiles []*dst.File) bool { + // skip stdlib/module cache + if i.opts != nil { Review Comment: It looks like you want to enhance a lot of packages. Why do you need to do this? ########## plugins/core/reporter/api.go: ########## @@ -108,11 +108,21 @@ var ( ConnectionStatusShutdown ConnectionStatus = 3 ) +type TaskStatus int + +const ( + Pending TaskStatus = iota Review Comment: Does this status only work for the trace profiling task? Please add a prefix to the constants, in case they conflict with other concepts. ########## plugins/core/context.go: ########## @@ -55,6 +56,27 @@ func (t *TracingContext) TakeSnapShot() interface{} { } } +func (t *TracingContext) SetPprofLabels() { Review Comment: Should the label setting only be invoked in the entry span checker? Then the labels should be propagated automatically when `new_proc1` is invoked. Please read this code from Go: https://github.com/golang/go/blob/master/src/runtime/proc.go#L5209 ########## plugins/core/profile.go: ########## @@ -0,0 +1,341 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package core + +import ( + "runtime/pprof" + "strconv" + "sync" + "sync/atomic" + "time" + + "github.com/apache/skywalking-go/plugins/core/operator" + "github.com/apache/skywalking-go/plugins/core/reporter" + common "github.com/apache/skywalking-go/protocols/collect/common/v3" +) + +type profileLabels struct { + labels *LabelSet +} + +const ( + maxSendQueueSize int32 = 100 + ChunkSize = 1024 * 1024 + TraceLabel = "traceID" + SegmentLabel = "traceSegmentID" + MinDurationLabel = "minDurationThreshold" + SpanLabel = "spanID" +) + +type currentTask struct { + serialNumber string // uuid + taskID string + minDurationThreshold int64 + endpointName string + duration int +} + +type ProfileManager struct { + mu sync.Mutex + TraceProfileTasks map[string]*reporter.TraceProfileTask + rawCh chan profileRawData + FinalReportResults chan reporter.ProfileResult + profilingWriter *ProfilingWriter + profileEvents *TraceProfilingEventManager + currentTask *currentTask + Log operator.LogOperator + counter atomic.Int32 +} + +func (m *ProfileManager) initReportChannel() { + // Original channel for receiving raw data chunks sent by the Writer + rawCh := make(chan profileRawData, maxSendQueueSize) + m.rawCh = rawCh + var d []byte + // Start a goroutine to supplement each data chunk with business information + go func() { + for rawResult := range rawCh { + d = append(d, rawResult.data...) 
+ m.mu.Lock() + // Get business information from currentTask + task := m.currentTask + m.mu.Unlock() + + if task == nil { + m.Log.Info("no task") + continue // Task has ended, ignore + } + + if rawResult.isLast { + m.FinalReportResults <- reporter.ProfileResult{ + TaskID: task.taskID, + Payload: rawResult.data, + IsLast: rawResult.isLast, + } + m.mu.Lock() + m.TraceProfileTasks[m.currentTask.taskID].Status = reporter.Finished + m.currentTask = nil + m.profileEvents.BaseEventStatus[CurTaskExist] = false + m.mu.Unlock() + } else { + m.FinalReportResults <- reporter.ProfileResult{ + TaskID: task.taskID, + Payload: rawResult.data, + IsLast: rawResult.isLast, + } + } + } + }() +} + +func NewProfileManager(log operator.LogOperator) *ProfileManager { + pm := &ProfileManager{ + TraceProfileTasks: make(map[string]*reporter.TraceProfileTask), + FinalReportResults: make(chan reporter.ProfileResult, maxSendQueueSize), + profileEvents: NewEventManager(), + } + pm.RegisterProfileEvents() + + if log == nil { + log = newDefaultLogger() + } + pm.Log = log + pm.initReportChannel() + pm.profilingWriter = NewProfilingWriter( + ChunkSize, + pm.rawCh, + ) + return pm +} + +func (m *ProfileManager) AddProfileTask(args []*common.KeyStringValuePair) { + m.mu.Lock() + defer m.mu.Unlock() + var task reporter.TraceProfileTask + for _, arg := range args { + switch arg.Key { + case "TaskID": + task.TaskID = arg.Value + case "EndpointName": + task.EndpointName = arg.Value + case "Duration": + // Duration min + task.Duration = parseInt(arg.Value) + case "MinDurationThreshold": + task.MinDurationThreshold = parseInt64(arg.Value) + case "DumpPeriod": + task.DumpPeriod = parseInt(arg.Value) + case "MaxSamplingCount": + task.MaxSamplingCount = parseInt(arg.Value) + case "StartTime": + task.StartTime = parseInt64(arg.Value) + case "CreateTime": + task.CreateTime = parseInt64(arg.Value) + case "SerialNumber": + task.SerialNumber = arg.Value + } + } + m.Log.Info("adding profile task:", task) + if _, 
exists := m.TraceProfileTasks[task.TaskID]; exists { + return + } + endTime := task.StartTime + int64(task.Duration)*60*1000 + task.EndTime = endTime + task.Status = reporter.Pending + m.TraceProfileTasks[task.TaskID] = &task + m.TrySetCurrentTask(&task) + m.tryStartCPUProfiling() +} + +func (m *ProfileManager) RemoveProfileTask() { + m.mu.Lock() + defer m.mu.Unlock() + for k, t := range m.TraceProfileTasks { + if t.Status == reporter.Reported || t.EndTime < time.Now().Unix() { + delete(m.TraceProfileTasks, k) + } + } +} + +func (m *ProfileManager) tryStartCPUProfiling() { + ok, err := m.profileEvents.ExecuteComplexEvent(CouldProfile) + if err != nil { + m.Log.Errorf("profile event error:%v", err) + return + } + t := m.TraceProfileTasks[m.currentTask.taskID] + if ok && t.Status == reporter.Pending { + err := pprof.StartCPUProfile(m.profilingWriter) + if err != nil { + m.Log.Info("failed to start cpu profiling", err) + return + } + err = m.profileEvents.UpdateBaseEventStatus(IfProfiling, true) + if err != nil { + m.Log.Errorf("update profile event error:%v", err) + } + t.Status = reporter.Running + go m.monitor() + } +} + +func (m *ProfileManager) CheckIfProfileTarget(endpoint string) bool { + m.mu.Lock() + defer m.mu.Unlock() + if m.currentTask == nil { + return false + } + return m.currentTask.endpointName == endpoint +} + +func (m *ProfileManager) IfProfiling() bool { + return m.profileEvents.BaseEventStatus[IfProfiling] +} + +func (m *ProfileManager) TrySetCurrentTask(task *reporter.TraceProfileTask) { + ok, err := m.profileEvents.ExecuteComplexEvent(CouldSetCurTask) + if err != nil { + m.Log.Errorf("profile event error:%v", err) + } + if ok { + m.generateCurrentTask(task) + } +} + +func (m *ProfileManager) generateProfileLabels(traceSegmentID string, minDurationThreshold int64) profileLabels { + var l = LabelSet{} + l = UpdateTraceLabels(l, SegmentLabel, traceSegmentID, MinDurationLabel, strconv.FormatInt(minDurationThreshold, 10)) + return profileLabels{ + 
labels: &l, + } +} + +func (m *ProfileManager) generateCurrentTask(t *reporter.TraceProfileTask) { + var c = currentTask{ + serialNumber: t.SerialNumber, + taskID: t.TaskID, + minDurationThreshold: t.MinDurationThreshold, + duration: t.Duration, + endpointName: t.EndpointName, + } + m.currentTask = &c + err := m.profileEvents.UpdateBaseEventStatus(CurTaskExist, true) + if err != nil { + m.Log.Errorf("profile event error:%v", err) + } +} + +func (m *ProfileManager) TryToAddSegmentLabelSet(traceSegmentID string) { + if m.currentTask != nil { + c := m.generateProfileLabels(traceSegmentID, m.currentTask.minDurationThreshold) + SetGoroutineLabels(c.labels) + return + } +} + +func (m *ProfileManager) monitor() { + done := make(chan struct{}) + var zeroSince time.Time + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + + go func() { + for range ticker.C { Review Comment: If the task has started and the endpoint does not trigger any request within one second, will the agent stop the profiling before reaching the end time? This logic may not be correct. Also, if the end time has not been reached and you have stopped the profiling, and the agent then receives a request that matches the endpoint, will it restart the profiling? So, for simplicity, I think you could just stop the profiling when the end time is reached. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
