mrproliu commented on code in PR #229:
URL: https://github.com/apache/skywalking-go/pull/229#discussion_r2315243430
##########
go.work:
##########
@@ -2,78 +2,79 @@ go 1.19
use (
.
+ ./plugins/amqp
Review Comment:
Please keep the original format. Otherwise, it's not easy to know which
projects have been added.
##########
plugins/core/prof_labels.go:
##########
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package core
+
+import (
+ "context"
+ "runtime/pprof"
+ "sort"
+ "strings"
+ "unsafe"
+)
+
+type label struct {
+ key string
+ value string
+}
+
+type LabelSet struct {
+ list []label
+}
+
+type labelMap struct {
+ LabelSet
+}
+
+//go:linkname runtimeGetProfLabel runtime/pprof.runtime_getProfLabel
+func runtimeGetProfLabel() unsafe.Pointer
+
+//go:linkname runtimeSetProfLabel runtime/pprof.runtime_setProfLabel
+func runtimeSetProfLabel(label unsafe.Pointer)
+
+func (m *ProfileManager) GetPprofLabelSet() interface{} {
+ ptr := runtimeGetProfLabel()
+ if ptr != nil {
+ lm := (*labelMap)(ptr)
+ if lm != nil && lm.list != nil {
+ return &lm.LabelSet
+ } else {
+ return &LabelSet{list: make([]label, 0)}
+ }
+ } else {
+ return &LabelSet{list: make([]label, 0)}
+ }
+}
+
+func (m *ProfileManager) TurnToPprofLabel(l interface{}) interface{} {
+ li := l.(*LabelSet).List()
+ re := pprof.Labels(li...)
+ return re
+}
+
+func GetLabelsFromCtx(ctx context.Context) LabelSet {
Review Comment:
When should this method be used?
##########
plugins/core/go.mod:
##########
@@ -2,29 +2,30 @@ module github.com/apache/skywalking-go/plugins/core
go 1.19
+
require (
github.com/dave/dst v0.27.2
- github.com/google/uuid v1.3.0
Review Comment:
Please keep the original format. Also, is the
`skywalking.apache.org/repo/goapi` dependency the only one being updated?
##########
plugins/core/tracing.go:
##########
@@ -64,6 +68,16 @@ func (t *Tracer) CreateEntrySpan(operationName string,
extractor interface{}, op
}
span, _, err := t.createSpan0(ctx, tracingSpan, opts, withRef(ref),
withSpanType(SpanTypeEntry), withOperationName(operationName))
+ if err == nil {
+ id := span.GetSegmentID()
+ t.ProfileManager.ToProfile(operationName, id)
Review Comment:
Does this mean all endpoints will be profiled, without any endpoint
name check?
##########
plugins/core/profile.go:
##########
@@ -0,0 +1,343 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package core
+
+import (
+ "fmt"
+ "github.com/apache/skywalking-go/plugins/core/reporter"
+ "runtime/pprof"
+ common "skywalking.apache.org/repo/goapi/collect/common/v3"
+ "strconv"
+ "sync"
+ "time"
+)
+
+type profileLabels struct {
+ labels *LabelSet
+ closeChan chan struct{}
+}
+
+const (
+ maxSendQueueSize int32 = 100
+ ChunkSize = 1024 * 1024
+ SegmentLabel = "traceSegmentID"
+ MinDurationLabel = "minDurationThreshold"
+ SpanLabel = "spanID"
+)
+
+type currentTask struct {
+ serialNumber string // uuid
+ taskId string
+ traceSegmentId string
+ minDurationThreshold int64
+ duration int
+}
+
+type ProfileManager struct {
+ mu sync.Mutex
+ labelSets map[string]profileLabels
+ status bool
+ TraceProfileTasks map[string]*reporter.TraceProfileTask
+ rawCh chan profileRawData
+ FinalReportResults chan reporter.ProfileResult
+ profilingWriter *ProfilingWriter
+ currentTask *currentTask
+}
+
+func (m *ProfileManager) initReportChannel() {
+ // Original channel for receiving raw data chunks sent by the Writer
+ rawCh := make(chan profileRawData, maxSendQueueSize)
+ m.rawCh = rawCh
+
+ // Start a goroutine to supplement each data chunk with business
information
+ go func() {
+ for rawResult := range rawCh {
+ m.mu.Lock()
+ // Get business information from currentTask
+ task := m.currentTask
+ m.mu.Unlock()
+
+ if task == nil {
+ fmt.Println("no task")
+ continue // Task has ended, ignore
+ }
+
+ if rawResult.isLast {
+ m.FinalReportResults <- reporter.ProfileResult{
+ TaskID: task.taskId,
+ TraceSegmentID: task.traceSegmentId,
+ Payload: rawResult.data,
+ IsLast: rawResult.isLast,
+ }
+ } else {
+ m.FinalReportResults <- reporter.ProfileResult{
+ TaskID: task.taskId,
+ TraceSegmentID: task.traceSegmentId,
+ Payload: rawResult.data,
+ IsLast: rawResult.isLast,
+ }
+ }
+
+ }
+ }()
+}
+
+func NewProfileManager() *ProfileManager {
+ pm := &ProfileManager{
+ TraceProfileTasks: make(map[string]*reporter.TraceProfileTask),
+ FinalReportResults: make(chan reporter.ProfileResult,
maxSendQueueSize),
+ status: false,
+ labelSets: make(map[string]profileLabels),
+ }
+ pm.initReportChannel()
+ return pm
+}
+
+func (m *ProfileManager) AddProfileTask(args []*common.KeyStringValuePair) {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ var task reporter.TraceProfileTask
+ for _, arg := range args {
+ switch arg.Key {
+ case "TaskId":
+ task.TaskId = arg.Value
+ case "EndpointName":
+ task.EndpointName = arg.Value
+ case "Duration":
+ // Duration min
+ task.Duration = parseInt(arg.Value)
+ case "MinDurationThreshold":
+ task.MinDurationThreshold = parseInt64(arg.Value)
+ case "DumpPeriod":
+ task.DumpPeriod = parseInt(arg.Value)
+ case "MaxSamplingCount":
+ task.MaxSamplingCount = parseInt(arg.Value)
+ case "StartTime":
+ task.StartTime = parseInt64(arg.Value)
+ case "CreateTime":
+ task.CreateTime = parseInt64(arg.Value)
+ case "SerialNumber":
+ task.SerialNumber = arg.Value
+ }
+ }
+ fmt.Println("adding profile task:", task)
+ if _, exists := m.TraceProfileTasks[task.TaskId]; exists {
+ return
+ }
+ endTime := task.StartTime + int64(task.Duration)*60*1000
+ task.EndTime = endTime
+ task.Status = reporter.Pending
+ m.TraceProfileTasks[task.TaskId] = &task
+}
+
+func (m *ProfileManager) RemoveProfileTask() {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ for k, t := range m.TraceProfileTasks {
+ if t.Status == reporter.Reported || t.EndTime <
time.Now().Unix() {
+ delete(m.TraceProfileTasks, k)
+ }
+ }
+}
+
+func (m *ProfileManager) getProfileTask(endpoint string)
[]*reporter.TraceProfileTask {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ var result []*reporter.TraceProfileTask
+ for _, t := range m.TraceProfileTasks {
+ endTime := t.StartTime + int64(t.Duration)*60*1000
+ if t.EndpointName == endpoint && t.StartTime <=
time.Now().UnixMilli() && endTime > time.Now().UnixMilli() && t.Status ==
reporter.Pending {
+ result = append(result, t)
+ }
+ }
+ return result
+}
+
+func (m *ProfileManager) IfProfiling() bool {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ return m.status
+}
+
+func (m *ProfileManager) generateProfileLabels(traceSegmentID string,
minDurationThreshold int64) profileLabels {
+ var l = &LabelSet{}
+ if minDurationThreshold == 0 {
+ l = Labels(l, SegmentLabel, traceSegmentID)
+ } else {
+ l = Labels(l, SegmentLabel, traceSegmentID, MinDurationLabel,
strconv.FormatInt(minDurationThreshold, 10))
+ }
+ closeChan := make(chan struct{}, 1)
+ return profileLabels{
+ labels: l,
+ closeChan: closeChan,
+ }
+}
+
+func (m *ProfileManager) generateCurrentTask(t *reporter.TraceProfileTask,
traceSegmentID string) {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ var c = currentTask{
+ serialNumber: t.SerialNumber,
+ taskId: t.TaskId,
+ traceSegmentId: traceSegmentID,
+ minDurationThreshold: t.MinDurationThreshold,
+ duration: t.Duration,
+ }
+ m.currentTask = &c
+}
+
+func (m *ProfileManager) ToProfile(endpoint string, traceSegmentID string) {
+ //check if profiling
+ if m.IfProfiling() {
+ c := m.generateProfileLabels(traceSegmentID, 0)
+ m.labelSets[traceSegmentID] = c
+ SetGoroutineLabels(c.labels)
+ return
+ }
+
+ tasks := m.getProfileTask(endpoint)
+ if tasks != nil {
+ for _, v := range tasks {
+ m.TraceProfileTasks[v.TaskId].Status = reporter.Running
+ //choose task to profiling
+ task := v
+ m.generateCurrentTask(task, traceSegmentID)
+ err := m.StartProfiling(traceSegmentID)
+ if err != nil {
+ fmt.Println(err)
+ return
+ }
+ go func(task *reporter.TraceProfileTask) {
+ err = m.monitor()
+ if err != nil {
+ m.TraceProfileTasks[task.TaskId].Status
= reporter.Pending
+ m.currentTask = nil
+ m.status = false
+ return
+ }
+ m.TraceProfileTasks[task.TaskId].Status =
reporter.Finished
+ }(task)
+ break
+ }
+ }
+
+}
+
+func (m *ProfileManager) StartProfiling(traceSegmentID string) error {
Review Comment:
CPU profiling should be global work, not tied to a specific trace,
because the endpoint may be executing concurrently.
For example, if one endpoint receives two identical requests at the
same time, `pprof.StartCPUProfile` will be executed twice.
##########
plugins/core/reporter/grpc/grpc.go:
##########
@@ -279,6 +287,51 @@ func (r *gRPCReporter) initSendPipeline() {
break
}
}()
+ go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ r.logger.Errorf("gRPCReporter
reportProfileResult panic err %v", err)
+ }
+ }()
+
+ StreamLoop:
+ for {
+ switch r.connManager.GetConnectionStatus(r.serverAddr) {
+ case reporter.ConnectionStatusShutdown:
+ break
+ case reporter.ConnectionStatusDisconnect:
+ time.Sleep(5 * time.Second)
+ continue StreamLoop
+ }
+
+ stream, err :=
r.profileTaskClient.GoProfileReport(metadata.NewOutgoingContext(context.Background(),
r.connManager.GetMD()))
+ if err != nil {
+ r.logger.Errorf("open profile stream error %v",
err)
+ time.Sleep(5 * time.Second)
+ continue StreamLoop
+ }
+ re := r.profileTaskManager.GetProfileResults()
Review Comment:
Could the agent check the result length first? If there is no profiling
result, it will continue to create new streams and close them, wasting a lot of
CPU resources, and there is no wait strategy.
##########
plugins/core/prof_labels.go:
##########
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package core
+
+import (
+ "context"
+ "runtime/pprof"
+ "sort"
+ "strings"
+ "unsafe"
+)
+
+type label struct {
+ key string
+ value string
+}
+
+type LabelSet struct {
+ list []label
+}
+
+type labelMap struct {
+ LabelSet
+}
+
+//go:linkname runtimeGetProfLabel runtime/pprof.runtime_getProfLabel
+func runtimeGetProfLabel() unsafe.Pointer
+
+//go:linkname runtimeSetProfLabel runtime/pprof.runtime_setProfLabel
+func runtimeSetProfLabel(label unsafe.Pointer)
+
+func (m *ProfileManager) GetPprofLabelSet() interface{} {
+ ptr := runtimeGetProfLabel()
+ if ptr != nil {
+ lm := (*labelMap)(ptr)
+ if lm != nil && lm.list != nil {
+ return &lm.LabelSet
+ } else {
+ return &LabelSet{list: make([]label, 0)}
+ }
+ } else {
+ return &LabelSet{list: make([]label, 0)}
+ }
+}
+
+func (m *ProfileManager) TurnToPprofLabel(l interface{}) interface{} {
+ li := l.(*LabelSet).List()
+ re := pprof.Labels(li...)
+ return re
+}
+
+func GetLabelsFromCtx(ctx context.Context) LabelSet {
+ var labels LabelSet
+
+ pprof.ForLabels(ctx, func(key, value string) bool {
+ labels.list = append(labels.list, label{key: key, value: value})
+ return true
+ })
+ return labels
+}
+
+func GetPprofLabelSet() *LabelSet {
Review Comment:
Is this method duplicated?
##########
plugins/core/profile.go:
##########
@@ -0,0 +1,343 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package core
+
+import (
+ "fmt"
+ "github.com/apache/skywalking-go/plugins/core/reporter"
+ "runtime/pprof"
+ common "skywalking.apache.org/repo/goapi/collect/common/v3"
+ "strconv"
+ "sync"
+ "time"
+)
+
+type profileLabels struct {
+ labels *LabelSet
+ closeChan chan struct{}
+}
+
+const (
+ maxSendQueueSize int32 = 100
+ ChunkSize = 1024 * 1024
+ SegmentLabel = "traceSegmentID"
+ MinDurationLabel = "minDurationThreshold"
+ SpanLabel = "spanID"
+)
+
+type currentTask struct {
+ serialNumber string // uuid
+ taskId string
+ traceSegmentId string
+ minDurationThreshold int64
+ duration int
+}
+
+type ProfileManager struct {
+ mu sync.Mutex
+ labelSets map[string]profileLabels
+ status bool
+ TraceProfileTasks map[string]*reporter.TraceProfileTask
+ rawCh chan profileRawData
+ FinalReportResults chan reporter.ProfileResult
+ profilingWriter *ProfilingWriter
+ currentTask *currentTask
+}
+
+func (m *ProfileManager) initReportChannel() {
+ // Original channel for receiving raw data chunks sent by the Writer
+ rawCh := make(chan profileRawData, maxSendQueueSize)
+ m.rawCh = rawCh
+
+ // Start a goroutine to supplement each data chunk with business
information
+ go func() {
+ for rawResult := range rawCh {
+ m.mu.Lock()
+ // Get business information from currentTask
+ task := m.currentTask
+ m.mu.Unlock()
+
+ if task == nil {
+ fmt.Println("no task")
+ continue // Task has ended, ignore
+ }
+
+ if rawResult.isLast {
+ m.FinalReportResults <- reporter.ProfileResult{
+ TaskID: task.taskId,
+ TraceSegmentID: task.traceSegmentId,
+ Payload: rawResult.data,
+ IsLast: rawResult.isLast,
+ }
+ } else {
+ m.FinalReportResults <- reporter.ProfileResult{
+ TaskID: task.taskId,
+ TraceSegmentID: task.traceSegmentId,
+ Payload: rawResult.data,
+ IsLast: rawResult.isLast,
+ }
+ }
+
+ }
+ }()
+}
+
+func NewProfileManager() *ProfileManager {
+ pm := &ProfileManager{
+ TraceProfileTasks: make(map[string]*reporter.TraceProfileTask),
+ FinalReportResults: make(chan reporter.ProfileResult,
maxSendQueueSize),
+ status: false,
+ labelSets: make(map[string]profileLabels),
+ }
+ pm.initReportChannel()
+ return pm
+}
+
+func (m *ProfileManager) AddProfileTask(args []*common.KeyStringValuePair) {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ var task reporter.TraceProfileTask
+ for _, arg := range args {
+ switch arg.Key {
+ case "TaskId":
+ task.TaskId = arg.Value
+ case "EndpointName":
+ task.EndpointName = arg.Value
+ case "Duration":
+ // Duration min
+ task.Duration = parseInt(arg.Value)
+ case "MinDurationThreshold":
+ task.MinDurationThreshold = parseInt64(arg.Value)
+ case "DumpPeriod":
+ task.DumpPeriod = parseInt(arg.Value)
+ case "MaxSamplingCount":
+ task.MaxSamplingCount = parseInt(arg.Value)
+ case "StartTime":
+ task.StartTime = parseInt64(arg.Value)
+ case "CreateTime":
+ task.CreateTime = parseInt64(arg.Value)
+ case "SerialNumber":
+ task.SerialNumber = arg.Value
+ }
+ }
+ fmt.Println("adding profile task:", task)
+ if _, exists := m.TraceProfileTasks[task.TaskId]; exists {
+ return
+ }
+ endTime := task.StartTime + int64(task.Duration)*60*1000
+ task.EndTime = endTime
+ task.Status = reporter.Pending
+ m.TraceProfileTasks[task.TaskId] = &task
+}
+
+func (m *ProfileManager) RemoveProfileTask() {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ for k, t := range m.TraceProfileTasks {
+ if t.Status == reporter.Reported || t.EndTime <
time.Now().Unix() {
+ delete(m.TraceProfileTasks, k)
+ }
+ }
+}
+
+func (m *ProfileManager) getProfileTask(endpoint string)
[]*reporter.TraceProfileTask {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ var result []*reporter.TraceProfileTask
+ for _, t := range m.TraceProfileTasks {
+ endTime := t.StartTime + int64(t.Duration)*60*1000
+ if t.EndpointName == endpoint && t.StartTime <=
time.Now().UnixMilli() && endTime > time.Now().UnixMilli() && t.Status ==
reporter.Pending {
+ result = append(result, t)
+ }
+ }
+ return result
+}
+
+func (m *ProfileManager) IfProfiling() bool {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ return m.status
+}
+
+func (m *ProfileManager) generateProfileLabels(traceSegmentID string,
minDurationThreshold int64) profileLabels {
+ var l = &LabelSet{}
+ if minDurationThreshold == 0 {
+ l = Labels(l, SegmentLabel, traceSegmentID)
+ } else {
+ l = Labels(l, SegmentLabel, traceSegmentID, MinDurationLabel,
strconv.FormatInt(minDurationThreshold, 10))
+ }
+ closeChan := make(chan struct{}, 1)
+ return profileLabels{
+ labels: l,
+ closeChan: closeChan,
+ }
+}
+
+func (m *ProfileManager) generateCurrentTask(t *reporter.TraceProfileTask,
traceSegmentID string) {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ var c = currentTask{
+ serialNumber: t.SerialNumber,
+ taskId: t.TaskId,
+ traceSegmentId: traceSegmentID,
+ minDurationThreshold: t.MinDurationThreshold,
+ duration: t.Duration,
+ }
+ m.currentTask = &c
+}
+
+func (m *ProfileManager) ToProfile(endpoint string, traceSegmentID string) {
+ //check if profiling
+ if m.IfProfiling() {
+ c := m.generateProfileLabels(traceSegmentID, 0)
+ m.labelSets[traceSegmentID] = c
+ SetGoroutineLabels(c.labels)
+ return
+ }
+
+ tasks := m.getProfileTask(endpoint)
+ if tasks != nil {
+ for _, v := range tasks {
+ m.TraceProfileTasks[v.TaskId].Status = reporter.Running
+ //choose task to profiling
+ task := v
+ m.generateCurrentTask(task, traceSegmentID)
+ err := m.StartProfiling(traceSegmentID)
+ if err != nil {
+ fmt.Println(err)
+ return
+ }
+ go func(task *reporter.TraceProfileTask) {
+ err = m.monitor()
+ if err != nil {
+ m.TraceProfileTasks[task.TaskId].Status
= reporter.Pending
+ m.currentTask = nil
+ m.status = false
+ return
+ }
+ m.TraceProfileTasks[task.TaskId].Status =
reporter.Finished
+ }(task)
+ break
+ }
+ }
+
+}
+
+func (m *ProfileManager) StartProfiling(traceSegmentID string) error {
+ m.mu.Lock()
+ m.status = true
+ m.profilingWriter = NewProfilingWriter(
+ ChunkSize,
+ m.rawCh,
+ )
+ // Add main profiling context
+ c := m.generateProfileLabels(traceSegmentID,
m.currentTask.minDurationThreshold)
+ SetGoroutineLabels(c.labels)
+ m.labelSets[traceSegmentID] = c
+ m.mu.Unlock()
+
+ if err := pprof.StartCPUProfile(m.profilingWriter); err != nil {
+ m.mu.Lock()
+ m.status = false
+ m.mu.Unlock()
+ return err
+ }
+ return nil
+}
+
+func (m *ProfileManager) monitor() error {
+ select {
+ // End on timeout
+ case <-time.After(time.Duration(m.currentTask.duration) * time.Minute):
+
+ // End manually
+ case <-m.labelSets[m.currentTask.traceSegmentId].closeChan:
+ }
+ // Stop profiling
+ pprof.StopCPUProfile()
Review Comment:
Same here: if two requests are received, the CPU profiling is stopped
when the first request finishes while the second request is still running.
As a result, the trace profiling of the second request is never completed.
##########
plugins/core/profile.go:
##########
@@ -0,0 +1,343 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package core
+
+import (
+ "fmt"
+ "github.com/apache/skywalking-go/plugins/core/reporter"
+ "runtime/pprof"
+ common "skywalking.apache.org/repo/goapi/collect/common/v3"
+ "strconv"
+ "sync"
+ "time"
+)
+
+type profileLabels struct {
+ labels *LabelSet
+ closeChan chan struct{}
+}
+
+const (
+ maxSendQueueSize int32 = 100
+ ChunkSize = 1024 * 1024
+ SegmentLabel = "traceSegmentID"
+ MinDurationLabel = "minDurationThreshold"
+ SpanLabel = "spanID"
+)
+
+type currentTask struct {
+ serialNumber string // uuid
+ taskId string
+ traceSegmentId string
+ minDurationThreshold int64
+ duration int
+}
+
+type ProfileManager struct {
+ mu sync.Mutex
+ labelSets map[string]profileLabels
+ status bool
+ TraceProfileTasks map[string]*reporter.TraceProfileTask
+ rawCh chan profileRawData
+ FinalReportResults chan reporter.ProfileResult
+ profilingWriter *ProfilingWriter
+ currentTask *currentTask
+}
+
+func (m *ProfileManager) initReportChannel() {
+ // Original channel for receiving raw data chunks sent by the Writer
+ rawCh := make(chan profileRawData, maxSendQueueSize)
+ m.rawCh = rawCh
+
+ // Start a goroutine to supplement each data chunk with business
information
+ go func() {
+ for rawResult := range rawCh {
+ m.mu.Lock()
+ // Get business information from currentTask
+ task := m.currentTask
+ m.mu.Unlock()
+
+ if task == nil {
+ fmt.Println("no task")
+ continue // Task has ended, ignore
+ }
+
+ if rawResult.isLast {
+ m.FinalReportResults <- reporter.ProfileResult{
+ TaskID: task.taskId,
+ TraceSegmentID: task.traceSegmentId,
+ Payload: rawResult.data,
+ IsLast: rawResult.isLast,
+ }
+ } else {
+ m.FinalReportResults <- reporter.ProfileResult{
+ TaskID: task.taskId,
+ TraceSegmentID: task.traceSegmentId,
+ Payload: rawResult.data,
+ IsLast: rawResult.isLast,
+ }
+ }
+
+ }
+ }()
+}
+
+func NewProfileManager() *ProfileManager {
+ pm := &ProfileManager{
+ TraceProfileTasks: make(map[string]*reporter.TraceProfileTask),
+ FinalReportResults: make(chan reporter.ProfileResult,
maxSendQueueSize),
+ status: false,
+ labelSets: make(map[string]profileLabels),
+ }
+ pm.initReportChannel()
+ return pm
+}
+
+func (m *ProfileManager) AddProfileTask(args []*common.KeyStringValuePair) {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ var task reporter.TraceProfileTask
+ for _, arg := range args {
+ switch arg.Key {
+ case "TaskId":
+ task.TaskId = arg.Value
+ case "EndpointName":
+ task.EndpointName = arg.Value
+ case "Duration":
+ // Duration min
+ task.Duration = parseInt(arg.Value)
+ case "MinDurationThreshold":
+ task.MinDurationThreshold = parseInt64(arg.Value)
+ case "DumpPeriod":
+ task.DumpPeriod = parseInt(arg.Value)
+ case "MaxSamplingCount":
+ task.MaxSamplingCount = parseInt(arg.Value)
+ case "StartTime":
+ task.StartTime = parseInt64(arg.Value)
+ case "CreateTime":
+ task.CreateTime = parseInt64(arg.Value)
+ case "SerialNumber":
+ task.SerialNumber = arg.Value
+ }
+ }
+ fmt.Println("adding profile task:", task)
Review Comment:
Please remove all the `fmt.Println` calls and use the logger instead.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]