This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/skywalking-go.git
The following commit(s) were added to refs/heads/main by this push: new 37820ce feature: add support for Pulsar (#173) 37820ce is described below commit 37820cedda3764d9049e3892d7cbf39876f3fce0 Author: Starry <codeprince2...@163.com> AuthorDate: Mon Mar 11 11:17:18 2024 +0800 feature: add support for Pulsar (#173) --- .github/workflows/plugin-tests.yaml | 1 + CHANGES.md | 6 +- docs/en/agent/support-plugins.md | 1 + go.work | 2 + plugins/pulsar/go.mod | 5 + plugins/pulsar/go.sum | 2 + plugins/pulsar/instrument.go | 90 ++++++++++++++ plugins/pulsar/pulsar/receive_consumer.go | 70 +++++++++++ plugins/pulsar/pulsar/send_async_producer.go | 105 ++++++++++++++++ plugins/pulsar/pulsar/send_producer.go | 83 +++++++++++++ plugins/pulsar/pulsar/structures.go | 69 +++++++++++ test/plugins/scenarios/pulsar/bin/startup.sh | 22 ++++ test/plugins/scenarios/pulsar/config/excepted.yml | 121 +++++++++++++++++++ test/plugins/scenarios/pulsar/go.mod | 7 ++ test/plugins/scenarios/pulsar/go.sum | 2 + test/plugins/scenarios/pulsar/main.go | 141 ++++++++++++++++++++++ test/plugins/scenarios/pulsar/plugin.yml | 31 +++++ tools/go-agent/instrument/plugins/register.go | 2 + 18 files changed, 759 insertions(+), 1 deletion(-) diff --git a/.github/workflows/plugin-tests.yaml b/.github/workflows/plugin-tests.yaml index feeb7fa..737184d 100644 --- a/.github/workflows/plugin-tests.yaml +++ b/.github/workflows/plugin-tests.yaml @@ -98,6 +98,7 @@ jobs: - echov4 - rocketmq - amqp + - pulsar steps: - uses: actions/checkout@v2 with: diff --git a/CHANGES.md b/CHANGES.md index 08df91e..467c25f 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -2,6 +2,11 @@ Changes by Version ================== Release Notes. +0.5.0 +------------------ +#### Plugins +* Support [Pulsar](https://github.com/apache/pulsar-client-go) MQ. + 0.4.0 ------------------ #### Features @@ -92,7 +97,6 @@ Release Notes. 0.1.0 ------------------ - #### Features * Initialize the agent core and user import library. * Support gRPC reporter for management, tracing protocols. diff --git a/docs/en/agent/support-plugins.md b/docs/en/agent/support-plugins.md index b0a455a..92502f1 100644 --- a/docs/en/agent/support-plugins.md +++ b/docs/en/agent/support-plugins.md @@ -30,6 +30,7 @@ metrics based on the tracing data. * MQ Client * `rocketMQ`: [rocketmq-client-go](https://github.com/apache/rocketmq-client-go) tested v2.1.2. * `amqp`: [AMQP](https://github.com/rabbitmq/amqp091-go) tested v1.9.0. + * `pulsar`: [pulsar-client-go](https://github.com/apache/pulsar-client-go) tested v0.12.0. # Metrics Plugins The meter plugin provides the advanced metrics collections. diff --git a/go.work b/go.work index 08b1e93..fd0ce8b 100644 --- a/go.work +++ b/go.work @@ -25,6 +25,7 @@ use ( ./plugins/echov4 ./plugins/rocketmq ./plugins/amqp + ./plugins/pulsar ./test/benchmark-codebase/consumer ./test/benchmark-codebase/provider @@ -56,6 +57,7 @@ use ( ./test/plugins/scenarios/echov4 ./test/plugins/scenarios/rocketmq ./test/plugins/scenarios/amqp + ./test/plugins/scenarios/pulsar ./tools/go-agent diff --git a/plugins/pulsar/go.mod b/plugins/pulsar/go.mod new file mode 100644 index 0000000..4e5628e --- /dev/null +++ b/plugins/pulsar/go.mod @@ -0,0 +1,5 @@ +module github.com/apache/skywalking-go/plugins/pulsar + +go 1.18 + +require github.com/apache/pulsar-client-go v0.12.0 // indirect diff --git a/plugins/pulsar/go.sum b/plugins/pulsar/go.sum new file mode 100644 index 0000000..0f5c7f4 --- /dev/null +++ b/plugins/pulsar/go.sum @@ -0,0 +1,2 @@ +github.com/apache/pulsar-client-go v0.12.0 h1:rrMlwpr6IgLRPXLRRh2vSlcw5tGV2PUSjZwmqgh2B2I= +github.com/apache/pulsar-client-go v0.12.0/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk= diff --git a/plugins/pulsar/instrument.go b/plugins/pulsar/instrument.go new file mode 100644 index 0000000..53da703 --- /dev/null +++ b/plugins/pulsar/instrument.go @@ -0,0 +1,90 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pulsar + +import ( + "embed" + + "github.com/apache/skywalking-go/plugins/core/instrument" +) + +//go:embed * +var fs embed.FS + +//skywalking:nocopy +type Instrument struct { +} + +func NewInstrument() *Instrument { + return &Instrument{} +} + +func (i *Instrument) Name() string { + return "pulsar" +} + +func (i *Instrument) BasePackage() string { + return "github.com/apache/pulsar-client-go" +} + +func (i *Instrument) VersionChecker(version string) bool { + return true +} + +func (i *Instrument) Points() []*instrument.Point { + return []*instrument.Point{ + { + PackagePath: "pulsar", + At: instrument.NewMethodEnhance("*partitionProducer", "Send", + instrument.WithArgsCount(2), + instrument.WithArgType(0, "context.Context"), + instrument.WithArgType(1, "*ProducerMessage"), + instrument.WithResultCount(2), + instrument.WithResultType(0, "MessageID"), + instrument.WithResultType(1, "error"), + ), + Interceptor: "SendInterceptor", + }, + { + PackagePath: "pulsar", + At: instrument.NewMethodEnhance("*partitionProducer", "SendAsync", + instrument.WithArgsCount(3), + instrument.WithArgType(0, "context.Context"), + instrument.WithArgType(1, "*ProducerMessage"), + instrument.WithArgType(2, "func(MessageID, *ProducerMessage, error)"), + instrument.WithResultCount(0), + ), + Interceptor: "SendAsyncInterceptor", + }, + { + PackagePath: "pulsar", + At: instrument.NewMethodEnhance("*consumer", "Receive", + instrument.WithArgsCount(1), + instrument.WithArgType(0, "context.Context"), + instrument.WithResultCount(2), + instrument.WithResultType(0, "Message"), + instrument.WithResultType(1, "error"), + ), + Interceptor: "ReceiveInterceptor", + }, + } +} + +func (i *Instrument) FS() *embed.FS { + return &fs +} diff --git a/plugins/pulsar/pulsar/receive_consumer.go b/plugins/pulsar/pulsar/receive_consumer.go new file mode 100644 index 0000000..1e75467 --- /dev/null +++ b/plugins/pulsar/pulsar/receive_consumer.go @@ -0,0 +1,70 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pulsar + +import ( + "github.com/apache/pulsar-client-go/pulsar" + + "github.com/apache/skywalking-go/plugins/core/operator" + "github.com/apache/skywalking-go/plugins/core/tracing" +) + +const ( + pulsarReceivePrefix = "Pulsar/" + pulsarReceiveSuffix = "/Consumer" + pulsarReceiveComponentID = 74 +) + +type ReceiveInterceptor struct { +} + +func (r *ReceiveInterceptor) BeforeInvoke(invocation operator.Invocation) error { + return nil +} + +func (r *ReceiveInterceptor) AfterInvoke(invocation operator.Invocation, result ...interface{}) error { + nativeConsumer := invocation.CallerInstance().(*nativeconsumer) + topic := nativeConsumer.options.Topic + lookup, err := nativeConsumer.client.lookupService.Lookup(topic) + if err != nil { + return err + } + message := result[0].(pulsar.Message) + peer := lookup.PhysicalAddr.String() + operationName := pulsarReceivePrefix + topic + pulsarReceiveSuffix + + span, err := tracing.CreateEntrySpan(operationName, func(headerKey string) (string, error) { + return message.Properties()[headerKey], nil + }, + tracing.WithLayer(tracing.SpanLayerMQ), + tracing.WithComponent(pulsarReceiveComponentID), + tracing.WithTag(tracing.TagMQBroker, lookup.PhysicalAddr.String()), + tracing.WithTag(tracing.TagMQTopic, nativeConsumer.topic), + ) + if err != nil { + return err + } + + if err, ok := result[1].(pulsar.Error); ok { + span.Tag(tracing.TagMQStatus, err.Error()) + span.Error(err.Error()) + } + span.SetPeer(peer) + span.End() + return nil +} diff --git a/plugins/pulsar/pulsar/send_async_producer.go b/plugins/pulsar/pulsar/send_async_producer.go new file mode 100644 index 0000000..cd91091 --- /dev/null +++ b/plugins/pulsar/pulsar/send_async_producer.go @@ -0,0 +1,105 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pulsar + +import ( + "github.com/apache/skywalking-go/plugins/core/operator" + "github.com/apache/skywalking-go/plugins/core/tracing" +) + +const ( + pulsarAsyncPrefix = "Pulsar/" + pulsarAsyncSuffix = "/AsyncProducer" + pulsarCallbackSuffix = "/Producer/Callback" + pulsarAsyncComponentID = 73 +) + +type SendAsyncInterceptor struct { +} + +func (s *SendAsyncInterceptor) BeforeInvoke(invocation operator.Invocation) error { + nativeProducer := invocation.CallerInstance().(*nativepartitionProducer) + topic := nativeProducer.options.Topic + msg := invocation.Args()[1].(*ProducerMessage) + lookup, err := nativeProducer.client.lookupService.Lookup(topic) + if err != nil { + return err + } + peer := lookup.PhysicalAddr.String() + operationName := pulsarAsyncPrefix + topic + pulsarAsyncSuffix + + span, err := tracing.CreateExitSpan(operationName, peer, func(headerKey, headerValue string) error { + if msg.Properties == nil { + msg.Properties = map[string]string{ + headerKey: headerValue, + } + return nil + } + msg.Properties[headerKey] = headerValue + return nil + }, + tracing.WithLayer(tracing.SpanLayerMQ), + tracing.WithComponent(pulsarAsyncComponentID), + tracing.WithTag(tracing.TagMQBroker, lookup.PhysicalAddr.String()), + tracing.WithTag(tracing.TagMQTopic, nativeProducer.topic), + ) + if err != nil { + return err + } + + continueSnapShot := tracing.CaptureContext() + zuper := invocation.Args()[2].(func(id MessageID, message *ProducerMessage, err error)) + + callbackFunc := func(id MessageID, message *ProducerMessage, err error) { + defer tracing.CleanContext() + tracing.ContinueContext(continueSnapShot) + operationName = pulsarAsyncPrefix + topic + pulsarCallbackSuffix + + localSpan, localErr := tracing.CreateLocalSpan(operationName, + tracing.WithComponent(pulsarAsyncComponentID), + tracing.WithLayer(tracing.SpanLayerMQ), + tracing.WithTag(tracing.TagMQTopic, nativeProducer.topic), + ) + if localErr != nil { + zuper(id, message, err) + return + } + if err != nil { + span.Error(err.Error()) + } + localSpan.Tag(tracing.TagMQBroker, lookup.PhysicalAddr.String()) + localSpan.Tag(tracing.TagMQMsgID, id.String()) + + zuper(id, message, err) + localSpan.SetPeer(peer) + localSpan.End() + } + + span.SetPeer(peer) + invocation.ChangeArg(2, callbackFunc) + invocation.SetContext(span) + return nil +} + +func (s *SendAsyncInterceptor) AfterInvoke(invocation operator.Invocation, result ...interface{}) error { + if invocation.GetContext() == nil { + return nil + } + invocation.GetContext().(tracing.Span).End() + return nil +} diff --git a/plugins/pulsar/pulsar/send_producer.go b/plugins/pulsar/pulsar/send_producer.go new file mode 100644 index 0000000..362e36a --- /dev/null +++ b/plugins/pulsar/pulsar/send_producer.go @@ -0,0 +1,83 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pulsar + +import ( + "github.com/apache/pulsar-client-go/pulsar" + + "github.com/apache/skywalking-go/plugins/core/operator" + "github.com/apache/skywalking-go/plugins/core/tracing" +) + +const ( + pulsarSyncPrefix = "Pulsar/" + pulsarSyncSuffix = "/Producer" + pulsarSyncComponentID = 73 +) + +type SendInterceptor struct { +} + +func (s *SendInterceptor) BeforeInvoke(invocation operator.Invocation) error { + defaultProducer := invocation.CallerInstance().(*nativepartitionProducer) + topic := defaultProducer.options.Topic + msg := invocation.Args()[1].(*pulsar.ProducerMessage) + lookup, err := defaultProducer.client.lookupService.Lookup(topic) + if err != nil { + return err + } + peer := lookup.PhysicalAddr.String() + operationName := pulsarSyncPrefix + topic + pulsarSyncSuffix + + span, err := tracing.CreateExitSpan(operationName, peer, func(headerKey, headerValue string) error { + if msg.Properties == nil { + msg.Properties = map[string]string{ + headerKey: headerValue, + } + return nil + } + msg.Properties[headerKey] = headerValue + return nil + }, + tracing.WithLayer(tracing.SpanLayerMQ), + tracing.WithComponent(pulsarSyncComponentID), + tracing.WithTag(tracing.TagMQBroker, lookup.PhysicalAddr.String()), + tracing.WithTag(tracing.TagMQTopic, defaultProducer.topic), + ) + if err != nil { + return err + } + + invocation.SetContext(span) + return nil +} + +func (s *SendInterceptor) AfterInvoke(invocation operator.Invocation, result ...interface{}) error { + if invocation.GetContext() == nil { + return nil + } + span := invocation.GetContext().(tracing.Span) + if err, ok := result[1].(error); ok && err != nil { + span.Error(err.Error()) + } + if msgRet, ok := result[0].(pulsar.MessageID); ok && msgRet != nil { + span.Tag(tracing.TagMQMsgID, msgRet.String()) + } + span.End() + return nil +} diff --git a/plugins/pulsar/pulsar/structures.go b/plugins/pulsar/pulsar/structures.go new file mode 100644 index 0000000..c746251 --- /dev/null +++ b/plugins/pulsar/pulsar/structures.go @@ -0,0 +1,69 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pulsar + +import "net/url" + +//skywalking:native github.com/apache/pulsar-client-go/pulsar partitionProducer +type nativepartitionProducer struct { + client *nativeclient + topic string + options *nativeProducerOptions +} + +//skywalking:native github.com/apache/pulsar-client-go/pulsar client +type nativeclient struct { + lookupService nativeLookupService +} + +//skywalking:native github.com/apache/pulsar-client-go/pulsar ProducerOptions +type nativeProducerOptions struct { + Topic string +} + +//skywalking:native github.com/apache/pulsar-client-go/pulsar/internal LookupService +type nativeLookupService interface { + Lookup(topic string) (*LookupResult, error) +} + +type LookupResult struct { + LogicalAddr *url.URL + PhysicalAddr *url.URL +} + +//skywalking:native github.com/apache/pulsar-client-go/pulsar MessageID +type MessageID interface { + String() string +} + +//skywalking:native github.com/apache/pulsar-client-go/pulsar ProducerMessage +type ProducerMessage struct { + Properties map[string]string +} + +//skywalking:native github.com/apache/pulsar-client-go/pulsar consumer +type nativeconsumer struct { + client *nativeclient + topic string + options *nativeConsumerOptions +} + +//skywalking:native github.com/apache/pulsar-client-go/pulsar ConsumerOptions +type nativeConsumerOptions struct { + Topic string +} diff --git a/test/plugins/scenarios/pulsar/bin/startup.sh b/test/plugins/scenarios/pulsar/bin/startup.sh new file mode 100755 index 0000000..dcfbaa4 --- /dev/null +++ b/test/plugins/scenarios/pulsar/bin/startup.sh @@ -0,0 +1,22 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +home="$(cd "$(dirname $0)"; pwd)" +go build ${GO_BUILD_OPTS} -o pulsar + +./pulsar \ No newline at end of file diff --git a/test/plugins/scenarios/pulsar/config/excepted.yml b/test/plugins/scenarios/pulsar/config/excepted.yml new file mode 100644 index 0000000..9daa253 --- /dev/null +++ b/test/plugins/scenarios/pulsar/config/excepted.yml @@ -0,0 +1,121 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +segmentItems: + - serviceName: pulsar + segmentSize: 2 + segments: + - segmentId: not null + spans: + - operationName: Pulsar/sw-topic/Producer + parentSpanId: 0 + spanId: 1 + spanLayer: MQ + startTime: nq 0 + endTime: nq 0 + componentId: 73 + isError: false + spanType: Exit + peer: pulsar://pulsar-server:6650 + skipAnalysis: false + tags: + - { key: mq.broker, value: not null } + - { key: mq.topic, value: not null } + - { key: mq.msg.id, value: not null } + - operationName: Pulsar/sw-topic/Consumer + parentSpanId: 0 + spanId: 2 + spanLayer: MQ + startTime: nq 0 + endTime: nq 0 + componentId: 74 + isError: false + spanType: Entry + peer: pulsar://pulsar-server:6650 + skipAnalysis: false + tags: + - { key: mq.broker, value: not null } + - { key: mq.topic, value: not null } + refs: + - { parentEndpoint: 'GET:/execute', networkAddress: 'pulsar://pulsar-server:6650', + refType: CrossProcess, parentSpanId: 1, parentTraceSegmentId: not null, + parentServiceInstance: not null, parentService: pulsar, + traceId: not null } + - operationName: Pulsar/sw-topic/AsyncProducer + parentSpanId: 0 + spanId: 3 + spanLayer: MQ + startTime: nq 0 + endTime: nq 0 + componentId: 73 + isError: false + spanType: Exit + peer: pulsar://pulsar-server:6650 + skipAnalysis: false + tags: + - { key: mq.broker, value: not null } + - { key: mq.topic, value: not null } + - operationName: Pulsar/sw-topic/Producer/Callback + parentSpanId: 3 + spanId: 4 + spanLayer: MQ + startTime: nq 0 + endTime: nq 0 + componentId: 73 + isError: false + spanType: Local + peer: pulsar://pulsar-server:6650 + skipAnalysis: false + tags: + - { key: mq.topic, value: not null } + - { key: mq.broker, value: not null } + - { key: mq.msg.id, value: not null } + - operationName: Pulsar/sw-topic/Consumer + parentSpanId: 0 + spanId: 5 + spanLayer: MQ + startTime: nq 0 + endTime: nq 0 + componentId: 74 + isError: false + spanType: Entry + peer: pulsar://pulsar-server:6650 + skipAnalysis: false + tags: + - { key: mq.broker, value: not null } + - { key: mq.topic, value: not null } + refs: + - { parentEndpoint: 'GET:/execute', networkAddress: 'pulsar://pulsar-server:6650', + refType: CrossProcess, parentSpanId: 3, parentTraceSegmentId: not null, + parentServiceInstance: not null, parentService: pulsar, + traceId: not null } + - operationName: GET:/execute + parentSpanId: -1 + spanId: 0 + spanLayer: Http + startTime: nq 0 + endTime: nq 0 + componentId: 5004 + isError: false + spanType: Entry + peer: '' + skipAnalysis: false + tags: + - { key: http.method, value: GET } + - { key: url, value: 'service:8080/execute' } + - { key: status_code, value: '200' } +meterItems: [ ] +logItems: [ ] \ No newline at end of file diff --git a/test/plugins/scenarios/pulsar/go.mod b/test/plugins/scenarios/pulsar/go.mod new file mode 100644 index 0000000..945994b --- /dev/null +++ b/test/plugins/scenarios/pulsar/go.mod @@ -0,0 +1,7 @@ +module test/plugins/scenarios/pulsar + +go 1.18 + +require ( + github.com/apache/pulsar-client-go v0.12.0 +) \ No newline at end of file diff --git a/test/plugins/scenarios/pulsar/go.sum b/test/plugins/scenarios/pulsar/go.sum new file mode 100644 index 0000000..0f5c7f4 --- /dev/null +++ b/test/plugins/scenarios/pulsar/go.sum @@ -0,0 +1,2 @@ +github.com/apache/pulsar-client-go v0.12.0 h1:rrMlwpr6IgLRPXLRRh2vSlcw5tGV2PUSjZwmqgh2B2I= +github.com/apache/pulsar-client-go v0.12.0/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk= diff --git a/test/plugins/scenarios/pulsar/main.go b/test/plugins/scenarios/pulsar/main.go new file mode 100644 index 0000000..507b092 --- /dev/null +++ b/test/plugins/scenarios/pulsar/main.go @@ -0,0 +1,141 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package main + +import ( + "context" + "fmt" + "log" + "net/http" + "sync" + "time" + + "github.com/apache/pulsar-client-go/pulsar" + + _ "github.com/apache/skywalking-go" +) + +type testFunc func() error + +var ( + url = "pulsar://pulsar-server:6650" + msg = "I love skywalking 3 thousand" + topic = "sw-topic" + subscription = "sw-subscription" + client pulsar.Client + ctx = context.Background() +) + +func main() { + var err error + client, err = pulsar.NewClient(pulsar.ClientOptions{ + URL: url, + }) + if err != nil { + panic(err) + } + + route := http.NewServeMux() + route.HandleFunc("/execute", func(res http.ResponseWriter, req *http.Request) { + testProCon() + _, _ = res.Write([]byte("execute success")) + }) + route.HandleFunc("/health", func(writer http.ResponseWriter, request *http.Request) { + _, _ = writer.Write([]byte("ok")) + }) + err = http.ListenAndServe(":8080", route) + if err != nil { + fmt.Printf("client start error: %v \n", err) + } +} + +func testProCon() { + tests := []struct { + name string + fn testFunc + }{ + {"sendMsg", sendMsg}, + {"sendAsyncMsg", sendAsyncMsg}, + } + for _, test := range tests { + fmt.Printf("excute test case: %s\n", test.name) + if subErr := test.fn(); subErr != nil { + fmt.Printf("test case %s failed: %v\n", test.name, subErr) + } + } +} + +func consumeHelper() { + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + consumer, err := client.Subscribe(pulsar.ConsumerOptions{ + Topic: topic, + SubscriptionName: subscription, + }) + if err != nil { + log.Fatal(err) + } + for { + msg, err := consumer.Receive(ctx) + if err != nil { + log.Fatal(err) + } + fmt.Printf("Received message msgId: %#v -- content: '%s'\n", + msg.ID(), string(msg.Payload())) + } + }() +} + +func sendMsg() error { + go consumeHelper() + time.Sleep(time.Second) + producer, err := client.CreateProducer(pulsar.ProducerOptions{ + Topic: topic, + }) + if err != nil { + return err + } + if msgId, err := producer.Send(ctx, &pulsar.ProducerMessage{ + Payload: []byte(msg), + }); err != nil { + return err + } else { + log.Println("Published message: ", msgId) + } + return nil +} + +func sendAsyncMsg() error { + time.Sleep(time.Second) + producer, err := client.CreateProducer(pulsar.ProducerOptions{ + Topic: topic, + }) + if err != nil { + return err + } + + producer.SendAsync(ctx, &pulsar.ProducerMessage{ + Payload: []byte(msg), + }, func(id pulsar.MessageID, message *pulsar.ProducerMessage, err error) { + log.Printf("ID = %v, Properties = %v", id, message.Properties) + }) + time.Sleep(time.Second) + return nil +} \ No newline at end of file diff --git a/test/plugins/scenarios/pulsar/plugin.yml b/test/plugins/scenarios/pulsar/plugin.yml new file mode 100644 index 0000000..285f4b5 --- /dev/null +++ b/test/plugins/scenarios/pulsar/plugin.yml @@ -0,0 +1,31 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +entry-service: http://${HTTP_HOST}:${HTTP_PORT}/execute +health-checker: http://${HTTP_HOST}:${HTTP_PORT}/health +start-script: ./bin/startup.sh +framework: github.com/apache/pulsar-client-go +export-port: 8080 +support-version: + - go: 1.18 + framework: + - v0.12.0 +dependencies: + pulsar-server: + image: apachepulsar/pulsar:3.2.0 + command: ["bash","-c", "bin/pulsar standalone"] + ports: + - 6650 \ No newline at end of file diff --git a/tools/go-agent/instrument/plugins/register.go b/tools/go-agent/instrument/plugins/register.go index 38336d0..7726b41 100644 --- a/tools/go-agent/instrument/plugins/register.go +++ b/tools/go-agent/instrument/plugins/register.go @@ -38,6 +38,7 @@ import ( "github.com/apache/skywalking-go/plugins/microv4" "github.com/apache/skywalking-go/plugins/mongo" "github.com/apache/skywalking-go/plugins/mux" + "github.com/apache/skywalking-go/plugins/pulsar" "github.com/apache/skywalking-go/plugins/rocketmq" runtime_metrics "github.com/apache/skywalking-go/plugins/runtimemetrics" sql_entry "github.com/apache/skywalking-go/plugins/sql/entry" @@ -64,6 +65,7 @@ func init() { registerFramework(fiber.NewInstrument()) registerFramework(rocketmq.NewInstrument()) registerFramework(amqp.NewInstrument()) + registerFramework(pulsar.NewInstrument()) // fasthttp related instruments registerFramework(fasthttp_client.NewInstrument())