This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c1e7f940d9d6a408a3a2e0745eb4b6d943bd3920
Author: ran <gaoran...@126.com>
AuthorDate: Sat Oct 30 08:17:45 2021 +0800

    [Python Client] Python client support using custom Avro schema definition 
(#12516)
    
    ### Motivation
    
    Currently, the Python client doesn't support generating an `AvroSchema`
    from a schema definition, so users can't use an Avro schema definition
    file in the Python client.
    
    ### Modifications
    
    Add a new `__init__` parameter `schema_definition` to `AvroSchema` to
    support initializing the `AvroSchema` from an Avro schema definition.
    
    ```
    class AvroSchema(Schema):
        def __init__(self, record_cls, schema_definition=None):
            if record_cls is None and schema_definition is None:
                raise AssertionError("The param record_cls and 
schema_definition shouldn't be both None.")
    
            if record_cls is not None:
                self._schema = record_cls.schema()
            else:
                self._schema = schema_definition
            super(AvroSchema, self).__init__(record_cls, 
_pulsar.SchemaType.AVRO, self._schema, 'AVRO')
    ```
    
    
    ### How to use
    
    Assume that there is an Avro schema definition file `company.avsc`
    describing a company record, like this:
    ```
    {
        "doc": "this is doc",
        "namespace": "example.avro",
        "type": "record",
        "name": "Company",
        "fields": [
            {"name": "name", "type": ["null", "string"]},
            {"name": "address", "type": ["null", "string"]},
            {"name": "employees", "type": ["null", {"type": "array", "items": {
                "type": "record",
                "name": "Employee",
                "fields": [
                    {"name": "name", "type": ["null", "string"]},
                    {"name": "age", "type": ["null", "int"]}
                ]
            }}]},
            {"name": "labels", "type": ["null", {"type": "map", "values": 
"string"}]}
        ]
    }
    ```
    
    Users can load the schema from a file with `avro.schema` or `fastavro.schema`.
    > Refer to [load_schema](https://fastavro.readthedocs.io/en/latest/schema.html#fastavro._schema_py.load_schema)
    > or [Avro Schema](http://avro.apache.org/docs/current/gettingstartedpython.html).
    ```
    schema_definition = load_schema("examples/company.avsc")
    # schema_definition = avro.schema.parse(open("examples/company.avsc", 
"rb").read()).to_json()
    avro_schema = AvroSchema(None, schema_definition=schema_definition)
    
    producer = client.create_producer(
        topic=topic,
        schema=avro_schema)
    consumer = client.subscribe(topic, 'test', schema=avro_schema)
    
    company = {
        "name": "company-name" + str(i),
        "address": 'xxx road xxx street ' + str(i),
        "employees": [
            {"name": "user" + str(i), "age": 20 + i},
            {"name": "user" + str(i), "age": 30 + i},
            {"name": "user" + str(i), "age": 35 + i},
        ],
        "labels": {
            "industry": "software" + str(i),
            "scale": ">100",
            "funds": "1000000.0"
        }
    }
    producer.send(company)
    
    msg = consumer.receive()
    # Users could get a dict object by `value()` method.
    msg.value()
    ```
---
 pulsar-client-cpp/python/examples/company.avsc     |  19 +++
 .../python/pulsar/schema/schema_avro.py            |  29 +++--
 pulsar-client-cpp/python/schema_test.py            | 128 ++++++++++++++++++++-
 3 files changed, 162 insertions(+), 14 deletions(-)

diff --git a/pulsar-client-cpp/python/examples/company.avsc 
b/pulsar-client-cpp/python/examples/company.avsc
new file mode 100644
index 0000000..cdb595f
--- /dev/null
+++ b/pulsar-client-cpp/python/examples/company.avsc
@@ -0,0 +1,19 @@
+{
+    "doc": "this is doc",
+    "namespace": "example.avro",
+    "type": "record",
+    "name": "Company",
+    "fields": [
+        {"name": "name", "type": ["null", "string"]},
+        {"name": "address", "type": ["null", "string"]},
+        {"name": "employees", "type": ["null", {"type": "array", "items": {
+            "type": "record",
+            "name": "Employee",
+            "fields": [
+                {"name": "name", "type": ["null", "string"]},
+                {"name": "age", "type": ["null", "int"]}
+            ]
+        }}]},
+        {"name": "labels", "type": ["null", {"type": "map", "values": 
"string"}]}
+    ]
+}
\ No newline at end of file
diff --git a/pulsar-client-cpp/python/pulsar/schema/schema_avro.py 
b/pulsar-client-cpp/python/pulsar/schema/schema_avro.py
index e76fc51..5861505 100644
--- a/pulsar-client-cpp/python/pulsar/schema/schema_avro.py
+++ b/pulsar-client-cpp/python/pulsar/schema/schema_avro.py
@@ -32,10 +32,15 @@ except ModuleNotFoundError:
 
 if HAS_AVRO:
     class AvroSchema(Schema):
-        def __init__(self, record_cls):
-            super(AvroSchema, self).__init__(record_cls, 
_pulsar.SchemaType.AVRO,
-                                             record_cls.schema(), 'AVRO')
-            self._schema = record_cls.schema()
+        def __init__(self, record_cls, schema_definition=None):
+            if record_cls is None and schema_definition is None:
+                raise AssertionError("The param record_cls and 
schema_definition shouldn't be both None.")
+
+            if record_cls is not None:
+                self._schema = record_cls.schema()
+            else:
+                self._schema = schema_definition
+            super(AvroSchema, self).__init__(record_cls, 
_pulsar.SchemaType.AVRO, self._schema, 'AVRO')
 
         def _get_serialized_value(self, x):
             if isinstance(x, enum.Enum):
@@ -53,9 +58,14 @@ if HAS_AVRO:
                 return x
 
         def encode(self, obj):
-            self._validate_object_type(obj)
             buffer = io.BytesIO()
-            m = self.encode_dict(obj.__dict__)
+            m = obj
+            if self._record_cls is not None:
+                self._validate_object_type(obj)
+                m = self.encode_dict(obj.__dict__)
+            elif not isinstance(obj, dict):
+                raise ValueError('If using the custom schema, the record data 
should be dict type.')
+
             fastavro.schemaless_writer(buffer, self._schema, m)
             return buffer.getvalue()
 
@@ -68,11 +78,14 @@ if HAS_AVRO:
         def decode(self, data):
             buffer = io.BytesIO(data)
             d = fastavro.schemaless_reader(buffer, self._schema)
-            return self._record_cls(**d)
+            if self._record_cls is not None:
+                return self._record_cls(**d)
+            else:
+                return d
 
 else:
     class AvroSchema(Schema):
-        def __init__(self, _record_cls):
+        def __init__(self, _record_cls, _schema_definition):
             raise Exception("Avro library support was not found. Make sure to 
install Pulsar client " +
                             "with Avro support: pip3 install 
'pulsar-client[avro]'")
 
diff --git a/pulsar-client-cpp/python/schema_test.py 
b/pulsar-client-cpp/python/schema_test.py
index 7adbcbe..d2554da 100755
--- a/pulsar-client-cpp/python/schema_test.py
+++ b/pulsar-client-cpp/python/schema_test.py
@@ -25,6 +25,7 @@ import pulsar
 from pulsar.schema import *
 from enum import Enum
 import json
+from fastavro.schema import load_schema
 
 
 class SchemaTest(TestCase):
@@ -1145,12 +1146,127 @@ class SchemaTest(TestCase):
 
         client.close()
 
-    def test(self):
-        class NamespaceDemo(Record):
-            _namespace = 'xxx.xxx.xxx'
-            x = String()
-            y = Integer()
-        print('schema: ', NamespaceDemo.schema())
+    def custom_schema_test(self):
+
+        def encode_and_decode(schema_definition):
+            avro_schema = AvroSchema(None, schema_definition=schema_definition)
+
+            company = {
+                "name": "company-name",
+                "address": 'xxx road xxx street',
+                "employees": [
+                    {"name": "user1", "age": 25},
+                    {"name": "user2", "age": 30},
+                    {"name": "user3", "age": 35},
+                ],
+                "labels": {
+                    "industry": "software",
+                    "scale": ">100",
+                    "funds": "1000000.0"
+                }
+            }
+            data = avro_schema.encode(company)
+            company_decode = avro_schema.decode(data)
+            self.assertEqual(company, company_decode)
+
+        schema_definition = {
+            'doc': 'this is doc',
+            'namespace': 'example.avro',
+            'type': 'record',
+            'name': 'Company',
+            'fields': [
+                {'name': 'name', 'type': ['null', 'string']},
+                {'name': 'address', 'type': ['null', 'string']},
+                {'name': 'employees', 'type': ['null', {'type': 'array', 
'items': {
+                    'type': 'record',
+                    'name': 'Employee',
+                    'fields': [
+                        {'name': 'name', 'type': ['null', 'string']},
+                        {'name': 'age', 'type': ['null', 'int']}
+                    ]
+                }}]},
+                {'name': 'labels', 'type': ['null', {'type': 'map', 'values': 
'string'}]}
+            ]
+        }
+        encode_and_decode(schema_definition)
+        # Users could load schema from file by `fastavro.schema`
+        # Or use `avro.schema` like this 
`avro.schema.parse(open("examples/company.avsc", "rb").read()).to_json()`
+        encode_and_decode(load_schema("examples/company.avsc"))
+
+    def custom_schema_produce_and_consume_test(self):
+        client = pulsar.Client(self.serviceUrl)
+
+        def produce_and_consume(topic, schema_definition):
+            print('custom schema produce and consume test topic - ', topic)
+            example_avro_schema = AvroSchema(None, 
schema_definition=schema_definition)
+
+            producer = client.create_producer(
+                topic=topic,
+                schema=example_avro_schema)
+            consumer = client.subscribe(topic, 'test', 
schema=example_avro_schema)
+
+            for i in range(0, 10):
+                company = {
+                    "name": "company-name" + str(i),
+                    "address": 'xxx road xxx street ' + str(i),
+                    "employees": [
+                        {"name": "user" + str(i), "age": 20 + i},
+                        {"name": "user" + str(i), "age": 30 + i},
+                        {"name": "user" + str(i), "age": 35 + i},
+                    ],
+                    "labels": {
+                        "industry": "software" + str(i),
+                        "scale": ">100",
+                        "funds": "1000000.0"
+                    }
+                }
+                producer.send(company)
+
+            for i in range(0, 10):
+                msg = consumer.receive()
+                company = {
+                    "name": "company-name" + str(i),
+                    "address": 'xxx road xxx street ' + str(i),
+                    "employees": [
+                        {"name": "user" + str(i), "age": 20 + i},
+                        {"name": "user" + str(i), "age": 30 + i},
+                        {"name": "user" + str(i), "age": 35 + i},
+                    ],
+                    "labels": {
+                        "industry": "software" + str(i),
+                        "scale": ">100",
+                        "funds": "1000000.0"
+                    }
+                }
+                self.assertEqual(msg.value(), company)
+                consumer.acknowledge(msg)
+
+            consumer.close()
+            producer.close()
+
+        schema_definition = {
+            'doc': 'this is doc',
+            'namespace': 'example.avro',
+            'type': 'record',
+            'name': 'Company',
+            'fields': [
+                {'name': 'name', 'type': ['null', 'string']},
+                {'name': 'address', 'type': ['null', 'string']},
+                {'name': 'employees', 'type': ['null', {'type': 'array', 
'items': {
+                    'type': 'record',
+                    'name': 'Employee',
+                    'fields': [
+                        {'name': 'name', 'type': ['null', 'string']},
+                        {'name': 'age', 'type': ['null', 'int']}
+                    ]
+                }}]},
+                {'name': 'labels', 'type': ['null', {'type': 'map', 'values': 
'string'}]}
+            ]
+        }
+        produce_and_consume('custom-schema-test-1', 
schema_definition=schema_definition)
+        produce_and_consume('custom-schema-test-2', 
schema_definition=load_schema("examples/company.avsc"))
+
+        client.close()
 
 if __name__ == '__main__':
     main()

Reply via email to