This is an automated email from the ASF dual-hosted git repository.

liuhan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 8573c6f  feature: add support for AMQP (#165)
8573c6f is described below

commit 8573c6f8ccb24ba1d8fe67cfde1cd544a7fb4ae0
Author: Starry <codeprince2...@163.com>
AuthorDate: Tue Feb 20 11:40:51 2024 +0800

    feature: add support for AMQP (#165)
---
 .github/workflows/plugin-tests.yaml             |   1 +
 CHANGES.md                                      |   1 +
 docs/en/agent/support-plugins.md                |   1 +
 go.work                                         |   2 +
 plugins/amqp/consumer.go                        |  37 ++++
 plugins/amqp/consumer_with_ctx.go               |  37 ++++
 plugins/amqp/dial_hook.go                       |  58 ++++++
 plugins/amqp/general_consumer.go                |  70 +++++++
 plugins/amqp/go.mod                             |   5 +
 plugins/amqp/go.sum                             |  28 +++
 plugins/amqp/instrument.go                      | 116 ++++++++++++
 plugins/amqp/producer.go                        |  85 +++++++++
 plugins/amqp/structures.go                      |  46 +++++
 test/plugins/scenarios/amqp/bin/startup.sh      |  22 +++
 test/plugins/scenarios/amqp/config/excepted.yml | 135 +++++++++++++
 test/plugins/scenarios/amqp/go.mod              |  19 ++
 test/plugins/scenarios/amqp/go.sum              | 186 ++++++++++++++++++
 test/plugins/scenarios/amqp/main.go             | 240 ++++++++++++++++++++++++
 test/plugins/scenarios/amqp/plugin.yml          |  39 ++++
 tools/go-agent/instrument/plugins/register.go   |   2 +
 tools/go-agent/tools/dst.go                     |   2 +
 tools/go-agent/tools/enhancement.go             |  14 ++
 22 files changed, 1146 insertions(+)

diff --git a/.github/workflows/plugin-tests.yaml 
b/.github/workflows/plugin-tests.yaml
index de6c719..acb180d 100644
--- a/.github/workflows/plugin-tests.yaml
+++ b/.github/workflows/plugin-tests.yaml
@@ -97,6 +97,7 @@ jobs:
           - fiber
           - echov4
           - rocketmq
+          - amqp
     steps:
       - uses: actions/checkout@v2
         with:
diff --git a/CHANGES.md b/CHANGES.md
index 53b43cd..9e329ec 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -12,6 +12,7 @@ Release Notes.
 * Add `redis.max_args_bytes` parameter for redis plugin.
 * Changing intercept point for gin, make sure interfaces could be grouped when 
params defined in relativePath.
 * Support [RocketMQ](https://github.com/apache/rocketmq-client-go) MQ.
+* Support [AMQP](https://github.com/rabbitmq/amqp091-go) MQ.
 
 #### Documentation
 
diff --git a/docs/en/agent/support-plugins.md b/docs/en/agent/support-plugins.md
index c6c3832..b0a455a 100644
--- a/docs/en/agent/support-plugins.md
+++ b/docs/en/agent/support-plugins.md
@@ -29,6 +29,7 @@ metrics based on the tracing data.
   * `go-redisv9`: [go-redis](https://github.com/redis/go-redis) tested v9.0.3 
to v9.0.5.
 * MQ Client
   * `rocketMQ`: 
[rocketmq-client-go](https://github.com/apache/rocketmq-client-go) tested 
v2.1.2.
+  * `amqp`: [AMQP](https://github.com/rabbitmq/amqp091-go) tested v1.9.0.
 
 # Metrics Plugins
 The meter plugin provides the advanced metrics collections.
diff --git a/go.work b/go.work
index e19cdc4..08b1e93 100644
--- a/go.work
+++ b/go.work
@@ -24,6 +24,7 @@ use (
        ./plugins/fiber
        ./plugins/echov4
        ./plugins/rocketmq
+       ./plugins/amqp
 
        ./test/benchmark-codebase/consumer
        ./test/benchmark-codebase/provider
@@ -54,6 +55,7 @@ use (
        ./test/plugins/scenarios/runtime_metrics
        ./test/plugins/scenarios/echov4
        ./test/plugins/scenarios/rocketmq
+       ./test/plugins/scenarios/amqp
 
        ./tools/go-agent
 
diff --git a/plugins/amqp/consumer.go b/plugins/amqp/consumer.go
new file mode 100644
index 0000000..0bbb335
--- /dev/null
+++ b/plugins/amqp/consumer.go
@@ -0,0 +1,37 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package amqp
+
+import (
+       "github.com/rabbitmq/amqp091-go"
+
+       "github.com/apache/skywalking-go/plugins/core/operator"
+)
+
+// ConsumerInterceptor is the interceptor registered for (*Channel).Consume,
+// the consume variant that takes no context argument.
+type ConsumerInterceptor struct{}
+
+// BeforeInvoke does nothing; all tracing work happens in AfterInvoke.
+func (c *ConsumerInterceptor) BeforeInvoke(invocation operator.Invocation) error {
+       return nil
+}
+
+// AfterInvoke reads the queue name (argument 0), the consumer tag (argument 1)
+// and the consume arguments table (argument 6) of the intercepted call and
+// hands them, together with the call results, to GeneralConsumerAfterInvoke.
+func (c *ConsumerInterceptor) AfterInvoke(invocation operator.Invocation, results ...interface{}) error {
+       callArgs := invocation.Args()
+       queue := callArgs[0].(string)
+       consumerTag := callArgs[1].(string)
+       table := callArgs[6].(amqp091.Table)
+       return GeneralConsumerAfterInvoke(invocation, queue, consumerTag, table, results...)
+}
diff --git a/plugins/amqp/consumer_with_ctx.go 
b/plugins/amqp/consumer_with_ctx.go
new file mode 100644
index 0000000..f7993f2
--- /dev/null
+++ b/plugins/amqp/consumer_with_ctx.go
@@ -0,0 +1,37 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package amqp
+
+import (
+       "github.com/rabbitmq/amqp091-go"
+
+       "github.com/apache/skywalking-go/plugins/core/operator"
+)
+
+// ConsumerWithCtxInterceptor is the interceptor registered for
+// (*Channel).ConsumeWithContext, whose first argument is the context.
+type ConsumerWithCtxInterceptor struct{}
+
+// BeforeInvoke does nothing; all tracing work happens in AfterInvoke.
+func (cwi *ConsumerWithCtxInterceptor) BeforeInvoke(invocation operator.Invocation) error {
+       return nil
+}
+
+// AfterInvoke reads the queue name (argument 1), the consumer tag (argument 2)
+// and the consume arguments table (argument 7) of the intercepted call and
+// hands them, together with the call results, to GeneralConsumerAfterInvoke.
+func (cwi *ConsumerWithCtxInterceptor) AfterInvoke(invocation operator.Invocation, results ...interface{}) error {
+       callArgs := invocation.Args()
+       queue := callArgs[1].(string)
+       consumerTag := callArgs[2].(string)
+       table := callArgs[7].(amqp091.Table)
+       return GeneralConsumerAfterInvoke(invocation, queue, consumerTag, table, results...)
+}
diff --git a/plugins/amqp/dial_hook.go b/plugins/amqp/dial_hook.go
new file mode 100644
index 0000000..bf4b742
--- /dev/null
+++ b/plugins/amqp/dial_hook.go
@@ -0,0 +1,58 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package amqp
+
+import (
+       "fmt"
+       "net/url"
+
+       amqp "github.com/rabbitmq/amqp091-go"
+
+       "github.com/apache/skywalking-go/plugins/core/operator"
+)
+
+// DialInterceptor is the interceptor registered for DialConfig. It records the
+// broker address on the returned connection so that producer and consumer
+// spans can later report it as their peer.
+type DialInterceptor struct{}
+
+// BeforeInvoke does nothing; the connection only exists once the call returns.
+func (d *DialInterceptor) BeforeInvoke(invocation operator.Invocation) error {
+       return nil
+}
+
+// AfterInvoke stores the "host:port" form of the dialed URI (argument 0) in
+// the dynamic field of the returned *Connection (result 0).
+//
+// It is a no-op when the dial produced no connection, when argument 0 is not a
+// string, or when the connection does not implement operator.EnhancedInstance.
+// It always returns nil.
+func (d *DialInterceptor) AfterInvoke(invocation operator.Invocation, result ...interface{}) error {
+       if len(result) == 0 {
+               return nil
+       }
+       conn, ok := result[0].(*amqp.Connection)
+       if !ok || conn == nil {
+               return nil
+       }
+       uri, ok := invocation.Args()[0].(string)
+       if !ok {
+               return nil
+       }
+       // Checked assertion: a connection that was not enhanced must not make the
+       // application's dial call panic.
+       if enhanced, ok := result[0].(operator.EnhancedInstance); ok {
+               enhanced.SetSkyWalkingDynamicField(parseURI(uri))
+       }
+       return nil
+}
+
+// parseURI converts an AMQP URI into the "host:port" peer address reported on
+// spans. It returns an empty string when the URI cannot be parsed.
+//
+// When the URI carries no explicit port the scheme default is used (5671 for
+// "amqps", 5672 otherwise) instead of producing a dangling "host:". IPv6
+// literals keep their brackets so the host and port stay distinguishable.
+func parseURI(uri string) string {
+       u, err := url.Parse(uri)
+       if err != nil {
+               return ""
+       }
+       host, port := u.Hostname(), u.Port()
+       if port == "" {
+               port = "5672"
+               if u.Scheme == "amqps" {
+                       port = "5671"
+               }
+       }
+       // Hostname strips the brackets of an IPv6 literal; put them back.
+       if len(u.Host) > 0 && u.Host[0] == '[' {
+               host = "[" + host + "]"
+       }
+       return fmt.Sprintf("%s:%s", host, port)
+}
+
+// getPeerInfo returns the broker address stored in the dynamic field of an
+// enhanced connection.
+//
+// It returns an empty string when field does not implement
+// operator.EnhancedInstance, or when the dynamic field holds no string (for
+// example because the connection was never seen by DialInterceptor).
+func getPeerInfo(field interface{}) string {
+       instance, ok := field.(operator.EnhancedInstance)
+       if !ok || instance == nil {
+               return ""
+       }
+       // Checked assertion: an unset dynamic field must not panic inside the
+       // application's publish or consume call.
+       peer, _ := instance.GetSkyWalkingDynamicField().(string)
+       return peer
+}
diff --git a/plugins/amqp/general_consumer.go b/plugins/amqp/general_consumer.go
new file mode 100644
index 0000000..d4c9087
--- /dev/null
+++ b/plugins/amqp/general_consumer.go
@@ -0,0 +1,70 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package amqp
+
+import (
+       "fmt"
+
+       "github.com/rabbitmq/amqp091-go"
+
+       "github.com/apache/skywalking-go/plugins/core/operator"
+       "github.com/apache/skywalking-go/plugins/core/tracing"
+)
+
+// ConsumerComponentID is the component ID attached to AMQP consumer spans.
+const ConsumerComponentID = 145
+
+// Fixed parts of the consumer operation name, which has the form
+// "AMQP/<queue>/<consumerTag>/Consumer".
+const (
+       amqpConsumerPrefix = "AMQP/"
+       amqpConsumerSuffix = "/Consumer"
+)
+
+// Keys of the AMQP-specific tags set on consumer spans.
+const (
+       tagMQConsumerTag   = "mq.consumer_tag"
+       tagMQReplyTo       = "mq.reply_to"
+       tagMQCorrelationID = "mq.correlation_id"
+       tagMQArgs          = "mq.args"
+)
+
+// GeneralConsumerAfterInvoke creates, and immediately ends, an MQ entry span
+// for one delivery received from the channel returned by an intercepted
+// consume call.
+//
+// queue, consumerTag and args are the matching arguments of the intercepted
+// call; results are its return values (the delivery channel, then the error).
+// When consumerTag is empty the tag carried by the delivery is used instead.
+//
+// It returns nil without creating a span when the consume call failed, when
+// result 0 is not a usable delivery channel, or when the channel is closed
+// before a delivery arrives. It returns the error of tracing.CreateEntrySpan
+// if span creation fails.
+func GeneralConsumerAfterInvoke(invocation operator.Invocation, queue, consumerTag string, args amqp091.Table, results ...interface{}) error {
+       if len(results) == 0 {
+               return nil
+       }
+       // Check the error before touching the channel: receiving from the nil
+       // channel of a failed consume call would block forever.
+       if len(results) > 1 {
+               if err, ok := results[1].(error); ok && err != nil {
+                       return nil
+               }
+       }
+       deliveryCh, ok := results[0].(<-chan Delivery)
+       if !ok || deliveryCh == nil {
+               return nil
+       }
+       // NOTE(review): this receive blocks until a delivery arrives and takes
+       // that delivery off the channel handed back to the application - confirm
+       // this is intended.
+       delivery, ok := <-deliveryCh
+       if !ok {
+               // Channel closed without a delivery; there is nothing to trace.
+               return nil
+       }
+       if consumerTag == "" {
+               consumerTag = delivery.ConsumerTag
+       }
+       operationName := amqpConsumerPrefix + queue + "/" + consumerTag + amqpConsumerSuffix
+
+       channel := invocation.CallerInstance().(*nativeChannel)
+       peer := getPeerInfo(channel.connection)
+
+       span, err := tracing.CreateEntrySpan(operationName, func(headerKey string) (string, error) {
+               // Messages published without tracing headers are legal: report an
+               // empty value instead of panicking on a failed type assertion.
+               value, _ := delivery.Headers[headerKey].(string)
+               return value, nil
+       }, tracing.WithLayer(tracing.SpanLayerMQ),
+               tracing.WithComponent(ConsumerComponentID),
+               tracing.WithTag(tracing.TagMQBroker, peer),
+               tracing.WithTag(tracing.TagMQQueue, queue),
+               tracing.WithTag(tracing.TagMQMsgID, delivery.MessageId),
+               tracing.WithTag(tagMQConsumerTag, consumerTag),
+               tracing.WithTag(tagMQCorrelationID, delivery.CorrelationId),
+               tracing.WithTag(tagMQReplyTo, delivery.ReplyTo),
+               tracing.WithTag(tagMQArgs, fmt.Sprintf("%v", args)),
+       )
+       if err != nil {
+               return err
+       }
+       span.SetPeer(peer)
+       span.End()
+       return nil
+}
diff --git a/plugins/amqp/go.mod b/plugins/amqp/go.mod
new file mode 100644
index 0000000..b7516d3
--- /dev/null
+++ b/plugins/amqp/go.mod
@@ -0,0 +1,5 @@
+module github.com/apache/skywalking-go/plugins/amqp
+
+go 1.18
+
+require github.com/rabbitmq/amqp091-go v1.9.0 // indirect
diff --git a/plugins/amqp/go.sum b/plugins/amqp/go.sum
new file mode 100644
index 0000000..92e1569
--- /dev/null
+++ b/plugins/amqp/go.sum
@@ -0,0 +1,28 @@
+github.com/apache/skywalking-go/plugins/core 
v0.0.0-20240111105437-5510aef8b575 
h1:6mI+0wuojI6IECkErFmw/UCMG007IH6jXI2wAdQ0cpM=
+github.com/apache/skywalking-go/plugins/core 
v0.0.0-20240111105437-5510aef8b575/go.mod 
h1:P0tAFNAYJUNsiTlFQgxmCcRcu+7Y0MP8GBZfSI449CM=
+github.com/dave/dst v0.27.2 h1:4Y5VFTkhGLC1oddtNwuxxe36pnyLxMFXT51FOzH8Ekc=
+github.com/dave/dst v0.27.2/go.mod 
h1:jHh6EOibnHgcUW3WjKHisiooEkYwqpHLBSX1iOBhEyc=
+github.com/davecgh/go-spew v1.1.0/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1 
h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/kr/pretty v0.1.0/go.mod 
h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
+github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
+github.com/kr/text v0.1.0/go.mod 
h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
+github.com/pmezard/go-difflib v1.0.0 
h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod 
h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/rabbitmq/amqp091-go v1.9.0 
h1:qrQtyzB4H8BQgEuJwhmVQqVHB9O4+MNDJCCAcpc3Aoo=
+github.com/rabbitmq/amqp091-go v1.9.0/go.mod 
h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc=
+github.com/stretchr/objx v0.1.0/go.mod 
h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/objx v0.4.0/go.mod 
h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
+github.com/stretchr/testify v1.7.1/go.mod 
h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.8.0/go.mod 
h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
+github.com/stretchr/testify v1.8.2 
h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
+go.uber.org/goleak v1.2.1/go.mod 
h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
+golang.org/x/mod v0.9.0 h1:KENHtAZL2y3NLMYZeHY9DW8HW8V+kQyJsY/V9JlKvCs=
+golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU=
+golang.org/x/tools v0.7.0 h1:W4OVu8VVOaIO0yzWMNdepAulS7YfoS3Zabrm8DOXXU4=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod 
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod 
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod 
h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
+gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/plugins/amqp/instrument.go b/plugins/amqp/instrument.go
new file mode 100644
index 0000000..3ad747d
--- /dev/null
+++ b/plugins/amqp/instrument.go
@@ -0,0 +1,116 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package amqp
+
+import (
+       "embed"
+
+       "github.com/apache/skywalking-go/plugins/core/instrument"
+)
+
+//go:embed *
+var fs embed.FS
+
+// Instrument describes the amqp plugin: its name, the package it targets and
+// the enhancement points it registers.
+//
+//skywalking:nocopy
+type Instrument struct{}
+
+// NewInstrument returns a new Instrument.
+func NewInstrument() *Instrument {
+       return new(Instrument)
+}
+
+// Name returns the plugin name, "amqp".
+func (i *Instrument) Name() string {
+       return "amqp"
+}
+
+// BasePackage returns the import path of the library this plugin targets.
+func (i *Instrument) BasePackage() string {
+       return "github.com/rabbitmq/amqp091-go"
+}
+
+// VersionChecker accepts every version of the target library.
+func (i *Instrument) VersionChecker(version string) bool {
+       return true
+}
+
+// Points returns the enhancement points of the plugin, in this order: the
+// publish method, the two consume methods, the DialConfig function and the
+// Connection struct. Every point uses an empty PackagePath and the package
+// name "amqp091".
+func (i *Instrument) Points() []*instrument.Point {
+       // Publishing through (*Channel).PublishWithDeferredConfirmWithContext.
+       publish := &instrument.Point{
+               PackagePath: "",
+               PackageName: "amqp091",
+               At: instrument.NewMethodEnhance("*Channel", "PublishWithDeferredConfirmWithContext",
+                       instrument.WithArgsCount(6),
+                       instrument.WithArgType(0, "context.Context"),
+                       instrument.WithArgType(1, "string"),
+                       instrument.WithArgType(2, "string"),
+                       instrument.WithArgType(5, "Publishing"),
+                       instrument.WithResultCount(2),
+                       instrument.WithResultType(0, "*DeferredConfirmation"),
+                       instrument.WithResultType(1, "error"),
+               ),
+               Interceptor: "ProducerInterceptor",
+       }
+
+       // Consuming through (*Channel).Consume.
+       consume := &instrument.Point{
+               PackagePath: "",
+               PackageName: "amqp091",
+               At: instrument.NewMethodEnhance("*Channel", "Consume",
+                       instrument.WithArgsCount(7),
+                       instrument.WithArgType(0, "string"),
+                       instrument.WithArgType(1, "string"),
+                       instrument.WithArgType(6, "Table"),
+                       instrument.WithResultCount(2),
+                       instrument.WithResultType(0, "<-chan Delivery"),
+                       instrument.WithResultType(1, "error"),
+               ),
+               Interceptor: "ConsumerInterceptor",
+       }
+
+       // Consuming through (*Channel).ConsumeWithContext; every argument index
+       // is shifted by one compared with Consume.
+       consumeWithCtx := &instrument.Point{
+               PackagePath: "",
+               PackageName: "amqp091",
+               At: instrument.NewMethodEnhance("*Channel", "ConsumeWithContext",
+                       instrument.WithArgsCount(8),
+                       instrument.WithArgType(1, "string"),
+                       instrument.WithArgType(2, "string"),
+                       instrument.WithArgType(7, "Table"),
+                       instrument.WithResultCount(2),
+                       instrument.WithResultType(0, "<-chan Delivery"),
+                       instrument.WithResultType(1, "error"),
+               ),
+               Interceptor: "ConsumerWithCtxInterceptor",
+       }
+
+       // Connecting through the package-level DialConfig function.
+       dial := &instrument.Point{
+               PackagePath: "",
+               PackageName: "amqp091",
+               At: instrument.NewStaticMethodEnhance("DialConfig",
+                       instrument.WithArgsCount(2),
+                       instrument.WithArgType(0, "string"),
+                       instrument.WithArgType(1, "Config"),
+                       instrument.WithResultCount(2),
+                       instrument.WithResultType(0, "*Connection"),
+                       instrument.WithResultType(1, "error"),
+               ),
+               Interceptor: "DialInterceptor",
+       }
+
+       // Struct enhancement of Connection; this point has no interceptor.
+       connection := &instrument.Point{
+               PackagePath: "",
+               PackageName: "amqp091",
+               At:          instrument.NewStructEnhance("Connection"),
+       }
+
+       return []*instrument.Point{publish, consume, consumeWithCtx, dial, connection}
+}
+
+// FS returns the embedded file system holding the plugin's files.
+func (i *Instrument) FS() *embed.FS {
+       return &fs
+}
diff --git a/plugins/amqp/producer.go b/plugins/amqp/producer.go
new file mode 100644
index 0000000..6953e88
--- /dev/null
+++ b/plugins/amqp/producer.go
@@ -0,0 +1,85 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package amqp
+
+import (
+       "github.com/rabbitmq/amqp091-go"
+
+       "github.com/apache/skywalking-go/plugins/core/operator"
+       "github.com/apache/skywalking-go/plugins/core/tracing"
+)
+
+// ProducerComponentID is the component ID attached to AMQP producer spans.
+const ProducerComponentID = 144
+
+// Parts of the producer operation name, which has the form
+// "AMQP[/<exchange>][/<routingKey>]/Producer".
+const (
+       amqpSendPrefix = "AMQP"
+       amqpSendSuffix = "/Producer"
+       delimiter      = "/"
+)
+
+// Keys of the AMQP-specific tags set on producer spans.
+const (
+       tagMQExchange   = "mq.exchange"
+       tagMQRoutingKey = "mq.routing_key"
+)
+
+// ProducerInterceptor is the interceptor registered for
+// (*Channel).PublishWithDeferredConfirmWithContext. It wraps every publish in
+// an MQ exit span and injects the tracing headers into the message.
+type ProducerInterceptor struct{}
+
+// BeforeInvoke creates the exit span for the publish call and stores it in the
+// invocation context for AfterInvoke.
+//
+// The operation name is "AMQP", followed by "/<exchange>" and
+// "/<routingKey>" when those arguments (1 and 2) are non-empty, followed by
+// "/Producer". Tracing headers are written into the Headers table of the
+// Publishing argument (5). It returns the error of tracing.CreateExitSpan if
+// span creation fails.
+func (p *ProducerInterceptor) BeforeInvoke(invocation operator.Invocation) error {
+       channel := invocation.CallerInstance().(*nativeChannel)
+       peer := getPeerInfo(channel.connection)
+
+       callArgs := invocation.Args()
+       exchange, routingKey := callArgs[1].(string), callArgs[2].(string)
+       publishing := callArgs[5].(amqp091.Publishing)
+
+       operationName := amqpSendPrefix
+       if exchange != "" {
+               operationName += delimiter + exchange
+       }
+       if routingKey != "" {
+               operationName += delimiter + routingKey
+       }
+       operationName += amqpSendSuffix
+
+       // publishing is a copy of the argument. An existing Headers map is shared
+       // with the original, but a map allocated here is not, so remember whether
+       // one had to be created.
+       headersCreated := false
+       span, err := tracing.CreateExitSpan(operationName, peer, func(headerKey, headerValue string) error {
+               if publishing.Headers == nil {
+                       publishing.Headers = amqp091.Table{}
+                       headersCreated = true
+               }
+               publishing.Headers[headerKey] = headerValue
+               return nil
+       }, tracing.WithLayer(tracing.SpanLayerMQ),
+               tracing.WithComponent(ProducerComponentID),
+               tracing.WithTag(tracing.TagMQBroker, peer),
+               tracing.WithTag(tagMQExchange, exchange),
+               tracing.WithTag(tagMQRoutingKey, routingKey),
+       )
+       if err != nil {
+               return err
+       }
+       if headersCreated {
+               // Write the copy back, otherwise the injected headers never reach
+               // the published message and the trace breaks at the consumer.
+               invocation.ChangeArg(5, publishing)
+       }
+       invocation.SetContext(span)
+       return nil
+}
+
+// AfterInvoke ends the span created by BeforeInvoke, first marking it as
+// failed when the publish call returned a non-nil error (result 1). It is a
+// no-op when the invocation context holds no span.
+func (p *ProducerInterceptor) AfterInvoke(invocation operator.Invocation, results ...interface{}) error {
+       span, ok := invocation.GetContext().(tracing.Span)
+       if !ok || span == nil {
+               return nil
+       }
+       if len(results) > 1 {
+               if err, ok := results[1].(error); ok && err != nil {
+                       span.Error(err.Error())
+               }
+       }
+       span.End()
+       return nil
+}
diff --git a/plugins/amqp/structures.go b/plugins/amqp/structures.go
new file mode 100644
index 0000000..5675b6c
--- /dev/null
+++ b/plugins/amqp/structures.go
@@ -0,0 +1,46 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package amqp
+
+import (
+       "io"
+)
+
+// nativeChannel is tied to the Channel type of amqp091-go by the directive
+// below. Only the connection field is declared; the interceptors read it to
+// resolve the peer address of the channel.
+// NOTE(review): presumably the directive maps this declaration onto the
+// library's own type at build time - confirm against the agent tooling.
+//
+//skywalking:native github.com/rabbitmq/amqp091-go Channel
+type nativeChannel struct {
+       connection *nativeConnection
+}
+
+// Delivery is tied to the Delivery type of amqp091-go by the directive below.
+// The consumer interceptor reads Headers, MessageId, ConsumerTag,
+// CorrelationId and ReplyTo from it.
+//
+//skywalking:native github.com/rabbitmq/amqp091-go Delivery
+type Delivery struct {
+       Headers       Table
+       MessageId     string //nolint
+       ConsumerTag   string
+       Exchange      string
+       RoutingKey    string
+       DeliveryTag   uint64
+       CorrelationId string //nolint
+       ReplyTo       string
+}
+
+// Table is the map type used for the Headers field of Delivery.
+type Table map[string]interface{}
+
+// nativeConnection is tied to the Connection type of amqp091-go by the
+// directive below. Values of this type are passed to getPeerInfo to look up
+// the stored broker address.
+//
+//skywalking:native github.com/rabbitmq/amqp091-go Connection
+type nativeConnection struct {
+       conn io.ReadWriteCloser
+}
diff --git a/test/plugins/scenarios/amqp/bin/startup.sh 
b/test/plugins/scenarios/amqp/bin/startup.sh
new file mode 100644
index 0000000..7305f29
--- /dev/null
+++ b/test/plugins/scenarios/amqp/bin/startup.sh
@@ -0,0 +1,22 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Directory containing this script. "$0" is quoted so that a path containing
+# spaces is handled correctly.
+home="$(cd "$(dirname "$0")"; pwd)"
+# GO_BUILD_OPTS is left unquoted so that several options split into separate
+# arguments.
+go build ${GO_BUILD_OPTS} -o amqp main.go
+
+./amqp
\ No newline at end of file
diff --git a/test/plugins/scenarios/amqp/config/excepted.yml 
b/test/plugins/scenarios/amqp/config/excepted.yml
new file mode 100644
index 0000000..b60fbe2
--- /dev/null
+++ b/test/plugins/scenarios/amqp/config/excepted.yml
@@ -0,0 +1,135 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+segmentItems:
+  - serviceName: amqp
+    segmentSize: 2
+    segments:
+      - segmentId: not null
+        spans:
+          - operationName: GET:/health
+            parentSpanId: -1
+            spanId: 0
+            spanLayer: Http
+            startTime: nq 0
+            endTime: nq 0
+            componentId: 5004
+            isError: false
+            spanType: Entry
+            peer: ''
+            skipAnalysis: false
+            tags:
+              - { key: http.method, value: GET }
+              - { key: url, value: 'service:8080/health' }
+              - { key: status_code, value: '200' }
+      - segmentId: not null
+        spans:
+          - operationName: AMQP/sw-queue-1/Producer
+            parentSpanId: 0
+            spanId: 1
+            spanLayer: MQ
+            startTime: nq 0
+            endTime: nq 0
+            componentId: 144
+            isError: false
+            spanType: Exit
+            peer: amqp-server:5672
+            skipAnalysis: false
+            tags:
+              - { key: mq.broker, value: 'amqp-server:5672' }
+              - { key: mq.exchange, value: not null }
+              - { key: mq.routing_key, value: sw-queue-1 }
+          - operationName: AMQP/sw-queue-1/sw-consumer-1/Consumer
+            parentSpanId: 0
+            spanId: 2
+            spanLayer: MQ
+            startTime: nq 0
+            endTime: nq 0
+            componentId: 145
+            isError: false
+            spanType: Entry
+            peer: amqp-server:5672
+            skipAnalysis: false
+            tags:
+              - { key: mq.broker, value: 'amqp-server:5672' }
+              - { key: mq.queue, value: sw-queue-1 }
+              - { key: mq.msg.id, value: not null }
+              - { key: mq.consumer_tag, value: sw-consumer-1 }
+              - { key: mq.correlation_id, value: not null }
+              - { key: mq.reply_to, value: not null }
+              - { key: mq.args, value: not null }
+            refs:
+              - { parentEndpoint: 'GET:/execute', networkAddress: 
'amqp-server:5672', refType: CrossProcess,
+                  parentSpanId: 1, parentTraceSegmentId: not null,
+                  parentServiceInstance: not null, parentService: amqp,
+                  traceId: not null }
+          - operationName: AMQP/sw-queue-2/Producer
+            parentSpanId: 0
+            spanId: 3
+            spanLayer: MQ
+            startTime: nq 0
+            endTime: nq 0
+            componentId: 144
+            isError: false
+            spanType: Exit
+            peer: amqp-server:5672
+            skipAnalysis: false
+            tags:
+              - { key: mq.broker, value: 'amqp-server:5672' }
+              - { key: mq.exchange, value: not null }
+              - { key: mq.routing_key, value: sw-queue-2 }
+          - operationName: AMQP/sw-queue-2/sw-consumer-2/Consumer
+            parentSpanId: 0
+            spanId: 4
+            spanLayer: MQ
+            startTime: nq 0
+            endTime: nq 0
+            componentId: 145
+            isError: false
+            spanType: Entry
+            peer: amqp-server:5672
+            skipAnalysis: false
+            tags:
+              - { key: mq.broker, value: 'amqp-server:5672' }
+              - { key: mq.queue, value: sw-queue-2 }
+              - { key: mq.msg.id, value: not null }
+              - { key: mq.consumer_tag, value: sw-consumer-2 }
+              - { key: mq.correlation_id, value: not null }
+              - { key: mq.reply_to, value: not null }
+              - { key: mq.args, value: not null }
+            refs:
+              - { parentEndpoint: 'GET:/execute', networkAddress: 
'amqp-server:5672', refType: CrossProcess,
+                  parentSpanId: 3, parentTraceSegmentId: not null,
+                  parentServiceInstance: not null, parentService: amqp,
+                  traceId: not null }
+          - operationName: GET:/execute
+            parentSpanId: -1
+            spanId: 0
+            spanLayer: Http
+            startTime: nq 0
+            endTime: nq 0
+            componentId: 5004
+            isError: false
+            spanType: Entry
+            peer: ''
+            skipAnalysis: false
+            tags:
+              - { key: http.method, value: GET }
+              - { key: url, value: 'service:8080/execute' }
+              - { key: status_code, value: '200' }
+
+meterItems: [ ]
+logItems: [ ]
\ No newline at end of file
diff --git a/test/plugins/scenarios/amqp/go.mod 
b/test/plugins/scenarios/amqp/go.mod
new file mode 100644
index 0000000..9c42187
--- /dev/null
+++ b/test/plugins/scenarios/amqp/go.mod
@@ -0,0 +1,19 @@
+module test/plugins/scenarios/amqp
+
+go 1.18
+
+require (
+       github.com/apache/skywalking-go v0.3.0 // indirect
+       github.com/golang/protobuf v1.5.3 // indirect
+       github.com/google/go-cmp v0.5.9 // indirect
+       github.com/google/uuid v1.3.0 // indirect
+       github.com/pkg/errors v0.9.1 // indirect
+       github.com/rabbitmq/amqp091-go v1.9.0 // indirect
+       golang.org/x/net v0.10.0 // indirect
+       golang.org/x/sys v0.8.0 // indirect
+       golang.org/x/text v0.9.0 // indirect
+       google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // 
indirect
+       google.golang.org/grpc v1.55.0 // indirect
+       google.golang.org/protobuf v1.30.0 // indirect
+       skywalking.apache.org/repo/goapi v0.0.0-20230314034821-0c5a44bb767a // 
indirect
+)
diff --git a/test/plugins/scenarios/amqp/go.sum 
b/test/plugins/scenarios/amqp/go.sum
new file mode 100644
index 0000000..16c5606
--- /dev/null
+++ b/test/plugins/scenarios/amqp/go.sum
@@ -0,0 +1,186 @@
+cloud.google.com/go v0.26.0/go.mod 
h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
+cloud.google.com/go v0.34.0/go.mod 
h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
+github.com/BurntSushi/toml v0.3.1/go.mod 
h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
+github.com/OneOfOne/xxhash v1.2.2/go.mod 
h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
+github.com/antihax/optional v1.0.0/go.mod 
h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
+github.com/apache/skywalking-go v0.3.0 
h1:e8yUYUWm8EvDSyTIESfGgRBw/0/Bnc40GbzPKK1P+mI=
+github.com/apache/skywalking-go v0.3.0/go.mod 
h1:r2FrabK/0XftMyosqre7WFZvZ1mEOVssJROo8xgcEW8=
+github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod 
h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
+github.com/cespare/xxhash v1.1.0/go.mod 
h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
+github.com/client9/misspell v0.3.4/go.mod 
h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
+github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod 
h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
+github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod 
h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
+github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod 
h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
+github.com/davecgh/go-spew v1.1.0/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/envoyproxy/go-control-plane v0.9.0/go.mod 
h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
+github.com/envoyproxy/go-control-plane 
v0.9.1-0.20191026205805-5f8ba28d4473/go.mod 
h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
+github.com/envoyproxy/go-control-plane v0.9.4/go.mod 
h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
+github.com/envoyproxy/go-control-plane 
v0.9.9-0.20201210154907-fd9021fe5dad/go.mod 
h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
+github.com/envoyproxy/go-control-plane 
v0.9.9-0.20210217033140-668b12f5399d/go.mod 
h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
+github.com/envoyproxy/go-control-plane 
v0.9.9-0.20210512163311-63b5d3c536b0/go.mod 
h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ=
+github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod 
h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
+github.com/ghodss/yaml v1.0.0/go.mod 
h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
+github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod 
h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
+github.com/golang/mock v1.1.1/go.mod 
h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
+github.com/golang/protobuf v1.2.0/go.mod 
h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/protobuf v1.3.2/go.mod 
h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/protobuf v1.3.3/go.mod 
h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw=
+github.com/golang/protobuf v1.4.0-rc.1/go.mod 
h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
+github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod 
h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
+github.com/golang/protobuf v1.4.0-rc.2/go.mod 
h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
+github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod 
h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
+github.com/golang/protobuf v1.4.0/go.mod 
h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
+github.com/golang/protobuf v1.4.1/go.mod 
h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8=
+github.com/golang/protobuf v1.4.2/go.mod 
h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
+github.com/golang/protobuf v1.4.3/go.mod 
h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
+github.com/golang/protobuf v1.5.0/go.mod 
h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
+github.com/golang/protobuf v1.5.2/go.mod 
h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
+github.com/golang/protobuf v1.5.3 
h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
+github.com/golang/protobuf v1.5.3/go.mod 
h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
+github.com/google/go-cmp v0.2.0/go.mod 
h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
+github.com/google/go-cmp v0.3.0/go.mod 
h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/google/go-cmp v0.3.1/go.mod 
h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/google/go-cmp v0.4.0/go.mod 
h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.0/go.mod 
h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.5/go.mod 
h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.6/go.mod 
h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
+github.com/google/go-cmp v0.5.9/go.mod 
h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
+github.com/google/uuid v1.1.2/go.mod 
h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
+github.com/google/uuid v1.3.0/go.mod 
h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod 
h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
+github.com/kr/pretty v0.1.0/go.mod 
h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
+github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
+github.com/kr/text v0.1.0/go.mod 
h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
+github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
+github.com/pkg/errors v0.9.1/go.mod 
h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pmezard/go-difflib v1.0.0/go.mod 
h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod 
h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
+github.com/rabbitmq/amqp091-go v1.9.0 
h1:qrQtyzB4H8BQgEuJwhmVQqVHB9O4+MNDJCCAcpc3Aoo=
+github.com/rabbitmq/amqp091-go v1.9.0/go.mod 
h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc=
+github.com/rogpeppe/fastuuid v1.2.0/go.mod 
h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
+github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod 
h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
+github.com/stretchr/objx v0.1.0/go.mod 
h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/objx v0.4.0/go.mod 
h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
+github.com/stretchr/testify v1.5.1/go.mod 
h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
+github.com/stretchr/testify v1.7.1/go.mod 
h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.8.0/go.mod 
h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
+github.com/yuin/goldmark v1.3.5/go.mod 
h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
+github.com/yuin/goldmark v1.4.13/go.mod 
h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
+go.opentelemetry.io/proto/otlp v0.7.0/go.mod 
h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
+go.uber.org/goleak v1.2.1/go.mod 
h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod 
h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod 
h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod 
h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
+golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod 
h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
+golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod 
h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
+golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod 
h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
+golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod 
h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
+golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod 
h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
+golang.org/x/lint v0.0.0-20210508222113-6edffad5e616/go.mod 
h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
+golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod 
h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
+golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
+golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod 
h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
+golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod 
h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod 
h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod 
h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod 
h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod 
h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod 
h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod 
h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod 
h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
+golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod 
h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
+golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod 
h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
+golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod 
h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
+golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
+golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M=
+golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
+golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod 
h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
+golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod 
h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
+golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod 
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod 
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod 
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod 
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod 
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod 
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod 
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod 
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU=
+golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod 
h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
+golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod 
h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
+golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
+golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
+golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE=
+golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
+golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod 
h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod 
h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod 
h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
+golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod 
h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
+golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod 
h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
+golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod 
h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod 
h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
+golang.org/x/tools v0.1.3/go.mod 
h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
+golang.org/x/tools v0.1.12/go.mod 
h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
+golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod 
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod 
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod 
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod 
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+google.golang.org/appengine v1.1.0/go.mod 
h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
+google.golang.org/appengine v1.4.0/go.mod 
h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
+google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod 
h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
+google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod 
h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
+google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod 
h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
+google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod 
h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
+google.golang.org/genproto v0.0.0-20210624195500-8bfb893ecb84/go.mod 
h1:SzzZ/N+nwJDaO1kznhnlzqS8ocJICar6hYhVyhi++24=
+google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 
h1:KpwkzHKEF7B9Zxg18WzOa7djJ+Ha5DzthMyZYQfEn2A=
+google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1/go.mod 
h1:nKE/iIaLqn2bQwXBg8f1g2Ylh6r5MN5CmZvuzZCgsCU=
+google.golang.org/grpc v1.19.0/go.mod 
h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
+google.golang.org/grpc v1.23.0/go.mod 
h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
+google.golang.org/grpc v1.25.1/go.mod 
h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
+google.golang.org/grpc v1.27.0/go.mod 
h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
+google.golang.org/grpc v1.33.1/go.mod 
h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0=
+google.golang.org/grpc v1.36.0/go.mod 
h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
+google.golang.org/grpc v1.38.0/go.mod 
h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM=
+google.golang.org/grpc v1.40.0/go.mod 
h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34=
+google.golang.org/grpc v1.55.0 h1:3Oj82/tFSCeUrRTg/5E/7d/W5A1tj6Ky1ABAuZuv5ag=
+google.golang.org/grpc v1.55.0/go.mod 
h1:iYEXKGkEBhg1PjZQvoYEVPTDkHo1/bjTnfwTeGONTY8=
+google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod 
h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
+google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod 
h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
+google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod 
h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
+google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod 
h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
+google.golang.org/protobuf v1.21.0/go.mod 
h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
+google.golang.org/protobuf v1.22.0/go.mod 
h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
+google.golang.org/protobuf v1.23.0/go.mod 
h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
+google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod 
h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
+google.golang.org/protobuf v1.25.0/go.mod 
h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
+google.golang.org/protobuf v1.26.0-rc.1/go.mod 
h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
+google.golang.org/protobuf v1.26.0/go.mod 
h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
+google.golang.org/protobuf v1.29.0/go.mod 
h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
+google.golang.org/protobuf v1.30.0 
h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng=
+google.golang.org/protobuf v1.30.0/go.mod 
h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod 
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod 
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod 
h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod 
h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
+honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod 
h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
+skywalking.apache.org/repo/goapi v0.0.0-20230314034821-0c5a44bb767a 
h1:m8DTnaSEOEnPXRWmA6g7isbdqw7WPZP6SnaEHz1Sx7s=
+skywalking.apache.org/repo/goapi v0.0.0-20230314034821-0c5a44bb767a/go.mod 
h1:LcZMcxDjdJPn5yetydFnxe0l7rmiv8lvHEnzRbsey14=
diff --git a/test/plugins/scenarios/amqp/main.go 
b/test/plugins/scenarios/amqp/main.go
new file mode 100644
index 0000000..66adb79
--- /dev/null
+++ b/test/plugins/scenarios/amqp/main.go
@@ -0,0 +1,240 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package main
+
+import (
+       "context"
+       "fmt"
+       "log"
+       "net/http"
+       "time"
+
+       amqp "github.com/rabbitmq/amqp091-go"
+
+       _ "github.com/apache/skywalking-go"
+)
+
+// testFunc is the signature of a scenario test case: it receives a
+// RabbitClient and reports failure through the returned error.
+type testFunc func(RabbitClient) error
+
+// Settings and signalling channels shared by the scenario's producer and
+// consumers.
+var (
+       // uri is the broker address (including credentials) passed to amqp.Dial.
+       uri                    = "amqp://admin:123456@amqp-server:5672"
+       // queue1 and queue2 are the queues used by the simple-consumer case and
+       // the consumer-with-context case respectively.
+       queue1                 = "sw-queue-1"
+       queue2                 = "sw-queue-2"
+       // body is the payload of every published message.
+       body                   = "I love skywalking 3 thousand"
+       // consumerTag1 and consumerTag2 are the consumer tags used on queue1 and
+       // queue2.
+       consumerTag1           = "sw-consumer-1"
+       consumerTag2           = "sw-consumer-2"
+       // consumerTrigger and consumerWithCtxTrigger are unbuffered channels on
+       // which the consumer goroutines wait before they start consuming.
+       consumerTrigger        = make(chan struct{})
+       consumerWithCtxTrigger = make(chan struct{})
+)
+
+// main dials the broker at uri, wraps the connection in a RabbitClient and
+// serves the scenario's HTTP endpoints on :8080:
+//
+//   - /execute runs every test case in order and always answers
+//     "execute success"; a failing case is only printed.
+//   - /health answers "ok".
+//
+// It panics when the connection or its channel cannot be set up.
+func main() {
+       conn, err := amqp.Dial(uri)
+       if err != nil {
+               panic(err)
+       }
+       client, err := NewRabbitMQClient(conn)
+       if err != nil {
+               panic(err)
+       }
+
+       route := http.NewServeMux()
+       route.HandleFunc("/execute", func(res http.ResponseWriter, req *http.Request) {
+               tests := []struct {
+                       name string
+                       fn   testFunc
+               }{
+                       {"testSimpleConsumer", testSimpleConsumer},
+                       {"testConsumerWithCtx", testConsumerWithCtx},
+               }
+               for _, test := range tests {
+                       fmt.Printf("execute test case: %s\n", test.name)
+                       if subErr := test.fn(client); subErr != nil {
+                               // Terminate the line so the next log entry starts on its own line.
+                               fmt.Printf("test case %s failed: %v\n", test.name, subErr)
+                       }
+               }
+               _, _ = res.Write([]byte("execute success"))
+       })
+       route.HandleFunc("/health", func(res http.ResponseWriter, req *http.Request) {
+               _, _ = res.Write([]byte("ok"))
+       })
+
+       // ListenAndServe blocks while serving and always returns a non-nil error,
+       // so log.Fatalf ends the process and nothing after it is reachable.
+       err = http.ListenAndServe(":8080", route)
+       if err != nil {
+               log.Fatalf("client start error: %v \n", err)
+       }
+}
+
+// testSimpleConsumer publishes one message to queue1 through client, starts
+// consumer in its own goroutine and releases it via consumerTrigger. It then
+// sleeps for one second (presumably to give the consumer time to receive the
+// message; there is no explicit synchronization) and always returns nil.
+func testSimpleConsumer(client RabbitClient) error {
+       producer(queue1, client)
+       go consumer()
+       // Unbuffered send: blocks until the consumer goroutine receives it.
+       consumerTrigger <- struct{}{}
+       time.Sleep(time.Second)
+       return nil
+}
+
+// testConsumerWithCtx publishes one message to queue2 through client, starts
+// consumerWithContext in its own goroutine and releases it via
+// consumerWithCtxTrigger. It then sleeps for one second (presumably to give
+// the consumer time to receive the message; there is no explicit
+// synchronization) and always returns nil.
+func testConsumerWithCtx(client RabbitClient) error {
+       producer(queue2, client)
+       go consumerWithContext()
+       // Unbuffered send: blocks until the consumer goroutine receives it.
+       consumerWithCtxTrigger <- struct{}{}
+       time.Sleep(time.Second)
+       return nil
+}
+
+// producer declares queue (durable, not auto-deleted) through client and
+// publishes one text/plain message carrying body, using an empty exchange
+// name and the queue name as routing key. Failures are printed rather than
+// returned; when the queue cannot be declared nothing is published.
+func producer(queue string, client RabbitClient) {
+       if _, err := client.CreateQueue(queue, true, false); err != nil {
+               fmt.Println("Failed to create queue, err: ", err)
+               return
+       }
+       msg := amqp.Publishing{
+               ContentType:   "text/plain",
+               Body:          []byte(body),
+               Headers:       amqp.Table{},
+               CorrelationId: "1",
+               MessageId:     "2",
+       }
+       if err := client.Send(context.Background(), "", queue, msg); err != nil {
+               fmt.Println("Failed to Send msg, err: ", err)
+       }
+}
+
+// consumer waits for a signal on consumerTrigger, then opens its own
+// connection and channel, subscribes to queue1 as consumerTag1 with manual
+// acknowledgement, and logs and acks every delivery until the delivery
+// channel is closed. Afterwards it cancels the subscription and closes the
+// connection.
+//
+// Every failure is printed. A setup failure also ends the function, because
+// the following steps would otherwise dereference a nil connection or range
+// over a nil delivery channel forever.
+func consumer() {
+       <-consumerTrigger
+       consumeConn, err := amqp.Dial(uri)
+       if err != nil {
+               fmt.Println("Failed to Dial Consume, err: ", err)
+               return
+       }
+       // Deferred so the connection is also released on the early returns below;
+       // on the normal path it still runs after the Cancel call.
+       defer func() {
+               if closeErr := consumeConn.Close(); closeErr != nil {
+                       fmt.Println("Failed to Close Cancel, err: ", closeErr)
+               }
+       }()
+       consumeClient, err := NewRabbitMQClient(consumeConn)
+       if err != nil {
+               fmt.Println("Failed to Channel Consume, err: ", err)
+               return
+       }
+       msgs, err := consumeClient.Consume(queue1, consumerTag1, false)
+       if err != nil {
+               fmt.Println("Failed to Consume msg, err: ", err)
+               return
+       }
+       log.Printf("[Consumer] Waiting for messages.\n")
+       for d := range msgs {
+               log.Printf("Received a message: %s\n", string(d.Body))
+               if ackErr := d.Ack(false); ackErr != nil {
+                       fmt.Println("Failed to Ack msg, err: ", ackErr)
+               }
+       }
+       if err = consumeClient.Cancel(consumerTag1); err != nil {
+               fmt.Println("Failed to Cancel Consume, err: ", err)
+       }
+}
+
+// consumerWithContext waits for a signal on consumerWithCtxTrigger, then opens
+// its own connection and channel, subscribes to queue2 as consumerTag2 with
+// manual acknowledgement, and logs and acks every delivery until the delivery
+// channel is closed. Afterwards it cancels the subscription and closes the
+// connection.
+//
+// Every failure is printed. A setup failure also ends the function, because
+// the following steps would otherwise dereference a nil connection or range
+// over a nil delivery channel forever.
+//
+// NOTE(review): despite its name this calls RabbitClient.Consume, not
+// RabbitClient.ConsumeWithContext - confirm which one the scenario is meant
+// to exercise before changing it.
+func consumerWithContext() {
+       <-consumerWithCtxTrigger
+       consumeConn, err := amqp.Dial(uri)
+       if err != nil {
+               fmt.Println("Failed to Dial ConsumerWithContext, err: ", err)
+               return
+       }
+       // Deferred so the connection is also released on the early returns below;
+       // on the normal path it still runs after the Cancel call.
+       defer func() {
+               if closeErr := consumeConn.Close(); closeErr != nil {
+                       fmt.Println("Failed to Close ConsumerWithContext, err: ", closeErr)
+               }
+       }()
+       consumeClient, err := NewRabbitMQClient(consumeConn)
+       if err != nil {
+               fmt.Println("Failed to Channel ConsumerWithContext, err: ", err)
+               return
+       }
+       msgs, err := consumeClient.Consume(queue2, consumerTag2, false)
+       if err != nil {
+               fmt.Println("Failed to Consume msg, err: ", err)
+               return
+       }
+       log.Printf("[ConsumerWithContext] Waiting for messages.\n")
+       for d := range msgs {
+               log.Printf("Received a message: %s", string(d.Body))
+               if ackErr := d.Ack(false); ackErr != nil {
+                       fmt.Println("Failed to Ack msg, err: ", ackErr)
+               }
+       }
+       if err = consumeClient.Cancel(consumerTag2); err != nil {
+               fmt.Println("Failed to Cancel ConsumerWithContext, err: ", err)
+       }
+}
+
+// RabbitClient bundles a RabbitMQ connection with one channel opened on it;
+// its methods issue their AMQP operations through that channel.
+type RabbitClient struct {
+       // conn is the connection the channel was opened on.
+       conn *amqp.Connection
+       // ch is the channel used to declare, bind, publish, consume and cancel.
+       ch *amqp.Channel
+}
+
+// NewRabbitMQClient opens a channel on conn, calls Confirm(false) on it and
+// returns a RabbitClient holding both the connection and the channel.
+//
+// On failure it returns the zero RabbitClient and the error. A channel that
+// was opened but failed the Confirm call is closed again instead of being
+// leaked on the connection.
+func NewRabbitMQClient(conn *amqp.Connection) (RabbitClient, error) {
+       ch, err := conn.Channel()
+       if err != nil {
+               return RabbitClient{}, err
+       }
+       if err := ch.Confirm(false); err != nil {
+               // Best effort: the Confirm error is the one worth reporting.
+               _ = ch.Close()
+               return RabbitClient{}, err
+       }
+
+       return RabbitClient{
+               conn: conn,
+               ch:   ch,
+       }, nil
+}
+
+// Close closes the client's channel and returns the resulting error. The
+// connection stored in the client is not closed here.
+func (rc RabbitClient) Close() error {
+       return rc.ch.Close()
+}
+
+// Cancel cancels the consumer identified by consumerTag on the client's
+// channel (second Cancel argument false) and returns the resulting error.
+func (rc RabbitClient) Cancel(consumerTag string) error {
+       return rc.ch.Cancel(consumerTag, false)
+}
+
+// CreateQueue declares queueName on the client's channel with the given
+// durable and autoDelete flags; the remaining QueueDeclare options are passed
+// as false, false and nil.
+//
+// It returns the declared queue, or a zero amqp.Queue together with the error
+// when the declaration fails.
+func (rc RabbitClient) CreateQueue(queueName string, durable, autoDelete bool) (amqp.Queue, error) {
+       q, err := rc.ch.QueueDeclare(queueName, durable, autoDelete, false, false, nil)
+       if err != nil {
+               // Propagate the failure; returning nil here would hide it from callers.
+               return amqp.Queue{}, err
+       }
+       return q, nil
+}
+
+// CreateExchange declares exchangeName of the given kind on the client's
+// channel, passing true, false, false, false and nil for the remaining
+// ExchangeDeclare options. A failure is printed to stdout and not returned.
+func (rc RabbitClient) CreateExchange(exchangeName, kind string) {
+       if declareErr := rc.ch.ExchangeDeclare(exchangeName, kind, true, false, false, false, nil); declareErr != nil {
+               fmt.Println("Failed to declare a exchange, err: ", declareErr)
+       }
+}
+
+// CreateBinding calls QueueBind on the client's channel with name, binding
+// and exchange (followed by false and nil) and returns the resulting error.
+func (rc RabbitClient) CreateBinding(name, binding, exchange string) error {
+       return rc.ch.QueueBind(name, binding, exchange, false, nil)
+}
+
+// Send publishes options to exchange with routingKey on the client's channel
+// via PublishWithDeferredConfirmWithContext, passing true and false for the
+// two boolean publish flags. Only the publish error is reported to the caller.
+func (rc RabbitClient) Send(ctx context.Context, exchange, routingKey string, options amqp.Publishing) error {
+       // The deferred-confirmation handle returned by the call is discarded.
+       _, publishErr := rc.ch.PublishWithDeferredConfirmWithContext(ctx, exchange, routingKey, true, false, options)
+       return publishErr
+}
+
+// Consume starts consuming from queue on the client's channel under the
+// consumer tag consumer with the given autoAck setting; the remaining Consume
+// options are passed as false, false, false and nil. It returns the delivery
+// channel and error of the underlying call unchanged.
+func (rc RabbitClient) Consume(queue, consumer string, autoAck bool) (<-chan 
amqp.Delivery, error) {
+       return rc.ch.Consume(queue, consumer, autoAck, false, false, false, nil)
+}
+
+// ConsumeWithContext is the context-aware counterpart of Consume: it forwards
+// ctx, queue, consumer and autoAck to the channel's ConsumeWithContext, passes
+// false, false, false and nil for the remaining options, and returns the
+// delivery channel and error of the underlying call unchanged.
+func (rc RabbitClient) ConsumeWithContext(ctx context.Context, queue, consumer 
string, autoAck bool) (<-chan amqp.Delivery, error) {
+       return rc.ch.ConsumeWithContext(ctx, queue, consumer, autoAck, false, 
false, false, nil)
+}
diff --git a/test/plugins/scenarios/amqp/plugin.yml 
b/test/plugins/scenarios/amqp/plugin.yml
new file mode 100644
index 0000000..697d967
--- /dev/null
+++ b/test/plugins/scenarios/amqp/plugin.yml
@@ -0,0 +1,39 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+entry-service: http://${HTTP_HOST}:${HTTP_PORT}/execute
+health-checker: http://${HTTP_HOST}:${HTTP_PORT}/health
+start-script: ./bin/startup.sh
+framework: github.com/rabbitmq/amqp091-go
+export-port: 8080
+support-version:
+  - go: 1.18
+    framework:
+      - v1.9.0
+dependencies:
+  amqp-server:
+    image: rabbitmq:3.11.14-management
+    hostname: amqp-server
+    ports:
+      - "5672"
+    environment:
+      RABBITMQ_DEFAULT_USER: "admin"
+      RABBITMQ_DEFAULT_PASS: "123456"
+    healthcheck:
+      test: ["CMD", "rabbitmq-diagnostics", "-q", "ping"]
+      interval: 5s
+      timeout: 60s
+      retries: 120
\ No newline at end of file
diff --git a/tools/go-agent/instrument/plugins/register.go 
b/tools/go-agent/instrument/plugins/register.go
index 366cd9c..38336d0 100644
--- a/tools/go-agent/instrument/plugins/register.go
+++ b/tools/go-agent/instrument/plugins/register.go
@@ -19,6 +19,7 @@ package plugins
 
 import (
        traceactivation "github.com/apache/skywalking-go/plugin/trace"
+       "github.com/apache/skywalking-go/plugins/amqp"
        "github.com/apache/skywalking-go/plugins/core/instrument"
        "github.com/apache/skywalking-go/plugins/dubbo"
        "github.com/apache/skywalking-go/plugins/echov4"
@@ -62,6 +63,7 @@ func init() {
        registerFramework(traceactivation.NewInstrument())
        registerFramework(fiber.NewInstrument())
        registerFramework(rocketmq.NewInstrument())
+       registerFramework(amqp.NewInstrument())
 
        // fasthttp related instruments
        registerFramework(fasthttp_client.NewInstrument())
diff --git a/tools/go-agent/tools/dst.go b/tools/go-agent/tools/dst.go
index 0909895..5d65554 100644
--- a/tools/go-agent/tools/dst.go
+++ b/tools/go-agent/tools/dst.go
@@ -292,6 +292,8 @@ func (i *ImportAnalyzer) analyzeFieldImport(filePath 
string, exp dst.Expr) {
                i.analyzeFieldImport(filePath, n.Elt)
        case *dst.StarExpr:
                i.analyzeFieldImport(filePath, n.X)
+       case *dst.ChanType:
+               i.analyzeFieldImport(filePath, n.Value)
        }
 }
 
diff --git a/tools/go-agent/tools/enhancement.go 
b/tools/go-agent/tools/enhancement.go
index 04a3052..2c0a023 100644
--- a/tools/go-agent/tools/enhancement.go
+++ b/tools/go-agent/tools/enhancement.go
@@ -19,6 +19,7 @@ package tools
 
 import (
        "fmt"
+       "go/token"
 
        "github.com/dave/dst"
        "github.com/dave/dst/decorator"
@@ -187,6 +188,16 @@ func GenerateTypeNameByExp(exp dst.Expr) string {
                        }
                        data += ")"
                }
+       case *dst.ChanType:
+               if n.Dir == dst.SEND {
+                       data += token.CHAN.String() + token.ARROW.String()
+               } else if n.Dir == dst.RECV {
+                       data += token.ARROW.String() + token.CHAN.String()
+               } else {
+                       data += token.CHAN.String()
+               }
+               data += " " + GenerateTypeNameByExp(n.Value)
+               return data
        default:
                return ""
        }
@@ -221,6 +232,9 @@ func addPackagePrefixForArgsAndClone(pkg string, tp 
dst.Expr) dst.Expr {
                        exp.X = dst.NewIdent(OtherPackageRefPrefix + pkg)
                }
                return exp
+       case *dst.ChanType:
+               expr := dst.Clone(tp).(*dst.ChanType)
+               return expr
        default:
                return dst.Clone(tp).(dst.Expr)
        }

Reply via email to