This is an automated email from the ASF dual-hosted git repository.

kezhenxu94 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking-python.git


The following commit(s) were added to refs/heads/master by this push:
     new 02dc53c  Add Pulsar plugin (#345)
02dc53c is described below

commit 02dc53ce2de37c8ac3396bbf298004eeb4f42942
Author: Starry <zhouzi...@apache.org>
AuthorDate: Sat Jun 29 20:32:54 2024 +0800

    Add Pulsar plugin (#345)
---
 .gitmodules                                      |   2 +-
 CHANGELOG.md                                     |   1 +
 Makefile                                         |   4 +-
 docs/en/setup/Plugins.md                         |   1 +
 docs/en/setup/advanced/LogReporter.md            |   2 +-
 poetry.lock                                      |  49 ++++++++++-
 protocol                                         |   2 +-
 pyproject.toml                                   |   1 +
 skywalking/__init__.py                           |   2 +
 skywalking/plugins/sw_pulsar.py                  | 107 +++++++++++++++++++++++
 tests/plugin/data/sw_pulsar/__init__.py          |  16 ++++
 tests/plugin/data/sw_pulsar/docker-compose.yml   |  90 +++++++++++++++++++
 tests/plugin/data/sw_pulsar/expected.data.yml    |  86 ++++++++++++++++++
 tests/plugin/data/sw_pulsar/services/__init__.py |  16 ++++
 tests/plugin/data/sw_pulsar/services/consumer.py |  32 +++++++
 tests/plugin/data/sw_pulsar/services/producer.py |  43 +++++++++
 tests/plugin/data/sw_pulsar/test_pulsar.py       |  36 ++++++++
 17 files changed, 485 insertions(+), 5 deletions(-)

diff --git a/.gitmodules b/.gitmodules
index ebb8ea3..27ba66d 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -18,4 +18,4 @@
 #
 [submodule "protocol"]
        path = protocol
-       url = https://github.com/apache/skywalking-data-collect-protocol
+       url = https://github.com/apache/skywalking-data-collect-protocol.git
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 962083a..68c0563 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,6 +8,7 @@
 
 - Plugins:
   - Add neo4j plugin.(#312)
+  - Add pulsar plugin.(#345)
 
 - Fixes:
   - Fix unexpected 'No active span' IllegalStateError (#311)
diff --git a/Makefile b/Makefile
index c581509..408d42b 100644
--- a/Makefile
+++ b/Makefile
@@ -43,9 +43,11 @@ poetry:
 ifeq ($(OS),Windows)
        -powershell (Invoke-WebRequest -Uri https://install.python-poetry.org 
-UseBasicParsing).Content | py -
        poetry self update
-else
+else ifeq ($(OS),Darwin)
        -curl -sSL https://install.python-poetry.org | python3 -
        poetry self update || $(MAKE) poetry-fallback
+else
+       -curl -sSL https://install.python-poetry.org | python3 - --version 1.5.1
 endif
 
 .PHONY: gen
diff --git a/docs/en/setup/Plugins.md b/docs/en/setup/Plugins.md
index ca0fc38..da29e47 100644
--- a/docs/en/setup/Plugins.md
+++ b/docs/en/setup/Plugins.md
@@ -36,6 +36,7 @@ or a limitation of SkyWalking auto-instrumentation (welcome 
to contribute!)
 | [neo4j](https://neo4j.com/docs/python-manual/5/) | Python >=3.7 - ['5.*'];  
| `sw_neo4j` |
 | [psycopg[binary]](https://www.psycopg.org/) | Python >=3.11 - ['3.1.*']; 
Python >=3.7 - ['3.0.18', '3.1.*'];  | `sw_psycopg` |
 | [psycopg2-binary](https://www.psycopg.org/) | Python >=3.10 - NOT SUPPORTED 
YET; Python >=3.7 - ['2.9'];  | `sw_psycopg2` |
+| [pulsar-client](https://github.com/apache/pulsar-client-python) | Python 
>=3.8 - ['3.3.0'];  | `sw_pulsar` |
 | [pymongo](https://pymongo.readthedocs.io) | Python >=3.7 - ['3.11.*'];  | 
`sw_pymongo` |
 | [pymysql](https://pymysql.readthedocs.io/en/latest/) | Python >=3.7 - 
['1.0'];  | `sw_pymysql` |
 | [pyramid](https://trypyramid.com) | Python >=3.7 - ['1.10', '2.0'];  | 
`sw_pyramid` |
diff --git a/docs/en/setup/advanced/LogReporter.md 
b/docs/en/setup/advanced/LogReporter.md
index e08e65f..9d16b86 100644
--- a/docs/en/setup/advanced/LogReporter.md
+++ b/docs/en/setup/advanced/LogReporter.md
@@ -9,7 +9,7 @@ Log reporter supports all three protocols including `grpc`, 
`http` and `kafka`,
 If chosen `http` protocol, the logs will be batch-reported to the collector 
REST endpoint `oap/v3/logs`.
 
 If chosen `kafka` protocol, please make sure to config 
-[kafka-fetcher](https://skywalking.apache.org/docs/main/v9.1.0/en/setup/backend/kafka-fetcher/)
 
+[kafka-fetcher](https://skywalking.apache.org/docs/main/v10.0.1/en/setup/backend/kafka-fetcher/)
 on the OAP side, and make sure Python agent config `kafka_bootstrap_servers` 
points to your Kafka brokers.
 
 **Please make sure OAP is consuming the same Kafka topic as your agent 
produces to, `kafka_namespace` must match OAP side configuration 
`plugin.kafka.namespace`**
diff --git a/poetry.lock b/poetry.lock
index 52f500a..d098307 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -2280,6 +2280,53 @@ files = [
     {file = "psycopg2_binary-2.9.7-cp39-cp39-win_amd64.whl", hash = 
"sha256:eb3b8d55924a6058a26db69fb1d3e7e32695ff8b491835ba9f479537e14dcf9f"},
 ]
 
+[[package]]
+name = "pulsar-client"
+version = "3.3.0"
+description = "Apache Pulsar Python client library"
+optional = false
+python-versions = "*"
+files = [
+    {file = "pulsar_client-3.3.0-cp310-cp310-macosx_10_15_universal2.whl", 
hash = 
"sha256:c31afd3e67a044ff93177df89e08febf214cc965e95ede097d9fe8755af00e01"},
+    {file = 
"pulsar_client-3.3.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl",
 hash = 
"sha256:1f66982284571674b215324cc26b5c2f7c56c7043113c47a7084cb70d67a8afb"},
+    {file = 
"pulsar_client-3.3.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl",
 hash = 
"sha256:7fe50a06f81c48a75a9b95c27a6446260039adca71d9face273740de96b2efca"},
+    {file = "pulsar_client-3.3.0-cp310-cp310-musllinux_1_1_aarch64.whl", hash 
= "sha256:d4c46a4b96a6e9919cfe220156d69a2ede8053d9ea1add4ada108abcf2ba9775"},
+    {file = "pulsar_client-3.3.0-cp310-cp310-musllinux_1_1_x86_64.whl", hash = 
"sha256:1e4b5d44b992c9b036286b483f3588c10b89c6047fb59d80c7474445997f4e10"},
+    {file = "pulsar_client-3.3.0-cp310-cp310-win_amd64.whl", hash = 
"sha256:497a59ac6b650835a3b2c502f53477e5c98e5226998ca3f17c0b0a3eb4d67d08"},
+    {file = "pulsar_client-3.3.0-cp311-cp311-macosx_10_15_universal2.whl", 
hash = 
"sha256:386e78ff52058d881780bae1f6e84ac9434ae0b01a8581755ca8cc0dc844a332"},
+    {file = 
"pulsar_client-3.3.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl",
 hash = 
"sha256:3e4ecb780df58bcfd3918590bd3ff31ed79bccfbef3a1a60370642eb1e14a9d2"},
+    {file = 
"pulsar_client-3.3.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl",
 hash = 
"sha256:7ce1e215c252f22a6f26ca5e9076826041a04d88dc213b92c86b524be2774a64"},
+    {file = "pulsar_client-3.3.0-cp311-cp311-musllinux_1_1_aarch64.whl", hash 
= "sha256:88b0fd5be73a4103986b9dbe3a66468cf8829371e34af87ff8f216e3980f4cbe"},
+    {file = "pulsar_client-3.3.0-cp311-cp311-musllinux_1_1_x86_64.whl", hash = 
"sha256:33656450536d83eed1563ff09692c2c415fb199d88e9ed97d701ca446a119e1b"},
+    {file = "pulsar_client-3.3.0-cp311-cp311-win_amd64.whl", hash = 
"sha256:ce33de700b06583df8777e139d68cb4b4b3d0a2eac168d74278d8935f357fb10"},
+    {file = "pulsar_client-3.3.0-cp37-cp37m-macosx_10_15_universal2.whl", hash 
= "sha256:7b5dd25cf778d6c980d36c53081e843ea272afe7af4f0ad6394ae9513f94641b"},
+    {file = 
"pulsar_client-3.3.0-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl",
 hash = 
"sha256:33c4e6865fda62a2e460f823dce4d49ac2973a4459b8ff99eda5fdd6aaaebf46"},
+    {file = 
"pulsar_client-3.3.0-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl",
 hash = 
"sha256:f1810ddc623c8de2675d17405ce47057a9a2b92298e708ce4d9564847f5ad904"},
+    {file = "pulsar_client-3.3.0-cp37-cp37m-musllinux_1_1_aarch64.whl", hash = 
"sha256:8259c3b856eb6deaa1f93dce893ab18d99d36d102da5612c8e97a4fb41b70ab1"},
+    {file = "pulsar_client-3.3.0-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = 
"sha256:5e7a48b2e505cde758fd51a601b5da0671fa98c9baee38362aaaa3ab2b930c28"},
+    {file = "pulsar_client-3.3.0-cp37-cp37m-win_amd64.whl", hash = 
"sha256:ede264385d47257b2f2b08ecde9181ec5338bea5639cc543d1856f01736778d2"},
+    {file = "pulsar_client-3.3.0-cp38-cp38-macosx_10_15_universal2.whl", hash 
= "sha256:0f64c62746ccd5b65a0c505f5f40b9af1f147eb1fa2d8f9c90cd5c8b92dd8597"},
+    {file = 
"pulsar_client-3.3.0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl",
 hash = 
"sha256:5b84a20c9012e3c4ef1b7085acd7467197118c090b378dec27d773fb79d91556"},
+    {file = 
"pulsar_client-3.3.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", 
hash = 
"sha256:c4e15fa696e275ccb66d0791fdc19c4dea0420d81349c8055e485b134125e14f"},
+    {file = "pulsar_client-3.3.0-cp38-cp38-musllinux_1_1_aarch64.whl", hash = 
"sha256:72cbb1bdcba2dd1265296b5ba65331622ee89c16db75edaad46dd7b90c6dd447"},
+    {file = "pulsar_client-3.3.0-cp38-cp38-musllinux_1_1_x86_64.whl", hash = 
"sha256:d54dd12955bf587dd46d9184444af5e853d9da2a14bbfb739ed2c7c3b78ce280"},
+    {file = "pulsar_client-3.3.0-cp38-cp38-win_amd64.whl", hash = 
"sha256:43f98afdf0334b2b957a4d96f97a1fe8a7f7fd1e2631d40c3f00b4162f396485"},
+    {file = "pulsar_client-3.3.0-cp39-cp39-macosx_10_15_universal2.whl", hash 
= "sha256:efe7c1e6a96daccc522c3567b6847ffa54c13e0f510d9a427b4aeff9fbebe54b"},
+    {file = 
"pulsar_client-3.3.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl",
 hash = 
"sha256:f28e94420090fceeb38e23fc744f3edf8710e48314ef5927d2b674a1d1e43ee0"},
+    {file = 
"pulsar_client-3.3.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", 
hash = 
"sha256:42c8f3eaa98e2351805ecb6efb6d5fedf47a314a3ce6af0e05ea1449ea7244ed"},
+    {file = "pulsar_client-3.3.0-cp39-cp39-musllinux_1_1_aarch64.whl", hash = 
"sha256:5e69750f8ae57e55fddf97b459ce0d8b38b2bb85f464a71e871ee6a86d893be7"},
+    {file = "pulsar_client-3.3.0-cp39-cp39-musllinux_1_1_x86_64.whl", hash = 
"sha256:7e147e5ba460c1818bc05254279a885b4e552bcafb8961d40e31f98d5ff46628"},
+    {file = "pulsar_client-3.3.0-cp39-cp39-win_amd64.whl", hash = 
"sha256:694530af1d6c75fb81456fb509778c1868adee31e997ddece6e21678200182ea"},
+]
+
+[package.dependencies]
+certifi = "*"
+
+[package.extras]
+all = ["apache-bookkeeper-client (>=4.16.1)", "fastavro (==1.7.3)", "grpcio 
(>=1.8.2)", "prometheus-client", "protobuf (>=3.6.1,<=3.20.3)", "ratelimit"]
+avro = ["fastavro (==1.7.3)"]
+functions = ["apache-bookkeeper-client (>=4.16.1)", "grpcio (>=1.8.2)", 
"prometheus-client", "protobuf (>=3.6.1,<=3.20.3)", "ratelimit"]
+
 [[package]]
 name = "pycodestyle"
 version = "2.9.1"
@@ -3683,4 +3730,4 @@ sync = ["kafka-python", "requests"]
 [metadata]
 lock-version = "2.0"
 python-versions = ">=3.7, <3.12"
-content-hash = 
"016644168e470e2904bc0a4109c0218c7b9ecf0890d17a32e28aa81bcda0e8d0"
+content-hash = 
"ae3cd5c63201383530bf2daac37be0ef9399bf4384dbe42048c81e945b70642a"
diff --git a/protocol b/protocol
index 9b2f4a5..b5f6ebe 160000
--- a/protocol
+++ b/protocol
@@ -1 +1 @@
-Subproject commit 9b2f4a5fb5694381924674d6c15cbead6a388d97
+Subproject commit b5f6ebe281b96d89968959f55baa3d9aa1bfecee
diff --git a/pyproject.toml b/pyproject.toml
index 734b058..8fc836f 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -156,6 +156,7 @@ loguru = "^0.6.0"
 httpx = "^0.23.3"
 confluent-kafka = "^2.0.2"
 neo4j = "^5.9.0"
+pulsar-client = "3.3.0"
 
 [tool.poetry.group.lint.dependencies]
 pylint = '2.13.9'
diff --git a/skywalking/__init__.py b/skywalking/__init__.py
index 1ed0c2b..ec1dc09 100644
--- a/skywalking/__init__.py
+++ b/skywalking/__init__.py
@@ -36,6 +36,8 @@ class Component(Enum):
     KafkaConsumer = 41
     RabbitmqProducer = 52
     RabbitmqConsumer = 53
+    PulsarProducer = 73
+    PulsarConsumer = 74
     Elasticsearch = 47
     HBase = 94
     Neo4j = 112
diff --git a/skywalking/plugins/sw_pulsar.py b/skywalking/plugins/sw_pulsar.py
new file mode 100644
index 0000000..33b24a3
--- /dev/null
+++ b/skywalking/plugins/sw_pulsar.py
@@ -0,0 +1,107 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from skywalking import Layer, Component
+from skywalking.trace.carrier import Carrier
+from skywalking.trace.context import get_context
+from skywalking.trace.tags import TagMqTopic, TagMqBroker
+
+link_vector = ['https://github.com/apache/pulsar-client-python']
+support_matrix = {
+    'pulsar-client': {
+        '>=3.8': ['3.3.0']
+    }
+}
+note = """"""
+
+
+def install():
+    from pulsar import Producer
+    from pulsar import Consumer
+    from pulsar import Client
+
+    __init = Client.__init__
+    _send = Producer.send
+    _receive = Consumer.receive
+    _peer = ''
+
+    def get_peer():
+        return _peer
+
+    def set_peer(value):
+        nonlocal _peer
+        _peer = value
+
+    def _sw_init(self, service_url):
+        __init(self, service_url)
+        set_peer(service_url)
+
+    def _sw_send_func(_send):
+        def _sw_send(this, content,
+                     properties=None,
+                     partition_key=None,
+                     sequence_id=None,
+                     replication_clusters=None,
+                     disable_replication=False,
+                     event_timestamp=None,
+                     deliver_at=None,
+                     deliver_after=None,
+                     ):
+            topic = this._producer.topic().split('/')[-1]
+            with 
get_context().new_exit_span(op=f'Pulsar/Topic/{topic}/Producer', 
peer=get_peer(),
+                                             
component=Component.PulsarProducer) as span:
+                span.tag(TagMqTopic(topic))
+                span.tag(TagMqBroker(get_peer()))
+                span.layer = Layer.MQ
+
+                carrier = span.inject()
+                if properties is None:
+                    properties = {}
+                for item in carrier:
+                    properties[item.key] = item.val
+
+                return _send(this, content, properties=properties, 
partition_key=partition_key,
+                             sequence_id=sequence_id, 
replication_clusters=replication_clusters,
+                             disable_replication=disable_replication, 
event_timestamp=event_timestamp,
+                             deliver_at=deliver_at, 
deliver_after=deliver_after)
+
+        return _sw_send
+
+    def _sw_receive_func(_receive):
+        def _sw_receive(this, timeout_millis=None):
+            res = _receive(this, timeout_millis=timeout_millis)
+            if res:
+                topic = res.topic_name().split('/')[-1]
+                properties = res.properties()
+                carrier = Carrier()
+                for item in carrier:
+                    if item.key in properties.keys():
+                        val = res.properties().get(item.key)
+                        if val is not None:
+                            item.val = val
+
+                with 
get_context().new_entry_span(op=f'Pulsar/Topic/{topic}/Consumer', 
carrier=carrier) as span:
+                    span.tag(TagMqTopic(topic))
+                    span.tag(TagMqBroker(get_peer()))
+                    span.layer = Layer.MQ
+                    span.component = Component.PulsarConsumer
+            return res
+
+        return _sw_receive
+
+    Client.__init__ = _sw_init
+    Producer.send = _sw_send_func(_send)
+    Consumer.receive = _sw_receive_func(_receive)
diff --git a/tests/plugin/data/sw_pulsar/__init__.py 
b/tests/plugin/data/sw_pulsar/__init__.py
new file mode 100644
index 0000000..b1312a0
--- /dev/null
+++ b/tests/plugin/data/sw_pulsar/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
diff --git a/tests/plugin/data/sw_pulsar/docker-compose.yml 
b/tests/plugin/data/sw_pulsar/docker-compose.yml
new file mode 100644
index 0000000..83197e2
--- /dev/null
+++ b/tests/plugin/data/sw_pulsar/docker-compose.yml
@@ -0,0 +1,90 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+version: '2.1'
+
+services:
+  collector:
+    extends:
+      service: collector
+      file: ../../docker-compose.base.yml
+
+  pulsar-server:
+    image: apachepulsar/pulsar:3.2.0
+    hostname: pulsar-server
+    ports:
+      - 6650:6650
+      - 8080:8080
+    networks:
+      - beyond
+    command: ["bash","-c", "bin/pulsar standalone"]
+    healthcheck:
+      test: ["CMD", "nc", "-nz", "127.0.0.1", "8080"]
+      interval: 5s
+      timeout: 60s
+      retries: 120
+
+  producer:
+    extends:
+      service: agent
+      file: ../../docker-compose.base.yml
+    ports:
+      - 9090:9090
+    volumes:
+      - .:/app
+    command: ['bash', '-c', 'pip install flask && pip install -r 
/app/requirements.txt && sw-python run python3 /app/services/producer.py']
+    healthcheck:
+      test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/9090"]
+      interval: 5s
+      timeout: 60s
+      retries: 120
+    depends_on:
+      collector:
+        condition: service_healthy
+      pulsar-server:
+        condition: service_healthy
+      consumer:
+        condition: service_healthy
+    environment:
+      SW_AGENT_NAME: producer
+      SW_AGENT_LOGGING_LEVEL: INFO
+
+  consumer:
+    extends:
+      service: agent
+      file: ../../docker-compose.base.yml
+    ports:
+      - 9091:9091
+    volumes:
+      - .:/app
+    command: ['bash', '-c', 'pip install flask && pip install -r 
/app/requirements.txt && sw-python run python3 /app/services/consumer.py']
+    healthcheck:
+      test: ["CMD", "bash", "-c", "ps -ef | grep /app/services/consumer | grep 
-v grep"]
+      interval: 5s
+      timeout: 60s
+      retries: 120
+    depends_on:
+      collector:
+        condition: service_healthy
+      pulsar-server:
+        condition: service_healthy
+    environment:
+      SW_AGENT_NAME: consumer
+      SW_AGENT_LOGGING_LEVEL: INFO
+
+networks:
+  beyond:
\ No newline at end of file
diff --git a/tests/plugin/data/sw_pulsar/expected.data.yml 
b/tests/plugin/data/sw_pulsar/expected.data.yml
new file mode 100644
index 0000000..6b61114
--- /dev/null
+++ b/tests/plugin/data/sw_pulsar/expected.data.yml
@@ -0,0 +1,86 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+segmentItems:
+  - serviceName: producer
+    segmentSize: 1
+    segments:
+      - segmentId: not null
+        spans:
+          - operationName: Pulsar/Topic/sw-topic/Producer
+            parentSpanId: 0
+            spanId: 1
+            spanLayer: MQ
+            tags:
+              - key: mq.topic
+                value: sw-topic
+              - key: mq.broker
+                value: 'pulsar://pulsar-server:6650'
+            startTime: gt 0
+            endTime: gt 0
+            componentId: 73
+            spanType: Exit
+            peer: pulsar://pulsar-server:6650
+            skipAnalysis: false
+          - operationName: /users
+            operationId: 0
+            parentSpanId: -1
+            spanId: 0
+            spanLayer: Http
+            tags:
+              - key: http.method
+                value: GET
+              - key: http.url
+                value: http://0.0.0.0:9090/users
+              - key: http.status_code
+                value: '200'
+            startTime: gt 0
+            endTime: gt 0
+            componentId: 7001
+            spanType: Entry
+            peer: not null
+            skipAnalysis: false
+  - serviceName: consumer
+    segmentSize: 1
+    segments:
+      - segmentId: not null
+        spans:
+          - operationName: Pulsar/Topic/sw-topic/Consumer
+            operationId: 0
+            parentSpanId: -1
+            spanId: 0
+            spanLayer: MQ
+            tags:
+              - key: mq.topic
+                value: sw-topic
+              - key: mq.broker
+                value: 'pulsar://pulsar-server:6650'
+            refs:
+              - parentEndpoint: Pulsar/Topic/sw-topic/Producer
+                networkAddress: 'pulsar://pulsar-server:6650'
+                refType: CrossProcess
+                parentSpanId: 1
+                parentTraceSegmentId: not null
+                parentServiceInstance: not null
+                parentService: producer
+                traceId: not null
+            startTime: gt 0
+            endTime: gt 0
+            componentId: 74
+            spanType: Entry
+            peer: ''
+            skipAnalysis: false
diff --git a/tests/plugin/data/sw_pulsar/services/__init__.py 
b/tests/plugin/data/sw_pulsar/services/__init__.py
new file mode 100644
index 0000000..b1312a0
--- /dev/null
+++ b/tests/plugin/data/sw_pulsar/services/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
diff --git a/tests/plugin/data/sw_pulsar/services/consumer.py 
b/tests/plugin/data/sw_pulsar/services/consumer.py
new file mode 100644
index 0000000..fc444b1
--- /dev/null
+++ b/tests/plugin/data/sw_pulsar/services/consumer.py
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+if __name__ == '__main__':
+    import pulsar
+
+    client = pulsar.Client(service_url='pulsar://pulsar-server:6650')
+    consumer = client.subscribe('sw-topic', 'sw-subscription')
+
+    while True:
+        try:
+            msg = consumer.receive()
+            print('Received message = ', str(msg.data().decode('utf-8')), 
'|message_id = ', msg.message_id())
+            consumer.acknowledge(msg)
+        except pulsar.Interrupted:
+            break
+
+    client.close()
diff --git a/tests/plugin/data/sw_pulsar/services/producer.py 
b/tests/plugin/data/sw_pulsar/services/producer.py
new file mode 100644
index 0000000..9505f82
--- /dev/null
+++ b/tests/plugin/data/sw_pulsar/services/producer.py
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+if __name__ == '__main__':
+    import pulsar
+    from flask import Flask, jsonify
+    from pulsar import BatchingType
+
+    app = Flask(__name__)
+    client = pulsar.Client(service_url='pulsar://pulsar-server:6650')
+    producer = client.create_producer(
+        'sw-topic',
+        block_if_queue_full=True,
+        batching_enabled=True,
+        batching_max_publish_delay_ms=10,
+        batching_type=BatchingType.KeyBased
+    )
+
+
+    @app.route('/users', methods=['POST', 'GET'])
+    def application():
+        producer.send('I love skywalking 3 thousand'.encode('utf-8'), None)
+        producer.flush()
+        producer.close()
+        return jsonify({'song': 'Despacito', 'artist': 'Luis Fonsi'})
+
+    PORT = 9090
+    app.run(host='0.0.0.0', port=PORT, debug=True)
diff --git a/tests/plugin/data/sw_pulsar/test_pulsar.py 
b/tests/plugin/data/sw_pulsar/test_pulsar.py
new file mode 100644
index 0000000..59767e8
--- /dev/null
+++ b/tests/plugin/data/sw_pulsar/test_pulsar.py
@@ -0,0 +1,36 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from typing import Callable
+
+import pytest
+import requests
+
+from skywalking.plugins.sw_pulsar import support_matrix
+from tests.orchestrator import get_test_vector
+from tests.plugin.base import TestPluginBase
+
+
+@pytest.fixture
+def prepare():
+    # type: () -> Callable
+    return lambda *_: requests.get('http://0.0.0.0:9090/users', timeout=5)
+
+
+class TestPlugin(TestPluginBase):
+    @pytest.mark.parametrize('version', 
get_test_vector(lib_name='pulsar-client', support_matrix=support_matrix))
+    def test_plugin(self, docker_compose, version):
+        self.validate()

Reply via email to