This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 37820ce  feature: add support for Pulsar (#173)
37820ce is described below

commit 37820cedda3764d9049e3892d7cbf39876f3fce0
Author: Starry <codeprince2...@163.com>
AuthorDate: Mon Mar 11 11:17:18 2024 +0800

    feature: add support for Pulsar (#173)
---
 .github/workflows/plugin-tests.yaml               |   1 +
 CHANGES.md                                        |   6 +-
 docs/en/agent/support-plugins.md                  |   1 +
 go.work                                           |   2 +
 plugins/pulsar/go.mod                             |   5 +
 plugins/pulsar/go.sum                             |   2 +
 plugins/pulsar/instrument.go                      |  90 ++++++++++++++
 plugins/pulsar/pulsar/receive_consumer.go         |  70 +++++++++++
 plugins/pulsar/pulsar/send_async_producer.go      | 105 ++++++++++++++++
 plugins/pulsar/pulsar/send_producer.go            |  83 +++++++++++++
 plugins/pulsar/pulsar/structures.go               |  69 +++++++++++
 test/plugins/scenarios/pulsar/bin/startup.sh      |  22 ++++
 test/plugins/scenarios/pulsar/config/excepted.yml | 121 +++++++++++++++++++
 test/plugins/scenarios/pulsar/go.mod              |   7 ++
 test/plugins/scenarios/pulsar/go.sum              |   2 +
 test/plugins/scenarios/pulsar/main.go             | 141 ++++++++++++++++++++++
 test/plugins/scenarios/pulsar/plugin.yml          |  31 +++++
 tools/go-agent/instrument/plugins/register.go     |   2 +
 18 files changed, 759 insertions(+), 1 deletion(-)

diff --git a/.github/workflows/plugin-tests.yaml 
b/.github/workflows/plugin-tests.yaml
index feeb7fa..737184d 100644
--- a/.github/workflows/plugin-tests.yaml
+++ b/.github/workflows/plugin-tests.yaml
@@ -98,6 +98,7 @@ jobs:
           - echov4
           - rocketmq
           - amqp
+          - pulsar
     steps:
       - uses: actions/checkout@v2
         with:
diff --git a/CHANGES.md b/CHANGES.md
index 08df91e..467c25f 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -2,6 +2,11 @@ Changes by Version
 ==================
 Release Notes.
 
+0.5.0
+------------------
+#### Plugins
+* Support [Pulsar](https://github.com/apache/pulsar-client-go) MQ.
+
 0.4.0
 ------------------
 #### Features
@@ -92,7 +97,6 @@ Release Notes.
 
 0.1.0
 ------------------
-
 #### Features
 * Initialize the agent core and user import library.
 * Support gRPC reporter for management, tracing protocols.
diff --git a/docs/en/agent/support-plugins.md b/docs/en/agent/support-plugins.md
index b0a455a..92502f1 100644
--- a/docs/en/agent/support-plugins.md
+++ b/docs/en/agent/support-plugins.md
@@ -30,6 +30,7 @@ metrics based on the tracing data.
 * MQ Client
   * `rocketMQ`: 
[rocketmq-client-go](https://github.com/apache/rocketmq-client-go) tested 
v2.1.2.
   * `amqp`: [AMQP](https://github.com/rabbitmq/amqp091-go) tested v1.9.0.
+  * `pulsar`: [pulsar-client-go](https://github.com/apache/pulsar-client-go) 
tested v0.12.0.
 
 # Metrics Plugins
 The meter plugin provides the advanced metrics collections.
diff --git a/go.work b/go.work
index 08b1e93..fd0ce8b 100644
--- a/go.work
+++ b/go.work
@@ -25,6 +25,7 @@ use (
        ./plugins/echov4
        ./plugins/rocketmq
        ./plugins/amqp
+       ./plugins/pulsar
 
        ./test/benchmark-codebase/consumer
        ./test/benchmark-codebase/provider
@@ -56,6 +57,7 @@ use (
        ./test/plugins/scenarios/echov4
        ./test/plugins/scenarios/rocketmq
        ./test/plugins/scenarios/amqp
+       ./test/plugins/scenarios/pulsar
 
        ./tools/go-agent
 
diff --git a/plugins/pulsar/go.mod b/plugins/pulsar/go.mod
new file mode 100644
index 0000000..4e5628e
--- /dev/null
+++ b/plugins/pulsar/go.mod
@@ -0,0 +1,5 @@
+module github.com/apache/skywalking-go/plugins/pulsar
+
+go 1.18
+
+require github.com/apache/pulsar-client-go v0.12.0 // indirect
diff --git a/plugins/pulsar/go.sum b/plugins/pulsar/go.sum
new file mode 100644
index 0000000..0f5c7f4
--- /dev/null
+++ b/plugins/pulsar/go.sum
@@ -0,0 +1,2 @@
+github.com/apache/pulsar-client-go v0.12.0 
h1:rrMlwpr6IgLRPXLRRh2vSlcw5tGV2PUSjZwmqgh2B2I=
+github.com/apache/pulsar-client-go v0.12.0/go.mod 
h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk=
diff --git a/plugins/pulsar/instrument.go b/plugins/pulsar/instrument.go
new file mode 100644
index 0000000..53da703
--- /dev/null
+++ b/plugins/pulsar/instrument.go
@@ -0,0 +1,90 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+       "embed"
+
+       "github.com/apache/skywalking-go/plugins/core/instrument"
+)
+
+//go:embed *
+var fs embed.FS
+
+//skywalking:nocopy
+type Instrument struct {
+}
+
+func NewInstrument() *Instrument {
+       return &Instrument{}
+}
+
+func (i *Instrument) Name() string {
+       return "pulsar"
+}
+
+func (i *Instrument) BasePackage() string {
+       return "github.com/apache/pulsar-client-go"
+}
+
+func (i *Instrument) VersionChecker(version string) bool {
+       return true
+}
+
+func (i *Instrument) Points() []*instrument.Point {
+       return []*instrument.Point{
+               {
+                       PackagePath: "pulsar",
+                       At: instrument.NewMethodEnhance("*partitionProducer", 
"Send",
+                               instrument.WithArgsCount(2),
+                               instrument.WithArgType(0, "context.Context"),
+                               instrument.WithArgType(1, "*ProducerMessage"),
+                               instrument.WithResultCount(2),
+                               instrument.WithResultType(0, "MessageID"),
+                               instrument.WithResultType(1, "error"),
+                       ),
+                       Interceptor: "SendInterceptor",
+               },
+               {
+                       PackagePath: "pulsar",
+                       At: instrument.NewMethodEnhance("*partitionProducer", 
"SendAsync",
+                               instrument.WithArgsCount(3),
+                               instrument.WithArgType(0, "context.Context"),
+                               instrument.WithArgType(1, "*ProducerMessage"),
+                               instrument.WithArgType(2, "func(MessageID, 
*ProducerMessage, error)"),
+                               instrument.WithResultCount(0),
+                       ),
+                       Interceptor: "SendAsyncInterceptor",
+               },
+               {
+                       PackagePath: "pulsar",
+                       At: instrument.NewMethodEnhance("*consumer", "Receive",
+                               instrument.WithArgsCount(1),
+                               instrument.WithArgType(0, "context.Context"),
+                               instrument.WithResultCount(2),
+                               instrument.WithResultType(0, "Message"),
+                               instrument.WithResultType(1, "error"),
+                       ),
+                       Interceptor: "ReceiveInterceptor",
+               },
+       }
+}
+
+func (i *Instrument) FS() *embed.FS {
+       return &fs
+}
diff --git a/plugins/pulsar/pulsar/receive_consumer.go 
b/plugins/pulsar/pulsar/receive_consumer.go
new file mode 100644
index 0000000..1e75467
--- /dev/null
+++ b/plugins/pulsar/pulsar/receive_consumer.go
@@ -0,0 +1,70 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+       "github.com/apache/pulsar-client-go/pulsar"
+
+       "github.com/apache/skywalking-go/plugins/core/operator"
+       "github.com/apache/skywalking-go/plugins/core/tracing"
+)
+
+const (
+       pulsarReceivePrefix      = "Pulsar/"
+       pulsarReceiveSuffix      = "/Consumer"
+       pulsarReceiveComponentID = 74
+)
+
+type ReceiveInterceptor struct {
+}
+
+func (r *ReceiveInterceptor) BeforeInvoke(invocation operator.Invocation) 
error {
+       return nil
+}
+
+func (r *ReceiveInterceptor) AfterInvoke(invocation operator.Invocation, 
result ...interface{}) error {
+       nativeConsumer := invocation.CallerInstance().(*nativeconsumer)
+       topic := nativeConsumer.options.Topic
+       lookup, err := nativeConsumer.client.lookupService.Lookup(topic)
+       if err != nil {
+               return err
+       }
+       message := result[0].(pulsar.Message)
+       peer := lookup.PhysicalAddr.String()
+       operationName := pulsarReceivePrefix + topic + pulsarReceiveSuffix
+
+       span, err := tracing.CreateEntrySpan(operationName, func(headerKey 
string) (string, error) {
+               return message.Properties()[headerKey], nil
+       },
+               tracing.WithLayer(tracing.SpanLayerMQ),
+               tracing.WithComponent(pulsarReceiveComponentID),
+               tracing.WithTag(tracing.TagMQBroker, 
lookup.PhysicalAddr.String()),
+               tracing.WithTag(tracing.TagMQTopic, nativeConsumer.topic),
+       )
+       if err != nil {
+               return err
+       }
+
+       if err, ok := result[1].(pulsar.Error); ok {
+               span.Tag(tracing.TagMQStatus, err.Error())
+               span.Error(err.Error())
+       }
+       span.SetPeer(peer)
+       span.End()
+       return nil
+}
diff --git a/plugins/pulsar/pulsar/send_async_producer.go 
b/plugins/pulsar/pulsar/send_async_producer.go
new file mode 100644
index 0000000..cd91091
--- /dev/null
+++ b/plugins/pulsar/pulsar/send_async_producer.go
@@ -0,0 +1,105 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+       "github.com/apache/skywalking-go/plugins/core/operator"
+       "github.com/apache/skywalking-go/plugins/core/tracing"
+)
+
+const (
+       pulsarAsyncPrefix      = "Pulsar/"
+       pulsarAsyncSuffix      = "/AsyncProducer"
+       pulsarCallbackSuffix   = "/Producer/Callback"
+       pulsarAsyncComponentID = 73
+)
+
+type SendAsyncInterceptor struct {
+}
+
+func (s *SendAsyncInterceptor) BeforeInvoke(invocation operator.Invocation) 
error {
+       nativeProducer := invocation.CallerInstance().(*nativepartitionProducer)
+       topic := nativeProducer.options.Topic
+       msg := invocation.Args()[1].(*ProducerMessage)
+       lookup, err := nativeProducer.client.lookupService.Lookup(topic)
+       if err != nil {
+               return err
+       }
+       peer := lookup.PhysicalAddr.String()
+       operationName := pulsarAsyncPrefix + topic + pulsarAsyncSuffix
+
+       span, err := tracing.CreateExitSpan(operationName, peer, 
func(headerKey, headerValue string) error {
+               if msg.Properties == nil {
+                       msg.Properties = map[string]string{
+                               headerKey: headerValue,
+                       }
+                       return nil
+               }
+               msg.Properties[headerKey] = headerValue
+               return nil
+       },
+               tracing.WithLayer(tracing.SpanLayerMQ),
+               tracing.WithComponent(pulsarAsyncComponentID),
+               tracing.WithTag(tracing.TagMQBroker, 
lookup.PhysicalAddr.String()),
+               tracing.WithTag(tracing.TagMQTopic, nativeProducer.topic),
+       )
+       if err != nil {
+               return err
+       }
+
+       continueSnapShot := tracing.CaptureContext()
+       zuper := invocation.Args()[2].(func(id MessageID, message 
*ProducerMessage, err error))
+
+       callbackFunc := func(id MessageID, message *ProducerMessage, err error) 
{
+               defer tracing.CleanContext()
+               tracing.ContinueContext(continueSnapShot)
+               operationName = pulsarAsyncPrefix + topic + pulsarCallbackSuffix
+
+               localSpan, localErr := tracing.CreateLocalSpan(operationName,
+                       tracing.WithComponent(pulsarAsyncComponentID),
+                       tracing.WithLayer(tracing.SpanLayerMQ),
+                       tracing.WithTag(tracing.TagMQTopic, 
nativeProducer.topic),
+               )
+               if localErr != nil {
+                       zuper(id, message, err)
+                       return
+               }
+               if err != nil {
+                       span.Error(err.Error())
+               }
+               localSpan.Tag(tracing.TagMQBroker, lookup.PhysicalAddr.String())
+               localSpan.Tag(tracing.TagMQMsgID, id.String())
+
+               zuper(id, message, err)
+               localSpan.SetPeer(peer)
+               localSpan.End()
+       }
+
+       span.SetPeer(peer)
+       invocation.ChangeArg(2, callbackFunc)
+       invocation.SetContext(span)
+       return nil
+}
+
+func (s *SendAsyncInterceptor) AfterInvoke(invocation operator.Invocation, 
result ...interface{}) error {
+       if invocation.GetContext() == nil {
+               return nil
+       }
+       invocation.GetContext().(tracing.Span).End()
+       return nil
+}
diff --git a/plugins/pulsar/pulsar/send_producer.go 
b/plugins/pulsar/pulsar/send_producer.go
new file mode 100644
index 0000000..362e36a
--- /dev/null
+++ b/plugins/pulsar/pulsar/send_producer.go
@@ -0,0 +1,83 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+       "github.com/apache/pulsar-client-go/pulsar"
+
+       "github.com/apache/skywalking-go/plugins/core/operator"
+       "github.com/apache/skywalking-go/plugins/core/tracing"
+)
+
+const (
+       pulsarSyncPrefix      = "Pulsar/"
+       pulsarSyncSuffix      = "/Producer"
+       pulsarSyncComponentID = 73
+)
+
+type SendInterceptor struct {
+}
+
+func (s *SendInterceptor) BeforeInvoke(invocation operator.Invocation) error {
+       defaultProducer := 
invocation.CallerInstance().(*nativepartitionProducer)
+       topic := defaultProducer.options.Topic
+       msg := invocation.Args()[1].(*pulsar.ProducerMessage)
+       lookup, err := defaultProducer.client.lookupService.Lookup(topic)
+       if err != nil {
+               return err
+       }
+       peer := lookup.PhysicalAddr.String()
+       operationName := pulsarSyncPrefix + topic + pulsarSyncSuffix
+
+       span, err := tracing.CreateExitSpan(operationName, peer, 
func(headerKey, headerValue string) error {
+               if msg.Properties == nil {
+                       msg.Properties = map[string]string{
+                               headerKey: headerValue,
+                       }
+                       return nil
+               }
+               msg.Properties[headerKey] = headerValue
+               return nil
+       },
+               tracing.WithLayer(tracing.SpanLayerMQ),
+               tracing.WithComponent(pulsarSyncComponentID),
+               tracing.WithTag(tracing.TagMQBroker, 
lookup.PhysicalAddr.String()),
+               tracing.WithTag(tracing.TagMQTopic, defaultProducer.topic),
+       )
+       if err != nil {
+               return err
+       }
+
+       invocation.SetContext(span)
+       return nil
+}
+
+func (s *SendInterceptor) AfterInvoke(invocation operator.Invocation, result 
...interface{}) error {
+       if invocation.GetContext() == nil {
+               return nil
+       }
+       span := invocation.GetContext().(tracing.Span)
+       if err, ok := result[1].(error); ok && err != nil {
+               span.Error(err.Error())
+       }
+       if msgRet, ok := result[0].(pulsar.MessageID); ok && msgRet != nil {
+               span.Tag(tracing.TagMQMsgID, msgRet.String())
+       }
+       span.End()
+       return nil
+}
diff --git a/plugins/pulsar/pulsar/structures.go 
b/plugins/pulsar/pulsar/structures.go
new file mode 100644
index 0000000..c746251
--- /dev/null
+++ b/plugins/pulsar/pulsar/structures.go
@@ -0,0 +1,69 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "net/url"
+
+//skywalking:native github.com/apache/pulsar-client-go/pulsar partitionProducer
+type nativepartitionProducer struct {
+       client  *nativeclient
+       topic   string
+       options *nativeProducerOptions
+}
+
+//skywalking:native github.com/apache/pulsar-client-go/pulsar client
+type nativeclient struct {
+       lookupService nativeLookupService
+}
+
+//skywalking:native github.com/apache/pulsar-client-go/pulsar ProducerOptions
+type nativeProducerOptions struct {
+       Topic string
+}
+
+//skywalking:native github.com/apache/pulsar-client-go/pulsar/internal 
LookupService
+type nativeLookupService interface {
+       Lookup(topic string) (*LookupResult, error)
+}
+
+type LookupResult struct {
+       LogicalAddr  *url.URL
+       PhysicalAddr *url.URL
+}
+
+//skywalking:native github.com/apache/pulsar-client-go/pulsar MessageID
+type MessageID interface {
+       String() string
+}
+
+//skywalking:native github.com/apache/pulsar-client-go/pulsar ProducerMessage
+type ProducerMessage struct {
+       Properties map[string]string
+}
+
+//skywalking:native github.com/apache/pulsar-client-go/pulsar consumer
+type nativeconsumer struct {
+       client  *nativeclient
+       topic   string
+       options *nativeConsumerOptions
+}
+
+//skywalking:native github.com/apache/pulsar-client-go/pulsar ConsumerOptions
+type nativeConsumerOptions struct {
+       Topic string
+}
diff --git a/test/plugins/scenarios/pulsar/bin/startup.sh 
b/test/plugins/scenarios/pulsar/bin/startup.sh
new file mode 100755
index 0000000..dcfbaa4
--- /dev/null
+++ b/test/plugins/scenarios/pulsar/bin/startup.sh
@@ -0,0 +1,22 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+home="$(cd "$(dirname $0)"; pwd)"
+go build ${GO_BUILD_OPTS} -o pulsar
+
+./pulsar
\ No newline at end of file
diff --git a/test/plugins/scenarios/pulsar/config/excepted.yml 
b/test/plugins/scenarios/pulsar/config/excepted.yml
new file mode 100644
index 0000000..9daa253
--- /dev/null
+++ b/test/plugins/scenarios/pulsar/config/excepted.yml
@@ -0,0 +1,121 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+segmentItems:
+  - serviceName: pulsar
+    segmentSize: 2
+    segments:
+      - segmentId: not null
+        spans:
+          - operationName: Pulsar/sw-topic/Producer
+            parentSpanId: 0
+            spanId: 1
+            spanLayer: MQ
+            startTime: nq 0
+            endTime: nq 0
+            componentId: 73
+            isError: false
+            spanType: Exit
+            peer: pulsar://pulsar-server:6650
+            skipAnalysis: false
+            tags:
+              - { key: mq.broker, value: not null }
+              - { key: mq.topic, value: not null }
+              - { key: mq.msg.id, value: not null }
+          - operationName: Pulsar/sw-topic/Consumer
+            parentSpanId: 0
+            spanId: 2
+            spanLayer: MQ
+            startTime: nq 0
+            endTime: nq 0
+            componentId: 74
+            isError: false
+            spanType: Entry
+            peer: pulsar://pulsar-server:6650
+            skipAnalysis: false
+            tags:
+              - { key: mq.broker, value: not null }
+              - { key: mq.topic, value: not null }
+            refs:
+              - { parentEndpoint: 'GET:/execute', networkAddress: 
'pulsar://pulsar-server:6650',
+                  refType: CrossProcess, parentSpanId: 1, 
parentTraceSegmentId: not null,
+                  parentServiceInstance: not null, parentService: pulsar,
+                  traceId: not null }
+          - operationName: Pulsar/sw-topic/AsyncProducer
+            parentSpanId: 0
+            spanId: 3
+            spanLayer: MQ
+            startTime: nq 0
+            endTime: nq 0
+            componentId: 73
+            isError: false
+            spanType: Exit
+            peer: pulsar://pulsar-server:6650
+            skipAnalysis: false
+            tags:
+              - { key: mq.broker, value: not null }
+              - { key: mq.topic, value: not null }
+          - operationName: Pulsar/sw-topic/Producer/Callback
+            parentSpanId: 3
+            spanId: 4
+            spanLayer: MQ
+            startTime: nq 0
+            endTime: nq 0
+            componentId: 73
+            isError: false
+            spanType: Local
+            peer: pulsar://pulsar-server:6650
+            skipAnalysis: false
+            tags:
+              - { key: mq.topic, value: not null }
+              - { key: mq.broker, value: not null }
+              - { key: mq.msg.id, value: not null }
+          - operationName: Pulsar/sw-topic/Consumer
+            parentSpanId: 0
+            spanId: 5
+            spanLayer: MQ
+            startTime: nq 0
+            endTime: nq 0
+            componentId: 74
+            isError: false
+            spanType: Entry
+            peer: pulsar://pulsar-server:6650
+            skipAnalysis: false
+            tags:
+              - { key: mq.broker, value: not null }
+              - { key: mq.topic, value: not null }
+            refs:
+              - { parentEndpoint: 'GET:/execute', networkAddress: 
'pulsar://pulsar-server:6650',
+                  refType: CrossProcess, parentSpanId: 3, 
parentTraceSegmentId: not null,
+                  parentServiceInstance: not null, parentService: pulsar,
+                  traceId: not null }
+          - operationName: GET:/execute
+            parentSpanId: -1
+            spanId: 0
+            spanLayer: Http
+            startTime: nq 0
+            endTime: nq 0
+            componentId: 5004
+            isError: false
+            spanType: Entry
+            peer: ''
+            skipAnalysis: false
+            tags:
+              - { key: http.method, value: GET }
+              - { key: url, value: 'service:8080/execute' }
+              - { key: status_code, value: '200' }
+meterItems: [ ]
+logItems: [ ]
\ No newline at end of file
diff --git a/test/plugins/scenarios/pulsar/go.mod 
b/test/plugins/scenarios/pulsar/go.mod
new file mode 100644
index 0000000..945994b
--- /dev/null
+++ b/test/plugins/scenarios/pulsar/go.mod
@@ -0,0 +1,7 @@
+module test/plugins/scenarios/pulsar
+
+go 1.18
+
+require (
+       github.com/apache/pulsar-client-go v0.12.0
+)
\ No newline at end of file
diff --git a/test/plugins/scenarios/pulsar/go.sum 
b/test/plugins/scenarios/pulsar/go.sum
new file mode 100644
index 0000000..0f5c7f4
--- /dev/null
+++ b/test/plugins/scenarios/pulsar/go.sum
@@ -0,0 +1,2 @@
+github.com/apache/pulsar-client-go v0.12.0 
h1:rrMlwpr6IgLRPXLRRh2vSlcw5tGV2PUSjZwmqgh2B2I=
+github.com/apache/pulsar-client-go v0.12.0/go.mod 
h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk=
diff --git a/test/plugins/scenarios/pulsar/main.go 
b/test/plugins/scenarios/pulsar/main.go
new file mode 100644
index 0000000..507b092
--- /dev/null
+++ b/test/plugins/scenarios/pulsar/main.go
@@ -0,0 +1,141 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package main
+
+import (
+       "context"
+       "fmt"
+       "log"
+       "net/http"
+       "sync"
+       "time"
+
+       "github.com/apache/pulsar-client-go/pulsar"
+
+       _ "github.com/apache/skywalking-go"
+)
+
+type testFunc func() error
+
+var (
+       url          = "pulsar://pulsar-server:6650"
+       msg          = "I love skywalking 3 thousand"
+       topic        = "sw-topic"
+       subscription = "sw-subscription"
+       client       pulsar.Client
+       ctx          = context.Background()
+)
+
+func main() {
+       var err error
+       client, err = pulsar.NewClient(pulsar.ClientOptions{
+               URL: url,
+       })
+       if err != nil {
+               panic(err)
+       }
+
+       route := http.NewServeMux()
+       route.HandleFunc("/execute", func(res http.ResponseWriter, req 
*http.Request) {
+               testProCon()
+               _, _ = res.Write([]byte("execute success"))
+       })
+       route.HandleFunc("/health", func(writer http.ResponseWriter, request 
*http.Request) {
+               _, _ = writer.Write([]byte("ok"))
+       })
+       err = http.ListenAndServe(":8080", route)
+       if err != nil {
+               fmt.Printf("client start error: %v \n", err)
+       }
+}
+
+func testProCon() {
+       tests := []struct {
+               name string
+               fn   testFunc
+       }{
+               {"sendMsg", sendMsg},
+               {"sendAsyncMsg", sendAsyncMsg},
+       }
+       for _, test := range tests {
+               fmt.Printf("excute test case: %s\n", test.name)
+               if subErr := test.fn(); subErr != nil {
+                       fmt.Printf("test case %s failed: %v\n", test.name, 
subErr)
+               }
+       }
+}
+
+func consumeHelper() {
+       var wg sync.WaitGroup
+       wg.Add(1)
+       go func() {
+               defer wg.Done()
+               consumer, err := client.Subscribe(pulsar.ConsumerOptions{
+                       Topic:            topic,
+                       SubscriptionName: subscription,
+               })
+               if err != nil {
+                       log.Fatal(err)
+               }
+               for {
+                       msg, err := consumer.Receive(ctx)
+                       if err != nil {
+                               log.Fatal(err)
+                       }
+                       fmt.Printf("Received message msgId: %#v -- content: 
'%s'\n",
+                               msg.ID(), string(msg.Payload()))
+               }
+       }()
+}
+
+func sendMsg() error {
+       go consumeHelper()
+       time.Sleep(time.Second)
+       producer, err := client.CreateProducer(pulsar.ProducerOptions{
+               Topic: topic,
+       })
+       if err != nil {
+               return err
+       }
+       if msgId, err := producer.Send(ctx, &pulsar.ProducerMessage{
+               Payload: []byte(msg),
+       }); err != nil {
+               return err
+       } else {
+               log.Println("Published message: ", msgId)
+       }
+       return nil
+}
+
+func sendAsyncMsg() error {
+       time.Sleep(time.Second)
+       producer, err := client.CreateProducer(pulsar.ProducerOptions{
+               Topic: topic,
+       })
+       if err != nil {
+               return err
+       }
+
+       producer.SendAsync(ctx, &pulsar.ProducerMessage{
+               Payload: []byte(msg),
+       }, func(id pulsar.MessageID, message *pulsar.ProducerMessage, err 
error) {
+               log.Printf("ID = %v, Properties = %v", id, message.Properties)
+       })
+       time.Sleep(time.Second)
+       return nil
+}
\ No newline at end of file
diff --git a/test/plugins/scenarios/pulsar/plugin.yml 
b/test/plugins/scenarios/pulsar/plugin.yml
new file mode 100644
index 0000000..285f4b5
--- /dev/null
+++ b/test/plugins/scenarios/pulsar/plugin.yml
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+entry-service: http://${HTTP_HOST}:${HTTP_PORT}/execute
+health-checker: http://${HTTP_HOST}:${HTTP_PORT}/health
+start-script: ./bin/startup.sh
+framework: github.com/apache/pulsar-client-go
+export-port: 8080
+support-version:
+  - go: 1.18
+    framework:
+      - v0.12.0
+dependencies:
+  pulsar-server:
+    image: apachepulsar/pulsar:3.2.0
+    command: ["bash","-c", "bin/pulsar standalone"]
+    ports:
+      - 6650
\ No newline at end of file
diff --git a/tools/go-agent/instrument/plugins/register.go 
b/tools/go-agent/instrument/plugins/register.go
index 38336d0..7726b41 100644
--- a/tools/go-agent/instrument/plugins/register.go
+++ b/tools/go-agent/instrument/plugins/register.go
@@ -38,6 +38,7 @@ import (
        "github.com/apache/skywalking-go/plugins/microv4"
        "github.com/apache/skywalking-go/plugins/mongo"
        "github.com/apache/skywalking-go/plugins/mux"
+       "github.com/apache/skywalking-go/plugins/pulsar"
        "github.com/apache/skywalking-go/plugins/rocketmq"
        runtime_metrics "github.com/apache/skywalking-go/plugins/runtimemetrics"
        sql_entry "github.com/apache/skywalking-go/plugins/sql/entry"
@@ -64,6 +65,7 @@ func init() {
        registerFramework(fiber.NewInstrument())
        registerFramework(rocketmq.NewInstrument())
        registerFramework(amqp.NewInstrument())
+       registerFramework(pulsar.NewInstrument())
 
        // fasthttp related instruments
        registerFramework(fasthttp_client.NewInstrument())

Reply via email to