This is an automated email from the ASF dual-hosted git repository. liuhan pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/skywalking-go.git
The following commit(s) were added to refs/heads/main by this push: new 8065636 feature: add support for rocketMQ (#169) 8065636 is described below commit 8065636ea1418cbb87242fa6154fbb484f613aa1 Author: Starry <codeprince2...@163.com> AuthorDate: Fri Feb 16 16:58:44 2024 +0800 feature: add support for rocketMQ (#169) --- .github/workflows/plugin-tests.yaml | 1 + CHANGES.md | 1 + docs/en/agent/support-plugins.md | 2 + go.work | 2 + plugins/core/tracing/span.go | 2 + plugins/rocketmq/consumer/consumer.go | 92 +++++++++ plugins/rocketmq/consumer/consumer_status.go | 41 ++++ plugins/rocketmq/consumer/structures.go | 39 ++++ plugins/rocketmq/go.mod | 27 +++ plugins/rocketmq/go.sum | 154 ++++++++++++++ plugins/rocketmq/instrument.go | 99 +++++++++ plugins/rocketmq/producer/async_producer.go | 112 +++++++++++ plugins/rocketmq/producer/general_producer.go | 58 ++++++ plugins/rocketmq/producer/send_one_way.go | 42 ++++ plugins/rocketmq/producer/send_status.go | 41 ++++ plugins/rocketmq/producer/structures.go | 34 ++++ plugins/rocketmq/producer/sync_producer.go | 59 ++++++ test/plugins/runner-helper/context.go | 2 + .../runner-helper/templates/docker-compose.tpl | 12 ++ test/plugins/scenarios/rocketmq/bin/startup.sh | 22 ++ test/plugins/scenarios/rocketmq/config/broker.conf | 26 +++ .../plugins/scenarios/rocketmq/config/excepted.yml | 184 +++++++++++++++++ test/plugins/scenarios/rocketmq/go.mod | 27 +++ test/plugins/scenarios/rocketmq/go.sum | 154 ++++++++++++++ test/plugins/scenarios/rocketmq/main.go | 222 +++++++++++++++++++++ test/plugins/scenarios/rocketmq/plugin.yml | 46 +++++ tools/go-agent/instrument/plugins/register.go | 2 + 27 files changed, 1503 insertions(+) diff --git a/.github/workflows/plugin-tests.yaml b/.github/workflows/plugin-tests.yaml index 9bc0d59..de6c719 100644 --- a/.github/workflows/plugin-tests.yaml +++ b/.github/workflows/plugin-tests.yaml @@ -96,6 +96,7 @@ jobs: - discard-reporter - fiber - echov4 + - rocketmq steps: - uses: actions/checkout@v2 with: diff --git a/CHANGES.md b/CHANGES.md index 9cf0395..53b43cd 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -11,6 +11,7 @@ Release Notes. * Support setting a discard type of reporter. * Add `redis.max_args_bytes` parameter for redis plugin. * Changing intercept point for gin, make sure interfaces could be grouped when params defined in relativePath. +* Support [RocketMQ](https://github.com/apache/rocketmq-client-go) MQ. #### Documentation diff --git a/docs/en/agent/support-plugins.md b/docs/en/agent/support-plugins.md index d534428..c6c3832 100644 --- a/docs/en/agent/support-plugins.md +++ b/docs/en/agent/support-plugins.md @@ -27,6 +27,8 @@ metrics based on the tracing data. * [MySQL Driver](https://github.com/go-sql-driver/mysql) tested v1.4.0 to v1.7.1. * Cache Client * `go-redisv9`: [go-redis](https://github.com/redis/go-redis) tested v9.0.3 to v9.0.5. +* MQ Client + * `rocketMQ`: [rocketmq-client-go](https://github.com/apache/rocketmq-client-go) tested v2.1.2. # Metrics Plugins The meter plugin provides the advanced metrics collections. diff --git a/go.work b/go.work index d2cba0a..e19cdc4 100644 --- a/go.work +++ b/go.work @@ -23,6 +23,7 @@ use ( ./plugins/fasthttp ./plugins/fiber ./plugins/echov4 + ./plugins/rocketmq ./test/benchmark-codebase/consumer ./test/benchmark-codebase/provider @@ -52,6 +53,7 @@ use ( ./test/plugins/scenarios/plugin_exclusion ./test/plugins/scenarios/runtime_metrics ./test/plugins/scenarios/echov4 + ./test/plugins/scenarios/rocketmq ./tools/go-agent diff --git a/plugins/core/tracing/span.go b/plugins/core/tracing/span.go index 1cc0a86..edc8005 100644 --- a/plugins/core/tracing/span.go +++ b/plugins/core/tracing/span.go @@ -60,6 +60,8 @@ const ( TagMQQueue = "mq.queue" TagMQBroker = "mq.broker" TagMQTopic = "mq.topic" + TagMQStatus = "mq.status" + TagMQMsgID = "mq.msg.id" TagCacheType = "cache.type" TagCacheOp = "cache.op" TagCacheCmd = "cache.cmd" diff --git a/plugins/rocketmq/consumer/consumer.go b/plugins/rocketmq/consumer/consumer.go new file mode 100644 index 0000000..18cb80c --- /dev/null +++ b/plugins/rocketmq/consumer/consumer.go @@ -0,0 +1,92 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package consumer + +import ( + "strings" + + "github.com/apache/rocketmq-client-go/v2/consumer" + "github.com/apache/rocketmq-client-go/v2/primitive" + + "github.com/apache/skywalking-go/plugins/core/operator" + "github.com/apache/skywalking-go/plugins/core/tracing" +) + +const ( + rmqConsumerComponentID = 39 + rmqConsumerPrefix = "RocketMQ/" + rmqConsumerSuffix = "/Consumer" + tagMQMsgID = "mq.msg.id" + tagMQOffsetMsgID = "mq.offset.msg.id" + semicolon = ";" +) + +type SwConsumerInterceptor struct { +} + +func (c *SwConsumerInterceptor) BeforeInvoke(invocation operator.Invocation) error { + pushConsumer := invocation.CallerInstance().(*nativepushConsumer) + peer := strings.Join(pushConsumer.client.GetNameSrv().AddrList(), semicolon) + subMsgs := invocation.Args()[1].([]*primitive.MessageExt) + if len(subMsgs) == 0 { + return nil + } + topic, addr := subMsgs[0].Topic, subMsgs[0].StoreHost + operationName := rmqConsumerPrefix + topic + rmqConsumerSuffix + + var ( + span tracing.Span + err error + ) + for _, msg := range subMsgs { + span, err = tracing.CreateEntrySpan(operationName, func(headerKey string) (string, error) { + return msg.GetProperty(headerKey), nil + }, + tracing.WithLayer(tracing.SpanLayerMQ), + tracing.WithComponent(rmqConsumerComponentID), + tracing.WithTag(tracing.TagMQTopic, topic), + tracing.WithTag(tagMQMsgID, msg.MsgId), + tracing.WithTag(tagMQOffsetMsgID, msg.OffsetMsgId), + ) + if err != nil { + return err + } + } + span.Tag(tracing.TagMQBroker, addr) + span.SetPeer(peer) + invocation.SetContext(span) + return nil +} + +func (c *SwConsumerInterceptor) AfterInvoke(invocation operator.Invocation, result ...interface{}) error { + if invocation.GetContext() == nil { + return nil + } + span := invocation.GetContext().(tracing.Span) + if err, ok := result[1].(error); ok && err != nil { + span.Error(err.Error()) + } + if consumeRet, ok := result[0].(consumer.ConsumeResult); ok { + span.Tag(tracing.TagMQStatus, SwConsumerStatusStr(consumeRet)) + if consumer.ConsumeSuccess != consumeRet { + span.Error() + } + } + span.End() + return nil +} diff --git a/plugins/rocketmq/consumer/consumer_status.go b/plugins/rocketmq/consumer/consumer_status.go new file mode 100644 index 0000000..fbe1d74 --- /dev/null +++ b/plugins/rocketmq/consumer/consumer_status.go @@ -0,0 +1,41 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package consumer + +import ( + "fmt" + + "github.com/apache/rocketmq-client-go/v2/consumer" +) + +func SwConsumerStatusStr(status consumer.ConsumeResult) string { + switch status { + case consumer.ConsumeSuccess: + return "ConsumeSuccess" + case consumer.ConsumeRetryLater: + return "ConsumeRetryLater" + case consumer.Commit: + return "Commit" + case consumer.Rollback: + return "Rollback" + case consumer.SuspendCurrentQueueAMoment: + return "SuspendCurrentQueueAMoment" + default: + return fmt.Sprintf("%d", status) + } +} diff --git a/plugins/rocketmq/consumer/structures.go b/plugins/rocketmq/consumer/structures.go new file mode 100644 index 0000000..2650b4e --- /dev/null +++ b/plugins/rocketmq/consumer/structures.go @@ -0,0 +1,39 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package consumer + +//skywalking:native github.com/apache/rocketmq-client-go/v2/consumer pushConsumer +type nativepushConsumer struct { + *nativedefaultConsumer +} + +//skywalking:native github.com/apache/rocketmq-client-go/v2/consumer defaultConsumer +type nativedefaultConsumer struct { + client nativeRMQClient +} + +//skywalking:native github.com/apache/rocketmq-client-go/v2/internal RMQClient +type nativeRMQClient interface { + GetNameSrv() nativeNamesrvs +} + +//skywalking:native github.com/apache/rocketmq-client-go/v2/internal Namesrvs +type nativeNamesrvs interface { + FindBrokerAddrByName(brokerName string) string + AddrList() []string +} diff --git a/plugins/rocketmq/go.mod b/plugins/rocketmq/go.mod new file mode 100644 index 0000000..bd84e61 --- /dev/null +++ b/plugins/rocketmq/go.mod @@ -0,0 +1,27 @@ +module github.com/apache/skywalking-go/plugins/rocketmq + +go 1.18 + +require ( + github.com/apache/rocketmq-client-go/v2 v2.1.2 // indirect + github.com/emirpasic/gods v1.12.0 // indirect + github.com/golang/mock v1.3.1 // indirect + github.com/google/uuid v1.3.0 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/konsorten/go-windows-terminal-sequences v1.0.1 // indirect + github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/patrickmn/go-cache v2.1.0+incompatible // indirect + github.com/pkg/errors v0.8.1 // indirect + github.com/sirupsen/logrus v1.4.0 // indirect + github.com/tidwall/gjson v1.13.0 // indirect + github.com/tidwall/match v1.1.1 // indirect + github.com/tidwall/pretty v1.2.0 // indirect + go.uber.org/atomic v1.5.1 // indirect + golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 // indirect + golang.org/x/lint v0.0.0-20190930215403-16217165b5de // indirect + golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect + golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e // indirect + gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect + stathat.com/c/consistent v1.0.0 // indirect +) \ No newline at end of file diff --git a/plugins/rocketmq/go.sum b/plugins/rocketmq/go.sum new file mode 100644 index 0000000..f0e0e0d --- /dev/null +++ b/plugins/rocketmq/go.sum @@ -0,0 +1,154 @@ +github.com/BurntSushi/toml v1.1.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= +github.com/apache/rocketmq-client-go/v2 v2.1.2 h1:yt73olKe5N6894Dbm+ojRf/JPiP0cxfDNNffKwhpJVg= +github.com/apache/rocketmq-client-go/v2 v2.1.2/go.mod h1:6I6vgxHR3hzrvn+6n/4mrhS+UTulzK/X9LB2Vk1U5gE= +github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= +github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= +github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/emirpasic/gods v1.12.0 h1:QAUIPSaCu4G+POclxeqb3F+WPpdKqFGlw36+yOzGlrg= +github.com/emirpasic/gods v1.12.0/go.mod h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o= +github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= +github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= +github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= +github.com/golang/mock v1.3.1 h1:qGJ6qTW+x6xX/my+8YUVl4WNpX9B7+/l2tRsHGZ7f2s= +github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= +github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= +github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk= +github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= +github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= +github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= +github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0= +github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU= +github.com/onsi/ginkgo/v2 v2.0.0/go.mod h1:vw5CSIxN1JObi/U8gcbwft7ZxR2dgaR70JSE3/PpL4c= +github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= +github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= +github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY= +github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs= +github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= +github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= +github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sirupsen/logrus v1.4.0 h1:yKenngtzGh+cUSSh6GWbxW2abRqhYUSR/t/6+2QqNvE= +github.com/sirupsen/logrus v1.4.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= +github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= +github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/tidwall/gjson v1.13.0 h1:3TFY9yxOQShrvmjdM76K+jc66zJeT6D3/VFFYCGQf7M= +github.com/tidwall/gjson v1.13.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= +github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= +github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs= +github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.uber.org/atomic v1.5.1 h1:rsqfU5vBkVknbhUGbAUwQKR2H4ItV8tjJ+6kJX4cxHM= +go.uber.org/atomic v1.5.1/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= +golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f h1:v4INt8xihDGvnrfjMDVXGxw9wrfxYyCjk0KbXjhR55s= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e h1:4nW4NLDYnU28ojHaHO8OVxFHk/aQ33U01a9cjED+pzE= +golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8= +gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +stathat.com/c/consistent v1.0.0 h1:ezyc51EGcRPJUxfHGSgJjWzJdj3NiMU9pNfLNGiXV0c= +stathat.com/c/consistent v1.0.0/go.mod h1:QkzMWzcbB+yQBL2AttO6sgsQS/JSTapcDISJalmCDS0= diff --git a/plugins/rocketmq/instrument.go b/plugins/rocketmq/instrument.go new file mode 100644 index 0000000..363998b --- /dev/null +++ b/plugins/rocketmq/instrument.go @@ -0,0 +1,99 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package rocketmq + +import ( + "embed" + + "github.com/apache/skywalking-go/plugins/core/instrument" +) + +//go:embed * +var fs embed.FS + +//skywalking:nocopy +type Instrument struct { +} + +func NewInstrument() *Instrument { + return &Instrument{} +} + +func (i *Instrument) Name() string { + return "rocketmq" +} + +func (i *Instrument) BasePackage() string { + return "github.com/apache/rocketmq-client-go/v2" +} + +func (i *Instrument) VersionChecker(version string) bool { + return true +} + +func (i *Instrument) Points() []*instrument.Point { + return []*instrument.Point{ + { + PackagePath: "producer", + At: instrument.NewMethodEnhance("*defaultProducer", "SendSync", + instrument.WithArgsCount(2), + instrument.WithArgType(0, "context.Context"), + instrument.WithArgType(1, "...*primitive.Message"), + instrument.WithResultCount(2), + instrument.WithResultType(0, "*primitive.SendResult"), + instrument.WithResultType(1, "error")), + Interceptor: "SendSyncInterceptor", + }, + { + PackagePath: "producer", + At: instrument.NewMethodEnhance("*defaultProducer", "SendAsync", + instrument.WithArgsCount(3), + instrument.WithArgType(0, "context.Context"), + instrument.WithArgType(1, "func(context.Context, *primitive.SendResult, error)"), + instrument.WithArgType(2, "...*primitive.Message"), + instrument.WithResultCount(1), + instrument.WithResultType(0, "error")), + Interceptor: "SendASyncInterceptor", + }, + { + PackagePath: "producer", + At: instrument.NewMethodEnhance("*defaultProducer", "SendOneWay", + instrument.WithArgsCount(2), + instrument.WithArgType(0, "context.Context"), + instrument.WithArgType(1, "msgs ...*primitive.Message"), + instrument.WithResultCount(1), + instrument.WithResultType(0, "error")), + Interceptor: "SendOneWayInterceptor", + }, + { + PackagePath: "consumer", + At: instrument.NewMethodEnhance("*pushConsumer", "consumeInner", + instrument.WithArgsCount(2), + instrument.WithArgType(0, "context.Context"), + instrument.WithArgType(1, "[]*primitive.MessageExt"), + instrument.WithResultCount(2), + instrument.WithResultType(0, "ConsumeResult"), + instrument.WithResultType(1, "error")), + Interceptor: "SwConsumerInterceptor", + }, + } +} + +func (i *Instrument) FS() *embed.FS { + return &fs +} diff --git a/plugins/rocketmq/producer/async_producer.go b/plugins/rocketmq/producer/async_producer.go new file mode 100644 index 0000000..1ef0237 --- /dev/null +++ b/plugins/rocketmq/producer/async_producer.go @@ -0,0 +1,112 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package producer + +import ( + "context" + "fmt" + "strings" + + "github.com/apache/rocketmq-client-go/v2/primitive" + + "github.com/apache/skywalking-go/plugins/core/operator" + "github.com/apache/skywalking-go/plugins/core/tracing" +) + +const ( + rmqASyncSendPrefix = "RocketMQ/" + rmqASyncSuffix = "/AsyncProducer" + rmqCallbackSuffix = "/Producer/Callback" + rmqASyncComponentID = 38 + aSyncSemicolon = ";" + aSyncTagMQOffsetMsgID = "mq.offset.msg.id" +) + +type SendASyncInterceptor struct { +} + +func (sa *SendASyncInterceptor) BeforeInvoke(invocation operator.Invocation) error { + defaultProducer := invocation.CallerInstance().(*nativedefaultProducer) + peer := strings.Join(defaultProducer.client.GetNameSrv().AddrList(), aSyncSemicolon) + msgList := invocation.Args()[2].([]*primitive.Message) + topic := msgList[0].Topic + operationName := rmqASyncSendPrefix + topic + rmqASyncSuffix + + span, err := tracing.CreateExitSpan(operationName, peer, func(headerKey, headerValue string) error { + for _, message := range msgList { + message.WithProperty(headerKey, headerValue) + } + return nil + }, + tracing.WithLayer(tracing.SpanLayerMQ), + tracing.WithComponent(rmqASyncComponentID), + tracing.WithTag(tracing.TagMQTopic, topic), + ) + if err != nil { + return err + } + + continueSnapShot := tracing.CaptureContext() + zuper := invocation.Args()[1].(func(ctx context.Context, result *primitive.SendResult, err error)) + // enhance async callback method + callbackFunc := func(ctx context.Context, sendResult *primitive.SendResult, err error) { + defer tracing.CleanContext() + tracing.ContinueContext(continueSnapShot) + operationName = rmqASyncSendPrefix + topic + rmqCallbackSuffix + + localSpan, localErr := tracing.CreateLocalSpan(operationName, + tracing.WithComponent(rmqASyncComponentID), + tracing.WithLayer(tracing.SpanLayerMQ), + tracing.WithTag(tracing.TagMQTopic, topic), + ) + if localErr != nil { + zuper(ctx, sendResult, err) + return + } + if err != nil { + span.Error(err.Error()) + } + localSpan.Tag(tracing.TagMQStatus, SendStatusStr(sendResult.Status)) + localSpan.Tag(tracing.TagMQQueue, fmt.Sprintf("%d", sendResult.MessageQueue.QueueId)) + localSpan.Tag(tracing.TagMQBroker, defaultProducer.client.GetNameSrv(). + FindBrokerAddrByName(sendResult.MessageQueue.BrokerName)) + localSpan.Tag(tracing.TagMQMsgID, sendResult.MsgID) + localSpan.Tag(aSyncTagMQOffsetMsgID, sendResult.OffsetMsgID) + + zuper(ctx, sendResult, err) + localSpan.SetPeer(peer) + localSpan.End() + } + + span.SetPeer(peer) + invocation.ChangeArg(1, callbackFunc) + invocation.SetContext(span) + return nil +} + +func (sa *SendASyncInterceptor) AfterInvoke(invocation operator.Invocation, result ...interface{}) error { + if invocation.GetContext() == nil { + return nil + } + span := invocation.GetContext().(tracing.Span) + if err, ok := result[0].(error); ok && err != nil { + span.Error(err.Error()) + } + span.End() + return nil +} diff --git a/plugins/rocketmq/producer/general_producer.go b/plugins/rocketmq/producer/general_producer.go new file mode 100644 index 0000000..b0d490a --- /dev/null +++ b/plugins/rocketmq/producer/general_producer.go @@ -0,0 +1,58 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package producer + +import ( + "strings" + + "github.com/apache/rocketmq-client-go/v2/primitive" + + "github.com/apache/skywalking-go/plugins/core/operator" + "github.com/apache/skywalking-go/plugins/core/tracing" +) + +const ( + rmqSyncSendPrefix = "RocketMQ/" + rmqProducerSuffix = "/Producer" + semicolon = ";" + rmqSyncComponentID = 38 +) + +func GeneralProducerBeforeInvoke(invocation operator.Invocation) error { + defaultProducer := invocation.CallerInstance().(*nativedefaultProducer) + peer := strings.Join(defaultProducer.client.GetNameSrv().AddrList(), semicolon) + msgList := invocation.Args()[1].([]*primitive.Message) + topic := msgList[0].Topic + operationName := rmqSyncSendPrefix + topic + rmqProducerSuffix + + span, err := tracing.CreateExitSpan(operationName, peer, func(headerKey, headerValue string) error { + for _, message := range msgList { + message.WithProperty(headerKey, headerValue) + } + return nil + }, + tracing.WithLayer(tracing.SpanLayerMQ), + tracing.WithComponent(rmqSyncComponentID), + tracing.WithTag(tracing.TagMQTopic, topic), + ) + if err != nil { + return err + } + invocation.SetContext(span) + return nil +} diff --git a/plugins/rocketmq/producer/send_one_way.go b/plugins/rocketmq/producer/send_one_way.go new file mode 100644 index 0000000..fb9a294 --- /dev/null +++ b/plugins/rocketmq/producer/send_one_way.go @@ -0,0 +1,42 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package producer + +import ( + "github.com/apache/skywalking-go/plugins/core/operator" + "github.com/apache/skywalking-go/plugins/core/tracing" +) + +type SendOneWayInterceptor struct { +} + +func (s *SendOneWayInterceptor) BeforeInvoke(invocation operator.Invocation) error { + return GeneralProducerBeforeInvoke(invocation) +} + +func (s *SendOneWayInterceptor) AfterInvoke(invocation operator.Invocation, result ...interface{}) error { + if invocation.GetContext() == nil { + return nil + } + span := invocation.GetContext().(tracing.Span) + if err, ok := result[0].(error); ok && err != nil { + span.Error(err.Error()) + } + span.End() + return nil +} diff --git a/plugins/rocketmq/producer/send_status.go b/plugins/rocketmq/producer/send_status.go new file mode 100644 index 0000000..ca120d7 --- /dev/null +++ b/plugins/rocketmq/producer/send_status.go @@ -0,0 +1,41 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package producer + +import ( + "fmt" + + "github.com/apache/rocketmq-client-go/v2/primitive" +) + +func SendStatusStr(status primitive.SendStatus) string { + switch status { + case primitive.SendOK: + return "SendOK" + case primitive.SendFlushDiskTimeout: + return "SendFlushDiskTimeout" + case primitive.SendFlushSlaveTimeout: + return "SendFlushSlaveTimeout" + case primitive.SendSlaveNotAvailable: + return "SendSlaveNotAvailable" + case primitive.SendUnknownError: + return "SendUnknownError" + default: + return fmt.Sprintf("%d", status) + } +} diff --git a/plugins/rocketmq/producer/structures.go b/plugins/rocketmq/producer/structures.go new file mode 100644 index 0000000..76f7e7f --- /dev/null +++ b/plugins/rocketmq/producer/structures.go @@ -0,0 +1,34 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package producer + +//skywalking:native github.com/apache/rocketmq-client-go/v2/producer defaultProducer +type nativedefaultProducer struct { + client nativeRMQClient +} + +//skywalking:native github.com/apache/rocketmq-client-go/v2/internal RMQClient +type nativeRMQClient interface { + GetNameSrv() nativeNamesrvs +} + +//skywalking:native github.com/apache/rocketmq-client-go/v2/internal Namesrvs +type nativeNamesrvs interface { + FindBrokerAddrByName(brokerName string) string + AddrList() []string +} diff --git a/plugins/rocketmq/producer/sync_producer.go b/plugins/rocketmq/producer/sync_producer.go new file mode 100644 index 0000000..474451a --- /dev/null +++ b/plugins/rocketmq/producer/sync_producer.go @@ -0,0 +1,59 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package producer + +import ( + "fmt" + + "github.com/apache/rocketmq-client-go/v2/primitive" + + "github.com/apache/skywalking-go/plugins/core/operator" + "github.com/apache/skywalking-go/plugins/core/tracing" +) + +const ( + syncTagMQOffsetMsgID = "mq.offset.msg.id" +) + +type SendSyncInterceptor struct { +} + +func (s *SendSyncInterceptor) BeforeInvoke(invocation operator.Invocation) error { + return GeneralProducerBeforeInvoke(invocation) +} + +func (s *SendSyncInterceptor) AfterInvoke(invocation operator.Invocation, result ...interface{}) error { + if invocation.GetContext() == nil { + return nil + } + defaultProducer := invocation.CallerInstance().(*nativedefaultProducer) + span := invocation.GetContext().(tracing.Span) + if err, ok := result[1].(error); ok && err != nil { + span.Error(err.Error()) + } + if sendRet, ok := result[0].(*primitive.SendResult); ok && sendRet != nil { + span.Tag(tracing.TagMQStatus, SendStatusStr(sendRet.Status)) + span.Tag(tracing.TagMQQueue, fmt.Sprintf("%d", sendRet.MessageQueue.QueueId)) + span.Tag(tracing.TagMQBroker, defaultProducer.client.GetNameSrv(). + FindBrokerAddrByName(sendRet.MessageQueue.BrokerName)) + span.Tag(tracing.TagMQMsgID, sendRet.MsgID) + span.Tag(syncTagMQOffsetMsgID, sendRet.OffsetMsgID) + } + span.End() + return nil +} diff --git a/test/plugins/runner-helper/context.go b/test/plugins/runner-helper/context.go index 78b6f7d..3c5a2ab 100644 --- a/test/plugins/runner-helper/context.go +++ b/test/plugins/runner-helper/context.go @@ -93,6 +93,8 @@ type DockerDependencyService struct { Exports []string `yaml:"expose"` Env map[string]string `yaml:"environment"` Command []string `yaml:"command"` + Volumes []string `yaml:"volumes"` + DependsOn []string `yaml:"depends_on"` HealthCheck *ServiceHealthCheck `yaml:"healthcheck"` } diff --git a/test/plugins/runner-helper/templates/docker-compose.tpl b/test/plugins/runner-helper/templates/docker-compose.tpl index c546f6b..43ccc9b 100644 --- a/test/plugins/runner-helper/templates/docker-compose.tpl +++ b/test/plugins/runner-helper/templates/docker-compose.tpl @@ -100,6 +100,18 @@ services: - "{{.}}" {{- end }} {{- end }} + {{- if $service.Volumes }} + volumes: + {{- range $service.Volumes }} + - "{{.}}" + {{- end }} + {{- end }} + {{- if $service.DependsOn }} + depends_on: + {{- range $service.DependsOn }} + - "{{.}}" + {{- end }} + {{- end }} {{- if $service.HealthCheck }} healthcheck: test: diff --git a/test/plugins/scenarios/rocketmq/bin/startup.sh b/test/plugins/scenarios/rocketmq/bin/startup.sh new file mode 100755 index 0000000..0dc5924 --- /dev/null +++ b/test/plugins/scenarios/rocketmq/bin/startup.sh @@ -0,0 +1,22 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +home="$(cd "$(dirname $0)"; pwd)" +go build ${GO_BUILD_OPTS} -o rocketmq + +./rocketmq \ No newline at end of file diff --git a/test/plugins/scenarios/rocketmq/config/broker.conf b/test/plugins/scenarios/rocketmq/config/broker.conf new file mode 100644 index 0000000..5824da8 --- /dev/null +++ b/test/plugins/scenarios/rocketmq/config/broker.conf @@ -0,0 +1,26 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +brokerClusterName=sw-cluster +brokerName=sw-broker +brokerId=0 +deleteWhen=04 +fileReservedTime=48 +brokerRole=ASYNC_MASTER +flushDiskType=ASYNC_FLUSH +autoCreateTopicEnable=true +brokerIP1=mqbroker +namesrvAddr=mqnamesrv:9876 \ No newline at end of file diff --git a/test/plugins/scenarios/rocketmq/config/excepted.yml b/test/plugins/scenarios/rocketmq/config/excepted.yml new file mode 100644 index 0000000..98ba820 --- /dev/null +++ b/test/plugins/scenarios/rocketmq/config/excepted.yml @@ -0,0 +1,184 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +segmentItems: + - serviceName: rocketmq + segmentSize: 2 + segments: + - segmentId: not null + spans: + - operationName: GET:/health + parentSpanId: -1 + spanId: 0 + spanLayer: Http + startTime: nq 0 + endTime: nq 0 + componentId: 5004 + isError: false + spanType: Entry + peer: '' + skipAnalysis: false + tags: + - { key: http.method, value: GET } + - { key: url, value: 'service:8080/health' } + - { key: status_code, value: '200' } + - segmentId: not null + spans: + - operationName: RocketMQ/sw-topic/Producer + parentSpanId: 0 + spanId: 1 + spanLayer: MQ + startTime: nq 0 + endTime: nq 0 + componentId: 38 + isError: false + spanType: Exit + peer: http://mqnamesrv:9876 + skipAnalysis: false + tags: + - { key: mq.topic, value: not null } + - { key: mq.status, value: not null } + - { key: mq.queue, value: not null } + - { key: mq.broker, value: not null } + - { key: mq.msg.id, value: not null } + - { key: mq.offset.msg.id, value: not null } + - operationName: RocketMQ/sw-topic/Consumer + parentSpanId: 0 + spanId: 2 + spanLayer: MQ + startTime: nq 0 + endTime: nq 0 + componentId: 39 + isError: false + spanType: Entry + peer: http://mqnamesrv:9876 + skipAnalysis: false + tags: + - { key: mq.topic, value: not null } + - { key: mq.msg.id, value: not null } + - { key: mq.offset.msg.id, value: not null } + - { key: mq.broker, value: not null } + - { key: mq.status, value: not null } + refs: + - { parentEndpoint: 'GET:/execute', networkAddress: 'http://mqnamesrv:9876', + refType: CrossProcess, parentSpanId: 1, parentTraceSegmentId: not null, + parentServiceInstance: not null, parentService: rocketmq, + traceId: not null } + - operationName: RocketMQ/sw-topic/AsyncProducer # When calling, first create an Exit span. + parentSpanId: 0 + spanId: 3 + spanLayer: MQ + startTime: nq 0 + endTime: nq 0 + componentId: 38 + isError: false + spanType: Exit + peer: http://mqnamesrv:9876 + skipAnalysis: false + tags: + - { key: mq.topic, value: sw-topic } + - operationName: RocketMQ/sw-topic/Producer/Callback # The callback task creates a Local span. + parentSpanId: 3 + spanId: 4 + spanLayer: MQ + startTime: nq 0 + endTime: nq 0 + componentId: 38 + isError: false + spanType: Local + peer: http://mqnamesrv:9876 + skipAnalysis: false + tags: + - { key: mq.topic, value: not null } + - { key: mq.status, value: not null } + - { key: mq.queue, value: not null } + - { key: mq.broker, value: not null } + - { key: mq.msg.id, value: not null } + - { key: mq.offset.msg.id, value: not null } + - operationName: RocketMQ/sw-topic/Consumer + parentSpanId: 0 + spanId: 5 + spanLayer: MQ + startTime: nq 0 + endTime: nq 0 + componentId: 39 + isError: false + spanType: Entry + peer: http://mqnamesrv:9876 + skipAnalysis: false + tags: + - { key: mq.topic, value: not null } + - { key: mq.msg.id, value: not null } + - { key: mq.offset.msg.id, value: not null } + - { key: mq.broker, value: not null } + - { key: mq.status, value: not null } + refs: + - { parentEndpoint: 'GET:/execute', networkAddress: 'http://mqnamesrv:9876', + refType: CrossProcess, parentSpanId: 3, parentTraceSegmentId: not null, + parentServiceInstance: not null, parentService: rocketmq, + traceId: not null } + - operationName: RocketMQ/sw-topic/Producer + parentSpanId: 0 + spanId: 6 + spanLayer: MQ + startTime: nq 0 + endTime: nq 0 + componentId: 38 + isError: false + spanType: Exit + peer: http://mqnamesrv:9876 + skipAnalysis: false + tags: + - { key: mq.topic, value: sw-topic } + - operationName: RocketMQ/sw-topic/Consumer + parentSpanId: 0 + spanId: 7 + spanLayer: MQ + startTime: nq 0 + endTime: nq 0 + componentId: 39 + isError: false + spanType: Entry + peer: http://mqnamesrv:9876 + skipAnalysis: false + tags: + - { key: mq.topic, value: not null } + - { key: mq.msg.id, value: not null } + - { key: mq.offset.msg.id, value: not null } + - { key: mq.broker, value: not null } + - { key: mq.status, value: not null } + refs: + - { parentEndpoint: 'GET:/execute', networkAddress: 'http://mqnamesrv:9876', + refType: CrossProcess, parentSpanId: 6, parentTraceSegmentId: not null, + parentServiceInstance: not null, parentService: rocketmq, + traceId: not null } + - operationName: GET:/execute + parentSpanId: -1 + spanId: 0 + spanLayer: Http + startTime: nq 0 + endTime: nq 0 + componentId: 5004 + isError: false + spanType: Entry + peer: '' + skipAnalysis: false + tags: + - { key: http.method, value: GET } + - { key: url, value: 'service:8080/execute' } + - { key: status_code, value: '200' } +meterItems: [ ] +logItems: [ ] \ No newline at end of file diff --git a/test/plugins/scenarios/rocketmq/go.mod b/test/plugins/scenarios/rocketmq/go.mod new file mode 100644 index 0000000..e9e706d --- /dev/null +++ b/test/plugins/scenarios/rocketmq/go.mod @@ -0,0 +1,27 @@ +module test/plugins/scenarios/rocketmq + +go 1.18 + +require ( + github.com/apache/rocketmq-client-go/v2 v2.1.2 // indirect + github.com/emirpasic/gods v1.12.0 // indirect + github.com/golang/mock v1.3.1 // indirect + github.com/google/uuid v1.3.0 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/konsorten/go-windows-terminal-sequences v1.0.1 // indirect + github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/patrickmn/go-cache v2.1.0+incompatible // indirect + github.com/pkg/errors v0.8.1 // indirect + github.com/sirupsen/logrus v1.4.0 // indirect + github.com/tidwall/gjson v1.13.0 // indirect + github.com/tidwall/match v1.1.1 // indirect + github.com/tidwall/pretty v1.2.0 // indirect + go.uber.org/atomic v1.5.1 // indirect + golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 // indirect + golang.org/x/lint v0.0.0-20190930215403-16217165b5de // indirect + golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect + golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e // indirect + gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect + stathat.com/c/consistent v1.0.0 // indirect +) \ No newline at end of file diff --git a/test/plugins/scenarios/rocketmq/go.sum b/test/plugins/scenarios/rocketmq/go.sum new file mode 100644 index 0000000..a3bc6f9 --- /dev/null +++ b/test/plugins/scenarios/rocketmq/go.sum @@ -0,0 +1,154 @@ +github.com/BurntSushi/toml v1.1.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= +github.com/apache/rocketmq-client-go/v2 v2.1.2 h1:yt73olKe5N6894Dbm+ojRf/JPiP0cxfDNNffKwhpJVg= +github.com/apache/rocketmq-client-go/v2 v2.1.2/go.mod h1:6I6vgxHR3hzrvn+6n/4mrhS+UTulzK/X9LB2Vk1U5gE= +github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= +github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= +github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/emirpasic/gods v1.12.0 h1:QAUIPSaCu4G+POclxeqb3F+WPpdKqFGlw36+yOzGlrg= +github.com/emirpasic/gods v1.12.0/go.mod h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o= +github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= +github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= +github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= +github.com/golang/mock v1.3.1 h1:qGJ6qTW+x6xX/my+8YUVl4WNpX9B7+/l2tRsHGZ7f2s= +github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= +github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= +github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk= +github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= +github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= +github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= +github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0= +github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU= +github.com/onsi/ginkgo/v2 v2.0.0/go.mod h1:vw5CSIxN1JObi/U8gcbwft7ZxR2dgaR70JSE3/PpL4c= +github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= +github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= +github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY= +github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs= +github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= +github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= +github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sirupsen/logrus v1.4.0 h1:yKenngtzGh+cUSSh6GWbxW2abRqhYUSR/t/6+2QqNvE= +github.com/sirupsen/logrus v1.4.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= +github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= +github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/tidwall/gjson v1.13.0 h1:3TFY9yxOQShrvmjdM76K+jc66zJeT6D3/VFFYCGQf7M= +github.com/tidwall/gjson v1.13.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= +github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= +github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs= +github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.uber.org/atomic v1.5.1 h1:rsqfU5vBkVknbhUGbAUwQKR2H4ItV8tjJ+6kJX4cxHM= +go.uber.org/atomic v1.5.1/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= +golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f h1:v4INt8xihDGvnrfjMDVXGxw9wrfxYyCjk0KbXjhR55s= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e h1:4nW4NLDYnU28ojHaHO8OVxFHk/aQ33U01a9cjED+pzE= +golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8= +gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +stathat.com/c/consistent v1.0.0 h1:ezyc51EGcRPJUxfHGSgJjWzJdj3NiMU9pNfLNGiXV0c= +stathat.com/c/consistent v1.0.0/go.mod h1:QkzMWzcbB+yQBL2AttO6sgsQS/JSTapcDISJalmCDS0= \ No newline at end of file diff --git a/test/plugins/scenarios/rocketmq/main.go b/test/plugins/scenarios/rocketmq/main.go new file mode 100644 index 0000000..6f175cf --- /dev/null +++ b/test/plugins/scenarios/rocketmq/main.go @@ -0,0 +1,222 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package main + +import ( + "context" + "fmt" + "net/http" + "strconv" + "sync" + "time" + + "github.com/apache/rocketmq-client-go/v2" + "github.com/apache/rocketmq-client-go/v2/consumer" + "github.com/apache/rocketmq-client-go/v2/primitive" + "github.com/apache/rocketmq-client-go/v2/producer" + + _ "github.com/apache/skywalking-go" +) + +type testFunc func() error + +const ( + uri = "http://mqnamesrv:9876" + retry = 2 + topic = "sw-topic" + group = "sw-group" + msg = "I love skywalking %s thousand" +) + +func main() { + route := http.NewServeMux() + route.HandleFunc("/execute", func(res http.ResponseWriter, req *http.Request) { + TestProCon() + _, _ = res.Write([]byte("execute success")) + }) + route.HandleFunc("/health", func(writer http.ResponseWriter, request *http.Request) { + _, _ = writer.Write([]byte("ok")) + }) + fmt.Println("start client") + err := http.ListenAndServe(":8080", route) + if err != nil { + fmt.Printf("client start error: %v \n", err) + } +} + +func TestProCon() { + tests := []struct { + name string + fn testFunc + }{ + {"sendSyncMsg", sendSyncMsg}, + {"sendAsyncMsg", sendAsyncMsg}, + {"sendOneWayMsg", sendOneWayMsg}, + } + for _, test := range tests { + fmt.Printf("excute test case: %s\n", test.name) + if subErr := test.fn(); subErr != nil { + fmt.Printf("test case %s failed: %v", test.name, subErr) + } + } +} + +func sendSyncMsg() error { + p, err := rocketmq.NewProducer( + producer.WithNsResolver(primitive.NewPassthroughResolver([]string{uri})), + producer.WithRetry(retry), + producer.WithGroupName(group), + ) + if err != nil { + fmt.Printf("new producer error: %s\n", err.Error()) + return err + } + err = p.Start() + if err != nil { + fmt.Printf("start producer error: %s\n", err.Error()) + return err + } + var msgs []*primitive.Message + for i := 1; i < 2; i++ { + msgs = append(msgs, primitive.NewMessage( + topic, + []byte(fmt.Sprintf(msg, strconv.Itoa(i)))), + ) + } + + res, err := p.SendSync(context.Background(), msgs...) + if err != nil { + fmt.Printf("send message error: %s\n", err) + return err + } else { + fmt.Printf("send message success: result=%s\n", res.String()) + } + err = p.Shutdown() + if err != nil { + fmt.Printf("shutdown producer error: %s\n", err.Error()) + return err + } + consumerMsg() + return nil +} + +func sendAsyncMsg() error { + p, err := rocketmq.NewProducer( + producer.WithNsResolver(primitive.NewPassthroughResolver([]string{uri})), + producer.WithRetry(retry), + producer.WithGroupName(group), + ) + + if err != nil { + fmt.Printf("new producer error: %s\n", err.Error()) + return err + } + err = p.Start() + if err != nil { + fmt.Printf("start producer error: %s\n", err.Error()) + return err + } + var wg sync.WaitGroup + for i := 1; i < 2; i++ { + wg.Add(1) + err = p.SendAsync(context.Background(), + func(ctx context.Context, result *primitive.SendResult, e error) { + if e != nil { + fmt.Printf("receive message error: %s\n", err) + } else { + fmt.Printf("send message success: result=%s\n", result.String()) + } + wg.Done() + }, primitive.NewMessage(topic, []byte(fmt.Sprintf(msg, strconv.Itoa(i))))) + + if err != nil { + fmt.Printf("send message error: %s\n", err) + return err + } + } + wg.Wait() + err = p.Shutdown() + if err != nil { + fmt.Printf("shutdown producer error: %s\n", err.Error()) + return err + } + consumerMsg() + return nil +} + +func sendOneWayMsg() error { + p, err := rocketmq.NewProducer( + producer.WithNsResolver(primitive.NewPassthroughResolver([]string{uri})), + producer.WithRetry(retry), + producer.WithGroupName(group), + ) + if err != nil { + fmt.Printf("new producer error: %s\n", err.Error()) + return err + } + err = p.Start() + if err != nil { + fmt.Printf("start producer error: %s\n", err.Error()) + return err + } + var msgs []*primitive.Message + for i := 1; i < 2; i++ { + msgs = append(msgs, primitive.NewMessage( + topic, + []byte(fmt.Sprintf(msg, strconv.Itoa(i)))), + ) + } + err = p.SendOneWay(context.Background(), msgs...) + if err != nil { + fmt.Printf("send_one_way message error: %s\n", err) + return err + } + err = p.Shutdown() + if err != nil { + fmt.Printf("shutdown producer error: %s\n", err.Error()) + return err + } + consumerMsg() + return nil +} + +func consumerMsg() { + var err error + c, err := rocketmq.NewPushConsumer( + consumer.WithGroupName(group), + consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{uri})), + ) + if err != nil { + fmt.Printf("new consumer error: %s\n", err.Error()) + } + err = c.Subscribe(topic, consumer.MessageSelector{}, func(ctx context.Context, + msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) { + for i := range msgs { + fmt.Printf("subscribe callback: %v \n", msgs[i]) + } + return consumer.ConsumeSuccess, nil + }) + if err != nil { + fmt.Println(err.Error()) + } + err = c.Start() + if err != nil { + fmt.Println(err.Error()) + } + time.Sleep(time.Second) +} diff --git a/test/plugins/scenarios/rocketmq/plugin.yml b/test/plugins/scenarios/rocketmq/plugin.yml new file mode 100644 index 0000000..1980af8 --- /dev/null +++ b/test/plugins/scenarios/rocketmq/plugin.yml @@ -0,0 +1,46 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +entry-service: http://${HTTP_HOST}:${HTTP_PORT}/execute +health-checker: http://${HTTP_HOST}:${HTTP_PORT}/health +start-script: ./bin/startup.sh +framework: github.com/apache/rocketmq-client-go/v2 +export-port: 8080 +support-version: + - go: 1.18 + framework: + - v2.1.2 +dependencies: + mqnamesrv: + image: apache/rocketmq:5.1.4 + hostname: mqnamesrv + ports: + - 9876 + command: ["sh", "mqnamesrv"] + mqbroker: + image: apache/rocketmq:5.1.4 + hostname: mqbroker + depends_on: + - mqnamesrv + ports: + - 10909 + - 10911 + - 10912 + volumes: + - "./config/broker.conf:/opt/rocketmq-5.1.4/conf/broker.conf" + command: ["sh", "mqbroker", "autoCreateTopicEnable=true"] + environment: + NAMESRV_ADDR: "mqnamesrv:9876" \ No newline at end of file diff --git a/tools/go-agent/instrument/plugins/register.go b/tools/go-agent/instrument/plugins/register.go index 8b6e187..366cd9c 100644 --- a/tools/go-agent/instrument/plugins/register.go +++ b/tools/go-agent/instrument/plugins/register.go @@ -37,6 +37,7 @@ import ( "github.com/apache/skywalking-go/plugins/microv4" "github.com/apache/skywalking-go/plugins/mongo" "github.com/apache/skywalking-go/plugins/mux" + "github.com/apache/skywalking-go/plugins/rocketmq" runtime_metrics "github.com/apache/skywalking-go/plugins/runtimemetrics" sql_entry "github.com/apache/skywalking-go/plugins/sql/entry" sql_mysql "github.com/apache/skywalking-go/plugins/sql/mysql" @@ -60,6 +61,7 @@ func init() { registerFramework(irisv12.NewInstrument()) registerFramework(traceactivation.NewInstrument()) registerFramework(fiber.NewInstrument()) + registerFramework(rocketmq.NewInstrument()) // fasthttp related instruments registerFramework(fasthttp_client.NewInstrument())