This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-go.git


The following commit(s) were added to refs/heads/main by this push:
     new e31b654  feature: add support for segmentio-kafka (#176)
e31b654 is described below

commit e31b654bebf92f378c99e98df3747b02b55a983c
Author: Starry <codeprince2...@163.com>
AuthorDate: Sun Mar 17 15:42:13 2024 +0800

    feature: add support for segmentio-kafka (#176)
---
 .github/workflows/plugin-tests.yaml                |   1 +
 CHANGES.md                                         |   1 +
 docs/en/agent/support-plugins.md                   |   1 +
 go.work                                            |   2 +
 plugins/segmentio-kafka/go.mod                     |   9 ++
 plugins/segmentio-kafka/go.sum                     |  59 ++++++++
 plugins/segmentio-kafka/instrument.go              |  78 +++++++++++
 plugins/segmentio-kafka/reader_interceptor.go      |  74 +++++++++++
 plugins/segmentio-kafka/writer_interceptor.go      |  80 +++++++++++
 .../scenarios/segmentio-kafka/bin/startup.sh       |  22 +++
 .../scenarios/segmentio-kafka/config/excepted.yml  |  74 +++++++++++
 test/plugins/scenarios/segmentio-kafka/go.mod      |   9 ++
 test/plugins/scenarios/segmentio-kafka/go.sum      |  59 ++++++++
 test/plugins/scenarios/segmentio-kafka/main.go     | 148 +++++++++++++++++++++
 test/plugins/scenarios/segmentio-kafka/plugin.yml  |  40 ++++++
 tools/go-agent/instrument/plugins/register.go      |   2 +
 16 files changed, 659 insertions(+)

diff --git a/.github/workflows/plugin-tests.yaml 
b/.github/workflows/plugin-tests.yaml
index 737184d..c58c498 100644
--- a/.github/workflows/plugin-tests.yaml
+++ b/.github/workflows/plugin-tests.yaml
@@ -99,6 +99,7 @@ jobs:
           - rocketmq
           - amqp
           - pulsar
+          - segmentio-kafka
     steps:
       - uses: actions/checkout@v2
         with:
diff --git a/CHANGES.md b/CHANGES.md
index 467c25f..f823c49 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -6,6 +6,7 @@ Release Notes.
 ------------------
 #### Plugins
 * Support [Pulsar](https://github.com/apache/pulsar-client-go) MQ.
+* Support [Segmentio-Kafka](https://github.com/segmentio/kafka-go) MQ.
 
 0.4.0
 ------------------
diff --git a/docs/en/agent/support-plugins.md b/docs/en/agent/support-plugins.md
index 92502f1..3eb8dbc 100644
--- a/docs/en/agent/support-plugins.md
+++ b/docs/en/agent/support-plugins.md
@@ -31,6 +31,7 @@ metrics based on the tracing data.
   * `rocketMQ`: 
[rocketmq-client-go](https://github.com/apache/rocketmq-client-go) tested 
v2.1.2.
   * `amqp`: [AMQP](https://github.com/rabbitmq/amqp091-go) tested v1.9.0.
   * `pulsar`: [pulsar-client-go](https://github.com/apache/pulsar-client-go) 
tested v0.12.0.
+  * `segmentio-kafka`: 
[segmentio-kafka](https://github.com/segmentio/kafka-go) tested v0.4.47.
 
 # Metrics Plugins
 The meter plugin provides the advanced metrics collections.
diff --git a/go.work b/go.work
index fd0ce8b..29fd8a3 100644
--- a/go.work
+++ b/go.work
@@ -26,6 +26,7 @@ use (
        ./plugins/rocketmq
        ./plugins/amqp
        ./plugins/pulsar
+       ./plugins/segmentio-kafka
 
        ./test/benchmark-codebase/consumer
        ./test/benchmark-codebase/provider
@@ -58,6 +59,7 @@ use (
        ./test/plugins/scenarios/rocketmq
        ./test/plugins/scenarios/amqp
        ./test/plugins/scenarios/pulsar
+       ./test/plugins/scenarios/segmentio-kafka
 
        ./tools/go-agent
 
diff --git a/plugins/segmentio-kafka/go.mod b/plugins/segmentio-kafka/go.mod
new file mode 100644
index 0000000..cd8cffb
--- /dev/null
+++ b/plugins/segmentio-kafka/go.mod
@@ -0,0 +1,9 @@
+module github.com/apache/skywalking-go/plugins/segmentio-kafka
+
+go 1.18
+
+require (
+       github.com/klauspost/compress v1.15.9 // indirect
+       github.com/pierrec/lz4/v4 v4.1.15 // indirect
+       github.com/segmentio/kafka-go v0.4.47 // indirect
+)
diff --git a/plugins/segmentio-kafka/go.sum b/plugins/segmentio-kafka/go.sum
new file mode 100644
index 0000000..cf392a8
--- /dev/null
+++ b/plugins/segmentio-kafka/go.sum
@@ -0,0 +1,59 @@
+github.com/davecgh/go-spew v1.1.0/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/klauspost/compress v1.15.9 
h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY=
+github.com/klauspost/compress v1.15.9/go.mod 
h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
+github.com/pierrec/lz4/v4 v4.1.15 
h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0=
+github.com/pierrec/lz4/v4 v4.1.15/go.mod 
h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
+github.com/pmezard/go-difflib v1.0.0/go.mod 
h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/segmentio/kafka-go v0.4.47 
h1:IqziR4pA3vrZq7YdRxaT3w1/5fvIH5qpCwstUanQQB0=
+github.com/segmentio/kafka-go v0.4.47/go.mod 
h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg=
+github.com/stretchr/objx v0.1.0/go.mod 
h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/objx v0.4.0/go.mod 
h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
+github.com/stretchr/testify v1.7.1/go.mod 
h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.8.0/go.mod 
h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
+github.com/xdg-go/pbkdf2 v1.0.0/go.mod 
h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
+github.com/xdg-go/scram v1.1.2/go.mod 
h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4=
+github.com/xdg-go/stringprep v1.0.4/go.mod 
h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM=
+github.com/yuin/goldmark v1.4.13/go.mod 
h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod 
h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod 
h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
+golang.org/x/crypto v0.14.0/go.mod 
h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
+golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod 
h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
+golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
+golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod 
h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod 
h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
+golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod 
h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
+golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
+golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
+golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
+golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod 
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod 
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod 
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod 
h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
+golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod 
h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
+golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
+golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo=
+golang.org/x/term v0.13.0/go.mod 
h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
+golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
+golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
+golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
+golang.org/x/text v0.13.0/go.mod 
h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
+golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod 
h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod 
h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.1.12/go.mod 
h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
+golang.org/x/tools v0.6.0/go.mod 
h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
+golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod 
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod 
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod 
h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/plugins/segmentio-kafka/instrument.go 
b/plugins/segmentio-kafka/instrument.go
new file mode 100644
index 0000000..a7ff008
--- /dev/null
+++ b/plugins/segmentio-kafka/instrument.go
@@ -0,0 +1,78 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package segmentiokafka
+
+import (
+       "embed"
+
+       "github.com/apache/skywalking-go/plugins/core/instrument"
+)
+
+//go:embed *
+var fs embed.FS
+
+//skywalking:nocopy
+type Instrument struct {
+}
+
+func NewInstrument() *Instrument {
+       return &Instrument{}
+}
+
+func (i *Instrument) Name() string {
+       return "segmentio_kafka"
+}
+
+func (i *Instrument) BasePackage() string {
+       return "github.com/segmentio/kafka-go"
+}
+
+func (i *Instrument) VersionChecker(version string) bool {
+       return true
+}
+
+func (i *Instrument) Points() []*instrument.Point {
+       return []*instrument.Point{
+               {
+                       PackageName: "kafka",
+                       At: instrument.NewMethodEnhance("*Writer", 
"WriteMessages",
+                               instrument.WithArgsCount(2),
+                               instrument.WithArgType(0, "context.Context"),
+                               instrument.WithArgType(1, "...Message"),
+                               instrument.WithResultCount(1),
+                               instrument.WithResultType(0, "error"),
+                       ),
+                       Interceptor: "WriterInterceptor",
+               },
+               {
+                       PackageName: "kafka",
+                       At: instrument.NewMethodEnhance("*Reader", 
"ReadMessage",
+                               instrument.WithArgsCount(1),
+                               instrument.WithArgType(0, "context.Context"),
+                               instrument.WithResultCount(2),
+                               instrument.WithResultType(0, "Message"),
+                               instrument.WithResultType(1, "error"),
+                       ),
+                       Interceptor: "ReaderInterceptor",
+               },
+       }
+}
+
+func (i *Instrument) FS() *embed.FS {
+       return &fs
+}
diff --git a/plugins/segmentio-kafka/reader_interceptor.go 
b/plugins/segmentio-kafka/reader_interceptor.go
new file mode 100644
index 0000000..4a876b4
--- /dev/null
+++ b/plugins/segmentio-kafka/reader_interceptor.go
@@ -0,0 +1,74 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package segmentiokafka
+
+import (
+       "strings"
+
+       "github.com/segmentio/kafka-go"
+
+       "github.com/apache/skywalking-go/plugins/core/operator"
+       "github.com/apache/skywalking-go/plugins/core/tracing"
+)
+
+const (
+       kafkaReaderPrefix      = "Kafka/"
+       kafkaReaderSuffix      = "/Consumer"
+       kafkaReaderComponentID = 41
+       semicolon              = ";"
+)
+
+type ReaderInterceptor struct {
+}
+
+func (r *ReaderInterceptor) BeforeInvoke(invocation operator.Invocation) error 
{
+       return nil
+}
+
+func (r *ReaderInterceptor) AfterInvoke(invocation operator.Invocation, result 
...interface{}) error {
+       reader := invocation.CallerInstance().(*kafka.Reader)
+       brokers := strings.Join(reader.Config().Brokers, semicolon)
+       message := result[0].(kafka.Message)
+       topic := message.Topic
+       operationName := kafkaReaderPrefix + topic + kafkaReaderSuffix
+
+       span, err := tracing.CreateEntrySpan(operationName, func(headerKey 
string) (string, error) {
+               for _, header := range message.Headers {
+                       if header.Key == headerKey {
+                               return string(header.Value), nil
+                       }
+               }
+               return "", nil
+       },
+               tracing.WithLayer(tracing.SpanLayerMQ),
+               tracing.WithComponent(kafkaReaderComponentID),
+               tracing.WithTag(tracing.TagMQBroker, brokers),
+               tracing.WithTag(tracing.TagMQTopic, topic),
+       )
+       if err != nil {
+               return err
+       }
+
+       if err, ok := result[1].(error); ok {
+               span.Tag(tracing.TagMQStatus, err.Error())
+               span.Error(err.Error())
+       }
+       span.SetPeer(brokers)
+       span.End()
+       return nil
+}
diff --git a/plugins/segmentio-kafka/writer_interceptor.go 
b/plugins/segmentio-kafka/writer_interceptor.go
new file mode 100644
index 0000000..78d93d9
--- /dev/null
+++ b/plugins/segmentio-kafka/writer_interceptor.go
@@ -0,0 +1,80 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package segmentiokafka
+
+import (
+       "github.com/segmentio/kafka-go"
+
+       "github.com/apache/skywalking-go/plugins/core/operator"
+       "github.com/apache/skywalking-go/plugins/core/tracing"
+)
+
+const (
+       kafkaWriterPrefix      = "Kafka/"
+       kafkaWriterSuffix      = "/Producer"
+       kafkaWriterComponentID = 40
+)
+
+type WriterInterceptor struct {
+}
+
+func (w *WriterInterceptor) BeforeInvoke(invocation operator.Invocation) error 
{
+       writer := invocation.CallerInstance().(*kafka.Writer)
+       addr, topic := writer.Addr.String(), writer.Topic
+       messageList := invocation.Args()[1].([]kafka.Message)
+       operationName := kafkaWriterPrefix + topic + kafkaWriterSuffix
+
+       span, err := tracing.CreateExitSpan(operationName, addr, 
func(headerKey, headerValue string) error {
+               for idx := range messageList {
+                       if len(messageList[idx].Headers) == 0 {
+                               messageList[idx].Headers = []kafka.Header{
+                                       {Key: headerKey, Value: 
[]byte(headerValue)},
+                               }
+                       } else {
+                               messageList[idx].Headers = 
append(messageList[idx].Headers,
+                                       kafka.Header{Key: headerKey, Value: 
[]byte(headerValue)})
+                       }
+               }
+               return nil
+       },
+               tracing.WithLayer(tracing.SpanLayerMQ),
+               tracing.WithComponent(kafkaWriterComponentID),
+               tracing.WithTag(tracing.TagMQBroker, addr),
+               tracing.WithTag(tracing.TagMQTopic, topic),
+       )
+       if err != nil {
+               return err
+       }
+
+       span.SetPeer(addr)
+       invocation.SetContext(span)
+       return nil
+}
+
+func (w *WriterInterceptor) AfterInvoke(invocation operator.Invocation, result 
...interface{}) error {
+       if invocation.GetContext() == nil {
+               return nil
+       }
+       span := invocation.GetContext().(tracing.Span)
+       if err, ok := result[0].(error); ok && err != nil {
+               span.Tag(tracing.TagMQStatus, err.Error())
+               span.Error(err.Error())
+       }
+       span.End()
+       return nil
+}
diff --git a/test/plugins/scenarios/segmentio-kafka/bin/startup.sh 
b/test/plugins/scenarios/segmentio-kafka/bin/startup.sh
new file mode 100755
index 0000000..e758869
--- /dev/null
+++ b/test/plugins/scenarios/segmentio-kafka/bin/startup.sh
@@ -0,0 +1,22 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+home="$(cd "$(dirname $0)"; pwd)"
+go build ${GO_BUILD_OPTS} -o kafka
+
+./kafka
\ No newline at end of file
diff --git a/test/plugins/scenarios/segmentio-kafka/config/excepted.yml 
b/test/plugins/scenarios/segmentio-kafka/config/excepted.yml
new file mode 100644
index 0000000..bb4f7b7
--- /dev/null
+++ b/test/plugins/scenarios/segmentio-kafka/config/excepted.yml
@@ -0,0 +1,74 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+segmentItems:
+  - serviceName: segmentio-kafka
+    segmentSize: 3
+    segments:
+      - segmentId: not null
+        spans:
+          - operationName: Kafka/sw-topic/Producer
+            parentSpanId: 0
+            spanId: 1
+            spanLayer: MQ
+            startTime: nq 0
+            endTime: nq 0
+            componentId: 40
+            isError: false
+            spanType: Exit
+            peer: kafka-server:9092
+            skipAnalysis: false
+            tags:
+              - { key: mq.broker, value: not null }
+              - { key: mq.topic, value: not null }
+          - operationName: GET:/execute
+            parentSpanId: -1
+            spanId: 0
+            spanLayer: Http
+            startTime: nq 0
+            endTime: nq 0
+            componentId: 5004
+            isError: false
+            spanType: Entry
+            peer: ''
+            skipAnalysis: false
+            tags:
+              - { key: http.method, value: GET }
+              - { key: url, value: 'service:8080/execute' }
+              - { key: status_code, value: '200' }
+      - segmentId: not null
+        spans:
+          - operationName: Kafka/sw-topic/Consumer
+            parentSpanId: -1
+            spanId: 0
+            spanLayer: MQ
+            startTime: nq 0
+            endTime: nq 0
+            componentId: 41
+            isError: false
+            spanType: Entry
+            peer: kafka-server:9092
+            skipAnalysis: false
+            tags:
+              - { key: mq.broker, value: not null }
+              - { key: mq.topic, value: not null }
+            refs:
+              - { parentEndpoint: 'GET:/execute', networkAddress: 
'kafka-server:9092',
+                  refType: CrossProcess, parentSpanId: 1, 
parentTraceSegmentId: not null,
+                  parentServiceInstance: not null, parentService: 
segmentio-kafka,
+                  traceId: not null }
+meterItems: [ ]
+logItems: [ ]
\ No newline at end of file
diff --git a/test/plugins/scenarios/segmentio-kafka/go.mod 
b/test/plugins/scenarios/segmentio-kafka/go.mod
new file mode 100644
index 0000000..0b839b5
--- /dev/null
+++ b/test/plugins/scenarios/segmentio-kafka/go.mod
@@ -0,0 +1,9 @@
+module test/plugins/scenarios/segmentio-kafka
+
+go 1.18
+
+require (
+       github.com/klauspost/compress v1.15.9 // indirect
+       github.com/pierrec/lz4/v4 v4.1.15 // indirect
+       github.com/segmentio/kafka-go v0.4.47 // indirect
+)
diff --git a/test/plugins/scenarios/segmentio-kafka/go.sum 
b/test/plugins/scenarios/segmentio-kafka/go.sum
new file mode 100644
index 0000000..cf392a8
--- /dev/null
+++ b/test/plugins/scenarios/segmentio-kafka/go.sum
@@ -0,0 +1,59 @@
+github.com/davecgh/go-spew v1.1.0/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/klauspost/compress v1.15.9 
h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY=
+github.com/klauspost/compress v1.15.9/go.mod 
h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
+github.com/pierrec/lz4/v4 v4.1.15 
h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0=
+github.com/pierrec/lz4/v4 v4.1.15/go.mod 
h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
+github.com/pmezard/go-difflib v1.0.0/go.mod 
h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/segmentio/kafka-go v0.4.47 
h1:IqziR4pA3vrZq7YdRxaT3w1/5fvIH5qpCwstUanQQB0=
+github.com/segmentio/kafka-go v0.4.47/go.mod 
h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg=
+github.com/stretchr/objx v0.1.0/go.mod 
h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/objx v0.4.0/go.mod 
h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
+github.com/stretchr/testify v1.7.1/go.mod 
h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.8.0/go.mod 
h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
+github.com/xdg-go/pbkdf2 v1.0.0/go.mod 
h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
+github.com/xdg-go/scram v1.1.2/go.mod 
h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4=
+github.com/xdg-go/stringprep v1.0.4/go.mod 
h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM=
+github.com/yuin/goldmark v1.4.13/go.mod 
h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod 
h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod 
h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
+golang.org/x/crypto v0.14.0/go.mod 
h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
+golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod 
h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
+golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
+golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod 
h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod 
h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
+golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod 
h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
+golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
+golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
+golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
+golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod 
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod 
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod 
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod 
h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
+golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod 
h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
+golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
+golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo=
+golang.org/x/term v0.13.0/go.mod 
h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
+golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
+golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
+golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
+golang.org/x/text v0.13.0/go.mod 
h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
+golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod 
h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod 
h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.1.12/go.mod 
h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
+golang.org/x/tools v0.6.0/go.mod 
h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
+golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod 
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod 
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod 
h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/test/plugins/scenarios/segmentio-kafka/main.go 
b/test/plugins/scenarios/segmentio-kafka/main.go
new file mode 100644
index 0000000..58df214
--- /dev/null
+++ b/test/plugins/scenarios/segmentio-kafka/main.go
@@ -0,0 +1,148 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package main
+
+import (
+       "context"
+       "fmt"
+       "log"
+       "net"
+       "net/http"
+       "strconv"
+       "time"
+
+       "github.com/segmentio/kafka-go"
+
+       _ "github.com/apache/skywalking-go"
+)
+
+type testFunc func() error
+
+var (
+       url    = "kafka-server:9092"
+       topic  = "sw-topic"
+       msg    = "I love skywalking 3 thousand"
+       ctx    = context.Background()
+       writer *kafka.Writer
+       reader *kafka.Reader
+)
+
+func main() {
+       writer = newKafkaWriter(topic)
+       defer writer.Close()
+       reader = newKafkaReader()
+       defer reader.Close()
+       consumerHelper()
+
+       route := http.NewServeMux()
+       route.HandleFunc("/execute", func(res http.ResponseWriter, req 
*http.Request) {
+               testProduceConsume()
+               _, _ = res.Write([]byte("execute success"))
+       })
+       route.HandleFunc("/health", func(writer http.ResponseWriter, request 
*http.Request) {
+               _, _ = writer.Write([]byte("ok"))
+       })
+       fmt.Println("start client")
+       err := http.ListenAndServe(":8080", route)
+       if err != nil {
+               fmt.Printf("client start error: %v \n", err)
+       }
+}
+
+func testProduceConsume() {
+       tests := []struct {
+               name string
+               fn   testFunc
+       }{
+               {"simpleMsg", simpleMsg},
+       }
+       for _, test := range tests {
+               fmt.Printf("excute test case: %s\n", test.name)
+               if subErr := test.fn(); subErr != nil {
+                       fmt.Printf("test case %s failed: %v", test.name, subErr)
+               }
+       }
+}
+
+func simpleMsg() error {
+       if err := writer.WriteMessages(ctx, kafka.Message{
+               Value: []byte(msg),
+       }); err != nil {
+               log.Println("simpleMsg WriteMessages error")
+               return err
+       }
+       return nil
+}
+
+func consumerHelper() {
+       go func() {
+               for {
+                       if message, err := reader.ReadMessage(ctx); err != nil {
+                               log.Fatal("consumer msg error: ", err)
+                       } else {
+                               fmt.Printf("consumer|topic=%s, partition=%d, 
offset=%d, key=%s, value=%s, header=%s\n",
+                                       message.Topic, message.Partition, 
message.Offset, string(message.Key), string(message.Value), message.Headers)
+                       }
+               }
+       }()
+}
+
+func newKafkaReader() *kafka.Reader {
+       return kafka.NewReader(kafka.ReaderConfig{
+               Brokers:        []string{url},
+               Topic:          topic,
+               CommitInterval: 1 * time.Second,
+       })
+}
+
+func newKafkaWriter(topic string) *kafka.Writer {
+       createTopic()
+       return &kafka.Writer{
+               Addr:  kafka.TCP(url),
+               Topic: topic,
+       }
+}
+
+func createTopic() {
+       conn, err := kafka.Dial("tcp", url)
+       if err != nil {
+               log.Fatal(fmt.Errorf("createTopic, Dial: %w", err))
+       }
+       defer conn.Close()
+       controller, err := conn.Controller()
+       if err != nil {
+               err = fmt.Errorf("createTopic, conn.Controller: %w", err)
+               log.Fatal(err)
+       }
+       conn, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, 
strconv.Itoa(controller.Port)))
+       if err != nil {
+               log.Fatal("kafka.Dial error: ", err)
+       }
+       conn.SetDeadline(time.Now().Add(time.Second))
+       topicConfigs := []kafka.TopicConfig{
+               {
+                       Topic:             topic,
+                       NumPartitions:     1,
+                       ReplicationFactor: 1,
+               },
+       }
+       err = conn.CreateTopics(topicConfigs...)
+       if err != nil {
+               log.Fatal(fmt.Errorf("createTopic error: %w", err))
+       }
+}
diff --git a/test/plugins/scenarios/segmentio-kafka/plugin.yml 
b/test/plugins/scenarios/segmentio-kafka/plugin.yml
new file mode 100644
index 0000000..887e422
--- /dev/null
+++ b/test/plugins/scenarios/segmentio-kafka/plugin.yml
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+entry-service: http://${HTTP_HOST}:${HTTP_PORT}/execute
+health-checker: http://${HTTP_HOST}:${HTTP_PORT}/health
+start-script: ./bin/startup.sh
+framework: github.com/segmentio/kafka-go
+export-port: 8080
+support-version:
+  - go: 1.18
+    framework:
+      - v0.4.47
+dependencies:
+  zookeeper-server:
+    image: zookeeper:3.9.2
+    hostname: zookeeper-server
+  kafka-server:
+    image: bitnami/kafka:3.7.0
+    hostname: kafka-server
+    ports:
+      - 9092
+    environment:
+      KAFKA_ZOOKEEPER_CONNECT: "zookeeper-server:2181"
+      KAFKA_BROKER_ID: 1
+      KAFKA_LISTENERS: "PLAINTEXT://kafka-server:9092"
+    depends_on:
+      - zookeeper-server
\ No newline at end of file
diff --git a/tools/go-agent/instrument/plugins/register.go 
b/tools/go-agent/instrument/plugins/register.go
index 7726b41..2ed4857 100644
--- a/tools/go-agent/instrument/plugins/register.go
+++ b/tools/go-agent/instrument/plugins/register.go
@@ -41,6 +41,7 @@ import (
        "github.com/apache/skywalking-go/plugins/pulsar"
        "github.com/apache/skywalking-go/plugins/rocketmq"
        runtime_metrics "github.com/apache/skywalking-go/plugins/runtimemetrics"
+       segmentiokafka "github.com/apache/skywalking-go/plugins/segmentio-kafka"
        sql_entry "github.com/apache/skywalking-go/plugins/sql/entry"
        sql_mysql "github.com/apache/skywalking-go/plugins/sql/mysql"
 )
@@ -66,6 +67,7 @@ func init() {
        registerFramework(rocketmq.NewInstrument())
        registerFramework(amqp.NewInstrument())
        registerFramework(pulsar.NewInstrument())
+       registerFramework(segmentiokafka.NewInstrument())
 
        // fasthttp related instruments
        registerFramework(fasthttp_client.NewInstrument())

Reply via email to