This is an automated email from the ASF dual-hosted git repository. kezhenxu94 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/skywalking-python.git
The following commit(s) were added to refs/heads/master by this push: new 02dc53c Add Pulsar plugin (#345) 02dc53c is described below commit 02dc53ce2de37c8ac3396bbf298004eeb4f42942 Author: Starry <zhouzi...@apache.org> AuthorDate: Sat Jun 29 20:32:54 2024 +0800 Add Pulsar plugin (#345) --- .gitmodules | 2 +- CHANGELOG.md | 1 + Makefile | 4 +- docs/en/setup/Plugins.md | 1 + docs/en/setup/advanced/LogReporter.md | 2 +- poetry.lock | 49 ++++++++++- protocol | 2 +- pyproject.toml | 1 + skywalking/__init__.py | 2 + skywalking/plugins/sw_pulsar.py | 107 +++++++++++++++++++++++ tests/plugin/data/sw_pulsar/__init__.py | 16 ++++ tests/plugin/data/sw_pulsar/docker-compose.yml | 90 +++++++++++++++++++ tests/plugin/data/sw_pulsar/expected.data.yml | 86 ++++++++++++++++++ tests/plugin/data/sw_pulsar/services/__init__.py | 16 ++++ tests/plugin/data/sw_pulsar/services/consumer.py | 32 +++++++ tests/plugin/data/sw_pulsar/services/producer.py | 43 +++++++++ tests/plugin/data/sw_pulsar/test_pulsar.py | 36 ++++++++ 17 files changed, 485 insertions(+), 5 deletions(-) diff --git a/.gitmodules b/.gitmodules index ebb8ea3..27ba66d 100644 --- a/.gitmodules +++ b/.gitmodules @@ -18,4 +18,4 @@ # [submodule "protocol"] path = protocol - url = https://github.com/apache/skywalking-data-collect-protocol + url = https://github.com/apache/skywalking-data-collect-protocol.git diff --git a/CHANGELOG.md b/CHANGELOG.md index 962083a..68c0563 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ - Plugins: - Add neo4j plugin.(#312) + - Add pulsar plugin.(#345) - Fixes: - Fix unexpected 'No active span' IllegalStateError (#311) diff --git a/Makefile b/Makefile index c581509..408d42b 100644 --- a/Makefile +++ b/Makefile @@ -43,9 +43,11 @@ poetry: ifeq ($(OS),Windows) -powershell (Invoke-WebRequest -Uri https://install.python-poetry.org -UseBasicParsing).Content | py - poetry self update -else +else ifeq ($(OS),Darwin) -curl -sSL https://install.python-poetry.org | python3 - poetry self update || $(MAKE) poetry-fallback +else + -curl -sSL https://install.python-poetry.org | python3 - --version 1.5.1 endif .PHONY: gen diff --git a/docs/en/setup/Plugins.md b/docs/en/setup/Plugins.md index ca0fc38..da29e47 100644 --- a/docs/en/setup/Plugins.md +++ b/docs/en/setup/Plugins.md @@ -36,6 +36,7 @@ or a limitation of SkyWalking auto-instrumentation (welcome to contribute!) | [neo4j](https://neo4j.com/docs/python-manual/5/) | Python >=3.7 - ['5.*']; | `sw_neo4j` | | [psycopg[binary]](https://www.psycopg.org/) | Python >=3.11 - ['3.1.*']; Python >=3.7 - ['3.0.18', '3.1.*']; | `sw_psycopg` | | [psycopg2-binary](https://www.psycopg.org/) | Python >=3.10 - NOT SUPPORTED YET; Python >=3.7 - ['2.9']; | `sw_psycopg2` | +| [pulsar-client](https://github.com/apache/pulsar-client-python) | Python >=3.8 - ['3.3.0']; | `sw_pulsar` | | [pymongo](https://pymongo.readthedocs.io) | Python >=3.7 - ['3.11.*']; | `sw_pymongo` | | [pymysql](https://pymysql.readthedocs.io/en/latest/) | Python >=3.7 - ['1.0']; | `sw_pymysql` | | [pyramid](https://trypyramid.com) | Python >=3.7 - ['1.10', '2.0']; | `sw_pyramid` | diff --git a/docs/en/setup/advanced/LogReporter.md b/docs/en/setup/advanced/LogReporter.md index e08e65f..9d16b86 100644 --- a/docs/en/setup/advanced/LogReporter.md +++ b/docs/en/setup/advanced/LogReporter.md @@ -9,7 +9,7 @@ Log reporter supports all three protocols including `grpc`, `http` and `kafka`, If chosen `http` protocol, the logs will be batch-reported to the collector REST endpoint `oap/v3/logs`. If chosen `kafka` protocol, please make sure to config -[kafka-fetcher](https://skywalking.apache.org/docs/main/v9.1.0/en/setup/backend/kafka-fetcher/) +[kafka-fetcher](https://skywalking.apache.org/docs/main/v10.0.1/en/setup/backend/kafka-fetcher/) on the OAP side, and make sure Python agent config `kafka_bootstrap_servers` points to your Kafka brokers. **Please make sure OAP is consuming the same Kafka topic as your agent produces to, `kafka_namespace` must match OAP side configuration `plugin.kafka.namespace`** diff --git a/poetry.lock b/poetry.lock index 52f500a..d098307 100644 --- a/poetry.lock +++ b/poetry.lock @@ -2280,6 +2280,53 @@ files = [ {file = "psycopg2_binary-2.9.7-cp39-cp39-win_amd64.whl", hash = "sha256:eb3b8d55924a6058a26db69fb1d3e7e32695ff8b491835ba9f479537e14dcf9f"}, ] +[[package]] +name = "pulsar-client" +version = "3.3.0" +description = "Apache Pulsar Python client library" +optional = false +python-versions = "*" +files = [ + {file = "pulsar_client-3.3.0-cp310-cp310-macosx_10_15_universal2.whl", hash = "sha256:c31afd3e67a044ff93177df89e08febf214cc965e95ede097d9fe8755af00e01"}, + {file = "pulsar_client-3.3.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1f66982284571674b215324cc26b5c2f7c56c7043113c47a7084cb70d67a8afb"}, + {file = "pulsar_client-3.3.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7fe50a06f81c48a75a9b95c27a6446260039adca71d9face273740de96b2efca"}, + {file = "pulsar_client-3.3.0-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:d4c46a4b96a6e9919cfe220156d69a2ede8053d9ea1add4ada108abcf2ba9775"}, + {file = "pulsar_client-3.3.0-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:1e4b5d44b992c9b036286b483f3588c10b89c6047fb59d80c7474445997f4e10"}, + {file = "pulsar_client-3.3.0-cp310-cp310-win_amd64.whl", hash = "sha256:497a59ac6b650835a3b2c502f53477e5c98e5226998ca3f17c0b0a3eb4d67d08"}, + {file = "pulsar_client-3.3.0-cp311-cp311-macosx_10_15_universal2.whl", hash = "sha256:386e78ff52058d881780bae1f6e84ac9434ae0b01a8581755ca8cc0dc844a332"}, + {file = "pulsar_client-3.3.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:3e4ecb780df58bcfd3918590bd3ff31ed79bccfbef3a1a60370642eb1e14a9d2"}, + {file = "pulsar_client-3.3.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7ce1e215c252f22a6f26ca5e9076826041a04d88dc213b92c86b524be2774a64"}, + {file = "pulsar_client-3.3.0-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:88b0fd5be73a4103986b9dbe3a66468cf8829371e34af87ff8f216e3980f4cbe"}, + {file = "pulsar_client-3.3.0-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:33656450536d83eed1563ff09692c2c415fb199d88e9ed97d701ca446a119e1b"}, + {file = "pulsar_client-3.3.0-cp311-cp311-win_amd64.whl", hash = "sha256:ce33de700b06583df8777e139d68cb4b4b3d0a2eac168d74278d8935f357fb10"}, + {file = "pulsar_client-3.3.0-cp37-cp37m-macosx_10_15_universal2.whl", hash = "sha256:7b5dd25cf778d6c980d36c53081e843ea272afe7af4f0ad6394ae9513f94641b"}, + {file = "pulsar_client-3.3.0-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:33c4e6865fda62a2e460f823dce4d49ac2973a4459b8ff99eda5fdd6aaaebf46"}, + {file = "pulsar_client-3.3.0-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f1810ddc623c8de2675d17405ce47057a9a2b92298e708ce4d9564847f5ad904"}, + {file = "pulsar_client-3.3.0-cp37-cp37m-musllinux_1_1_aarch64.whl", hash = "sha256:8259c3b856eb6deaa1f93dce893ab18d99d36d102da5612c8e97a4fb41b70ab1"}, + {file = "pulsar_client-3.3.0-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:5e7a48b2e505cde758fd51a601b5da0671fa98c9baee38362aaaa3ab2b930c28"}, + {file = "pulsar_client-3.3.0-cp37-cp37m-win_amd64.whl", hash = "sha256:ede264385d47257b2f2b08ecde9181ec5338bea5639cc543d1856f01736778d2"}, + {file = "pulsar_client-3.3.0-cp38-cp38-macosx_10_15_universal2.whl", hash = "sha256:0f64c62746ccd5b65a0c505f5f40b9af1f147eb1fa2d8f9c90cd5c8b92dd8597"}, + {file = "pulsar_client-3.3.0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5b84a20c9012e3c4ef1b7085acd7467197118c090b378dec27d773fb79d91556"}, + {file = "pulsar_client-3.3.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:c4e15fa696e275ccb66d0791fdc19c4dea0420d81349c8055e485b134125e14f"}, + {file = "pulsar_client-3.3.0-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:72cbb1bdcba2dd1265296b5ba65331622ee89c16db75edaad46dd7b90c6dd447"}, + {file = "pulsar_client-3.3.0-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:d54dd12955bf587dd46d9184444af5e853d9da2a14bbfb739ed2c7c3b78ce280"}, + {file = "pulsar_client-3.3.0-cp38-cp38-win_amd64.whl", hash = "sha256:43f98afdf0334b2b957a4d96f97a1fe8a7f7fd1e2631d40c3f00b4162f396485"}, + {file = "pulsar_client-3.3.0-cp39-cp39-macosx_10_15_universal2.whl", hash = "sha256:efe7c1e6a96daccc522c3567b6847ffa54c13e0f510d9a427b4aeff9fbebe54b"}, + {file = "pulsar_client-3.3.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f28e94420090fceeb38e23fc744f3edf8710e48314ef5927d2b674a1d1e43ee0"}, + {file = "pulsar_client-3.3.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:42c8f3eaa98e2351805ecb6efb6d5fedf47a314a3ce6af0e05ea1449ea7244ed"}, + {file = "pulsar_client-3.3.0-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:5e69750f8ae57e55fddf97b459ce0d8b38b2bb85f464a71e871ee6a86d893be7"}, + {file = "pulsar_client-3.3.0-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:7e147e5ba460c1818bc05254279a885b4e552bcafb8961d40e31f98d5ff46628"}, + {file = "pulsar_client-3.3.0-cp39-cp39-win_amd64.whl", hash = "sha256:694530af1d6c75fb81456fb509778c1868adee31e997ddece6e21678200182ea"}, +] + +[package.dependencies] +certifi = "*" + +[package.extras] +all = ["apache-bookkeeper-client (>=4.16.1)", "fastavro (==1.7.3)", "grpcio (>=1.8.2)", "prometheus-client", "protobuf (>=3.6.1,<=3.20.3)", "ratelimit"] +avro = ["fastavro (==1.7.3)"] +functions = ["apache-bookkeeper-client (>=4.16.1)", "grpcio (>=1.8.2)", "prometheus-client", "protobuf (>=3.6.1,<=3.20.3)", "ratelimit"] + [[package]] name = "pycodestyle" version = "2.9.1" @@ -3683,4 +3730,4 @@ sync = ["kafka-python", "requests"] [metadata] lock-version = "2.0" python-versions = ">=3.7, <3.12" -content-hash = "016644168e470e2904bc0a4109c0218c7b9ecf0890d17a32e28aa81bcda0e8d0" +content-hash = "ae3cd5c63201383530bf2daac37be0ef9399bf4384dbe42048c81e945b70642a" diff --git a/protocol b/protocol index 9b2f4a5..b5f6ebe 160000 --- a/protocol +++ b/protocol @@ -1 +1 @@ -Subproject commit 9b2f4a5fb5694381924674d6c15cbead6a388d97 +Subproject commit b5f6ebe281b96d89968959f55baa3d9aa1bfecee diff --git a/pyproject.toml b/pyproject.toml index 734b058..8fc836f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -156,6 +156,7 @@ loguru = "^0.6.0" httpx = "^0.23.3" confluent-kafka = "^2.0.2" neo4j = "^5.9.0" +pulsar-client = "3.3.0" [tool.poetry.group.lint.dependencies] pylint = '2.13.9' diff --git a/skywalking/__init__.py b/skywalking/__init__.py index 1ed0c2b..ec1dc09 100644 --- a/skywalking/__init__.py +++ b/skywalking/__init__.py @@ -36,6 +36,8 @@ class Component(Enum): KafkaConsumer = 41 RabbitmqProducer = 52 RabbitmqConsumer = 53 + PulsarProducer = 73 + PulsarConsumer = 74 Elasticsearch = 47 HBase = 94 Neo4j = 112 diff --git a/skywalking/plugins/sw_pulsar.py b/skywalking/plugins/sw_pulsar.py new file mode 100644 index 0000000..33b24a3 --- /dev/null +++ b/skywalking/plugins/sw_pulsar.py @@ -0,0 +1,107 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from skywalking import Layer, Component +from skywalking.trace.carrier import Carrier +from skywalking.trace.context import get_context +from skywalking.trace.tags import TagMqTopic, TagMqBroker + +link_vector = ['https://github.com/apache/pulsar-client-python'] +support_matrix = { + 'pulsar-client': { + '>=3.8': ['3.3.0'] + } +} +note = """""" + + +def install(): + from pulsar import Producer + from pulsar import Consumer + from pulsar import Client + + __init = Client.__init__ + _send = Producer.send + _receive = Consumer.receive + _peer = '' + + def get_peer(): + return _peer + + def set_peer(value): + nonlocal _peer + _peer = value + + def _sw_init(self, service_url): + __init(self, service_url) + set_peer(service_url) + + def _sw_send_func(_send): + def _sw_send(this, content, + properties=None, + partition_key=None, + sequence_id=None, + replication_clusters=None, + disable_replication=False, + event_timestamp=None, + deliver_at=None, + deliver_after=None, + ): + topic = this._producer.topic().split('/')[-1] + with get_context().new_exit_span(op=f'Pulsar/Topic/{topic}/Producer', peer=get_peer(), + component=Component.PulsarProducer) as span: + span.tag(TagMqTopic(topic)) + span.tag(TagMqBroker(get_peer())) + span.layer = Layer.MQ + + carrier = span.inject() + if properties is None: + properties = {} + for item in carrier: + properties[item.key] = item.val + + return _send(this, content, properties=properties, partition_key=partition_key, + sequence_id=sequence_id, replication_clusters=replication_clusters, + disable_replication=disable_replication, event_timestamp=event_timestamp, + deliver_at=deliver_at, deliver_after=deliver_after) + + return _sw_send + + def _sw_receive_func(_receive): + def _sw_receive(this, timeout_millis=None): + res = _receive(this, timeout_millis=timeout_millis) + if res: + topic = res.topic_name().split('/')[-1] + properties = res.properties() + carrier = Carrier() + for item in carrier: + if item.key in properties.keys(): + val = res.properties().get(item.key) + if val is not None: + item.val = val + + with get_context().new_entry_span(op=f'Pulsar/Topic/{topic}/Consumer', carrier=carrier) as span: + span.tag(TagMqTopic(topic)) + span.tag(TagMqBroker(get_peer())) + span.layer = Layer.MQ + span.component = Component.PulsarConsumer + return res + + return _sw_receive + + Client.__init__ = _sw_init + Producer.send = _sw_send_func(_send) + Consumer.receive = _sw_receive_func(_receive) diff --git a/tests/plugin/data/sw_pulsar/__init__.py b/tests/plugin/data/sw_pulsar/__init__.py new file mode 100644 index 0000000..b1312a0 --- /dev/null +++ b/tests/plugin/data/sw_pulsar/__init__.py @@ -0,0 +1,16 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# diff --git a/tests/plugin/data/sw_pulsar/docker-compose.yml b/tests/plugin/data/sw_pulsar/docker-compose.yml new file mode 100644 index 0000000..83197e2 --- /dev/null +++ b/tests/plugin/data/sw_pulsar/docker-compose.yml @@ -0,0 +1,90 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +version: '2.1' + +services: + collector: + extends: + service: collector + file: ../../docker-compose.base.yml + + pulsar-server: + image: apachepulsar/pulsar:3.2.0 + hostname: pulsar-server + ports: + - 6650:6650 + - 8080:8080 + networks: + - beyond + command: ["bash","-c", "bin/pulsar standalone"] + healthcheck: + test: ["CMD", "nc", "-nz", "127.0.0.1", "8080"] + interval: 5s + timeout: 60s + retries: 120 + + producer: + extends: + service: agent + file: ../../docker-compose.base.yml + ports: + - 9090:9090 + volumes: + - .:/app + command: ['bash', '-c', 'pip install flask && pip install -r /app/requirements.txt && sw-python run python3 /app/services/producer.py'] + healthcheck: + test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/9090"] + interval: 5s + timeout: 60s + retries: 120 + depends_on: + collector: + condition: service_healthy + pulsar-server: + condition: service_healthy + consumer: + condition: service_healthy + environment: + SW_AGENT_NAME: producer + SW_AGENT_LOGGING_LEVEL: INFO + + consumer: + extends: + service: agent + file: ../../docker-compose.base.yml + ports: + - 9091:9091 + volumes: + - .:/app + command: ['bash', '-c', 'pip install flask && pip install -r /app/requirements.txt && sw-python run python3 /app/services/consumer.py'] + healthcheck: + test: ["CMD", "bash", "-c", "ps -ef | grep /app/services/consumer | grep -v grep"] + interval: 5s + timeout: 60s + retries: 120 + depends_on: + collector: + condition: service_healthy + pulsar-server: + condition: service_healthy + environment: + SW_AGENT_NAME: consumer + SW_AGENT_LOGGING_LEVEL: INFO + +networks: + beyond: \ No newline at end of file diff --git a/tests/plugin/data/sw_pulsar/expected.data.yml b/tests/plugin/data/sw_pulsar/expected.data.yml new file mode 100644 index 0000000..6b61114 --- /dev/null +++ b/tests/plugin/data/sw_pulsar/expected.data.yml @@ -0,0 +1,86 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +segmentItems: + - serviceName: producer + segmentSize: 1 + segments: + - segmentId: not null + spans: + - operationName: Pulsar/Topic/sw-topic/Producer + parentSpanId: 0 + spanId: 1 + spanLayer: MQ + tags: + - key: mq.topic + value: sw-topic + - key: mq.broker + value: 'pulsar://pulsar-server:6650' + startTime: gt 0 + endTime: gt 0 + componentId: 73 + spanType: Exit + peer: pulsar://pulsar-server:6650 + skipAnalysis: false + - operationName: /users + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: Http + tags: + - key: http.method + value: GET + - key: http.url + value: http://0.0.0.0:9090/users + - key: http.status_code + value: '200' + startTime: gt 0 + endTime: gt 0 + componentId: 7001 + spanType: Entry + peer: not null + skipAnalysis: false + - serviceName: consumer + segmentSize: 1 + segments: + - segmentId: not null + spans: + - operationName: Pulsar/Topic/sw-topic/Consumer + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: MQ + tags: + - key: mq.topic + value: sw-topic + - key: mq.broker + value: 'pulsar://pulsar-server:6650' + refs: + - parentEndpoint: Pulsar/Topic/sw-topic/Producer + networkAddress: 'pulsar://pulsar-server:6650' + refType: CrossProcess + parentSpanId: 1 + parentTraceSegmentId: not null + parentServiceInstance: not null + parentService: producer + traceId: not null + startTime: gt 0 + endTime: gt 0 + componentId: 74 + spanType: Entry + peer: '' + skipAnalysis: false diff --git a/tests/plugin/data/sw_pulsar/services/__init__.py b/tests/plugin/data/sw_pulsar/services/__init__.py new file mode 100644 index 0000000..b1312a0 --- /dev/null +++ b/tests/plugin/data/sw_pulsar/services/__init__.py @@ -0,0 +1,16 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# diff --git a/tests/plugin/data/sw_pulsar/services/consumer.py b/tests/plugin/data/sw_pulsar/services/consumer.py new file mode 100644 index 0000000..fc444b1 --- /dev/null +++ b/tests/plugin/data/sw_pulsar/services/consumer.py @@ -0,0 +1,32 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +if __name__ == '__main__': + import pulsar + + client = pulsar.Client(service_url='pulsar://pulsar-server:6650') + consumer = client.subscribe('sw-topic', 'sw-subscription') + + while True: + try: + msg = consumer.receive() + print('Received message = ', str(msg.data().decode('utf-8')), '|message_id = ', msg.message_id()) + consumer.acknowledge(msg) + except pulsar.Interrupted: + break + + client.close() diff --git a/tests/plugin/data/sw_pulsar/services/producer.py b/tests/plugin/data/sw_pulsar/services/producer.py new file mode 100644 index 0000000..9505f82 --- /dev/null +++ b/tests/plugin/data/sw_pulsar/services/producer.py @@ -0,0 +1,43 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +if __name__ == '__main__': + import pulsar + from flask import Flask, jsonify + from pulsar import BatchingType + + app = Flask(__name__) + client = pulsar.Client(service_url='pulsar://pulsar-server:6650') + producer = client.create_producer( + 'sw-topic', + block_if_queue_full=True, + batching_enabled=True, + batching_max_publish_delay_ms=10, + batching_type=BatchingType.KeyBased + ) + + + @app.route('/users', methods=['POST', 'GET']) + def application(): + producer.send('I love skywalking 3 thousand'.encode('utf-8'), None) + producer.flush() + producer.close() + return jsonify({'song': 'Despacito', 'artist': 'Luis Fonsi'}) + + PORT = 9090 + app.run(host='0.0.0.0', port=PORT, debug=True) diff --git a/tests/plugin/data/sw_pulsar/test_pulsar.py b/tests/plugin/data/sw_pulsar/test_pulsar.py new file mode 100644 index 0000000..59767e8 --- /dev/null +++ b/tests/plugin/data/sw_pulsar/test_pulsar.py @@ -0,0 +1,36 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from typing import Callable + +import pytest +import requests + +from skywalking.plugins.sw_pulsar import support_matrix +from tests.orchestrator import get_test_vector +from tests.plugin.base import TestPluginBase + + +@pytest.fixture +def prepare(): + # type: () -> Callable + return lambda *_: requests.get('http://0.0.0.0:9090/users', timeout=5) + + +class TestPlugin(TestPluginBase): + @pytest.mark.parametrize('version', get_test_vector(lib_name='pulsar-client', support_matrix=support_matrix)) + def test_plugin(self, docker_compose, version): + self.validate()