This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit c1e7f940d9d6a408a3a2e0745eb4b6d943bd3920 Author: ran <gaoran...@126.com> AuthorDate: Sat Oct 30 08:17:45 2021 +0800 [Python Client] Python client support using custom Avro schema definition (#12516) ### Motivation Currently, the Python client doesn't support using a schema definition to generate an `AvroSchema`, so users can't use a schema definition file in the Python client. ### Modifications Add a new init parameter `schema_definition` to `AvroSchema` so that an `AvroSchema` can be initialized from an Avro schema definition. ``` class AvroSchema(Schema): def __init__(self, record_cls, schema_definition=None): if record_cls is None and schema_definition is None: raise AssertionError("The param record_cls and schema_definition shouldn't be both None.") if record_cls is not None: self._schema = record_cls.schema() else: self._schema = schema_definition super(AvroSchema, self).__init__(record_cls, _pulsar.SchemaType.AVRO, self._schema, 'AVRO') ``` ### How to use Assume there is a company Avro schema definition file `company.avsc` like this: 
``` { "doc": "this is doc", "namespace": "example.avro", "type": "record", "name": "Company", "fields": [ {"name": "name", "type": ["null", "string"]}, {"name": "address", "type": ["null", "string"]}, {"name": "employees", "type": ["null", {"type": "array", "items": { "type": "record", "name": "Employee", "fields": [ {"name": "name", "type": ["null", "string"]}, {"name": "age", "type": ["null", "int"]} ] }}]}, {"name": "labels", "type": ["null", {"type": "map", "values": "string"}]} ] } ``` Users can load the schema from a file with `avro.schema` or `fastavro.schema`. > Refer to [load_schema](https://fastavro.readthedocs.io/en/latest/schema.html#fastavro._schema_py.load_schema) or [Avro Schema](http://avro.apache.org/docs/current/gettingstartedpython.html). ``` schema_definition = load_schema("examples/company.avsc") # schema_definition = avro.schema.parse(open("examples/company.avsc", "rb").read()).to_json() avro_schema = AvroSchema(None, schema_definition=schema_definition) producer = client.create_producer( topic=topic, schema=avro_schema) consumer = client.subscribe(topic, 'test', schema=avro_schema) company = { "name": "company-name" + str(i), "address": 'xxx road xxx street ' + str(i), "employees": [ {"name": "user" + str(i), "age": 20 + i}, {"name": "user" + str(i), "age": 30 + i}, {"name": "user" + str(i), "age": 35 + i}, ], "labels": { "industry": "software" + str(i), "scale": ">100", "funds": "1000000.0" } } producer.send(company) msg = consumer.receive() # With a custom schema definition, the `value()` method returns a dict object. 
msg.value() ``` --- pulsar-client-cpp/python/examples/company.avsc | 19 +++ .../python/pulsar/schema/schema_avro.py | 29 +++-- pulsar-client-cpp/python/schema_test.py | 128 ++++++++++++++++++++- 3 files changed, 162 insertions(+), 14 deletions(-) diff --git a/pulsar-client-cpp/python/examples/company.avsc b/pulsar-client-cpp/python/examples/company.avsc new file mode 100644 index 0000000..cdb595f --- /dev/null +++ b/pulsar-client-cpp/python/examples/company.avsc @@ -0,0 +1,19 @@ +{ + "doc": "this is doc", + "namespace": "example.avro", + "type": "record", + "name": "Company", + "fields": [ + {"name": "name", "type": ["null", "string"]}, + {"name": "address", "type": ["null", "string"]}, + {"name": "employees", "type": ["null", {"type": "array", "items": { + "type": "record", + "name": "Employee", + "fields": [ + {"name": "name", "type": ["null", "string"]}, + {"name": "age", "type": ["null", "int"]} + ] + }}]}, + {"name": "labels", "type": ["null", {"type": "map", "values": "string"}]} + ] +} \ No newline at end of file diff --git a/pulsar-client-cpp/python/pulsar/schema/schema_avro.py b/pulsar-client-cpp/python/pulsar/schema/schema_avro.py index e76fc51..5861505 100644 --- a/pulsar-client-cpp/python/pulsar/schema/schema_avro.py +++ b/pulsar-client-cpp/python/pulsar/schema/schema_avro.py @@ -32,10 +32,15 @@ except ModuleNotFoundError: if HAS_AVRO: class AvroSchema(Schema): - def __init__(self, record_cls): - super(AvroSchema, self).__init__(record_cls, _pulsar.SchemaType.AVRO, - record_cls.schema(), 'AVRO') - self._schema = record_cls.schema() + def __init__(self, record_cls, schema_definition=None): + if record_cls is None and schema_definition is None: + raise AssertionError("The param record_cls and schema_definition shouldn't be both None.") + + if record_cls is not None: + self._schema = record_cls.schema() + else: + self._schema = schema_definition + super(AvroSchema, self).__init__(record_cls, _pulsar.SchemaType.AVRO, self._schema, 'AVRO') def 
_get_serialized_value(self, x): if isinstance(x, enum.Enum): @@ -53,9 +58,14 @@ if HAS_AVRO: return x def encode(self, obj): - self._validate_object_type(obj) buffer = io.BytesIO() - m = self.encode_dict(obj.__dict__) + m = obj + if self._record_cls is not None: + self._validate_object_type(obj) + m = self.encode_dict(obj.__dict__) + elif not isinstance(obj, dict): + raise ValueError('If using the custom schema, the record data should be dict type.') + fastavro.schemaless_writer(buffer, self._schema, m) return buffer.getvalue() @@ -68,11 +78,14 @@ if HAS_AVRO: def decode(self, data): buffer = io.BytesIO(data) d = fastavro.schemaless_reader(buffer, self._schema) - return self._record_cls(**d) + if self._record_cls is not None: + return self._record_cls(**d) + else: + return d else: class AvroSchema(Schema): - def __init__(self, _record_cls): + def __init__(self, _record_cls, _schema_definition): raise Exception("Avro library support was not found. Make sure to install Pulsar client " + "with Avro support: pip3 install 'pulsar-client[avro]'") diff --git a/pulsar-client-cpp/python/schema_test.py b/pulsar-client-cpp/python/schema_test.py index 7adbcbe..d2554da 100755 --- a/pulsar-client-cpp/python/schema_test.py +++ b/pulsar-client-cpp/python/schema_test.py @@ -25,6 +25,7 @@ import pulsar from pulsar.schema import * from enum import Enum import json +from fastavro.schema import load_schema class SchemaTest(TestCase): @@ -1145,12 +1146,127 @@ class SchemaTest(TestCase): client.close() - def test(self): - class NamespaceDemo(Record): - _namespace = 'xxx.xxx.xxx' - x = String() - y = Integer() - print('schema: ', NamespaceDemo.schema()) + def custom_schema_test(self): + + def encode_and_decode(schema_definition): + avro_schema = AvroSchema(None, schema_definition=schema_definition) + + company = { + "name": "company-name", + "address": 'xxx road xxx street', + "employees": [ + {"name": "user1", "age": 25}, + {"name": "user2", "age": 30}, + {"name": "user3", "age": 35}, + 
], + "labels": { + "industry": "software", + "scale": ">100", + "funds": "1000000.0" + } + } + data = avro_schema.encode(company) + company_decode = avro_schema.decode(data) + self.assertEqual(company, company_decode) + + schema_definition = { + 'doc': 'this is doc', + 'namespace': 'example.avro', + 'type': 'record', + 'name': 'Company', + 'fields': [ + {'name': 'name', 'type': ['null', 'string']}, + {'name': 'address', 'type': ['null', 'string']}, + {'name': 'employees', 'type': ['null', {'type': 'array', 'items': { + 'type': 'record', + 'name': 'Employee', + 'fields': [ + {'name': 'name', 'type': ['null', 'string']}, + {'name': 'age', 'type': ['null', 'int']} + ] + }}]}, + {'name': 'labels', 'type': ['null', {'type': 'map', 'values': 'string'}]} + ] + } + encode_and_decode(schema_definition) + # Users could load schema from file by `fastavro.schema` + # Or use `avro.schema` like this `avro.schema.parse(open("examples/company.avsc", "rb").read()).to_json()` + encode_and_decode(load_schema("examples/company.avsc")) + + def custom_schema_produce_and_consume_test(self): + client = pulsar.Client(self.serviceUrl) + + def produce_and_consume(topic, schema_definition): + print('custom schema produce and consume test topic - ', topic) + example_avro_schema = AvroSchema(None, schema_definition=schema_definition) + + producer = client.create_producer( + topic=topic, + schema=example_avro_schema) + consumer = client.subscribe(topic, 'test', schema=example_avro_schema) + + for i in range(0, 10): + company = { + "name": "company-name" + str(i), + "address": 'xxx road xxx street ' + str(i), + "employees": [ + {"name": "user" + str(i), "age": 20 + i}, + {"name": "user" + str(i), "age": 30 + i}, + {"name": "user" + str(i), "age": 35 + i}, + ], + "labels": { + "industry": "software" + str(i), + "scale": ">100", + "funds": "1000000.0" + } + } + producer.send(company) + + for i in range(0, 10): + msg = consumer.receive() + company = { + "name": "company-name" + str(i), + "address": 
'xxx road xxx street ' + str(i), + "employees": [ + {"name": "user" + str(i), "age": 20 + i}, + {"name": "user" + str(i), "age": 30 + i}, + {"name": "user" + str(i), "age": 35 + i}, + ], + "labels": { + "industry": "software" + str(i), + "scale": ">100", + "funds": "1000000.0" + } + } + self.assertEqual(msg.value(), company) + consumer.acknowledge(msg) + + consumer.close() + producer.close() + + schema_definition = { + 'doc': 'this is doc', + 'namespace': 'example.avro', + 'type': 'record', + 'name': 'Company', + 'fields': [ + {'name': 'name', 'type': ['null', 'string']}, + {'name': 'address', 'type': ['null', 'string']}, + {'name': 'employees', 'type': ['null', {'type': 'array', 'items': { + 'type': 'record', + 'name': 'Employee', + 'fields': [ + {'name': 'name', 'type': ['null', 'string']}, + {'name': 'age', 'type': ['null', 'int']} + ] + }}]}, + {'name': 'labels', 'type': ['null', {'type': 'map', 'values': 'string'}]} + ] + } + produce_and_consume('custom-schema-test-1', schema_definition=schema_definition) + produce_and_consume('custom-schema-test-2', schema_definition=load_schema("examples/company.avsc")) + + client.close() if __name__ == '__main__': main()