This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 03aedc7 [Python Schema] Fix python schema array map with record (#11530) 03aedc7 is described below commit 03aedc7cd708e40781d6673114ce1691a7510a14 Author: ran <gaoran...@126.com> AuthorDate: Tue Aug 3 00:38:14 2021 +0800 [Python Schema] Fix python schema array map with record (#11530) * fix Python schema type Array and Map work with Record * fix test --- .../python/pulsar/schema/definition.py | 21 ++++++- .../python/pulsar/schema/schema_avro.py | 7 +++ pulsar-client-cpp/python/schema_test.py | 70 ++++++++++++++++++++-- 3 files changed, 93 insertions(+), 5 deletions(-) diff --git a/pulsar-client-cpp/python/pulsar/schema/definition.py b/pulsar-client-cpp/python/pulsar/schema/definition.py index 3b946b8..56385957 100644 --- a/pulsar-client-cpp/python/pulsar/schema/definition.py +++ b/pulsar-client-cpp/python/pulsar/schema/definition.py @@ -67,8 +67,24 @@ class Record(with_metaclass(RecordMeta, object)): if isinstance(value, Record) and isinstance(kwargs[k], dict): # Use dict init Record object copied = copy.copy(value) - copied.__init__(decode=True, **kwargs[k]) + copied.__init__(**kwargs[k]) self.__setattr__(k, copied) + elif isinstance(value, Array) and isinstance(kwargs[k], list) and len(kwargs[k]) > 0 \ + and isinstance(value.array_type, Record) and isinstance(kwargs[k][0], dict): + arr = [] + for item in kwargs[k]: + copied = copy.copy(value.array_type) + copied.__init__(**item) + arr.append(copied) + self.__setattr__(k, arr) + elif isinstance(value, Map) and isinstance(kwargs[k], dict) and len(kwargs[k]) > 0 \ + and isinstance(value.value_type, Record) and isinstance(list(kwargs[k].values())[0], dict): + dic = {} + for mapKey, mapValue in kwargs[k].items(): + copied = copy.copy(value.value_type) + copied.__init__(**mapValue) + dic[mapKey] = copied + self.__setattr__(k, dic) else: # Value was overridden at constructor self.__setattr__(k, kwargs[k]) @@ -129,6 +145,9 @@ class Record(with_metaclass(RecordMeta, object)): def type(self): return str(self.__class__.__name__) + def python_type(self): + return self.__class__ + def validate_type(self, name, val): if not isinstance(val, self.__class__): raise TypeError("Invalid type '%s' for sub-record field '%s'. Expected: %s" % ( diff --git a/pulsar-client-cpp/python/pulsar/schema/schema_avro.py b/pulsar-client-cpp/python/pulsar/schema/schema_avro.py index fc9e6a6..d2b57cb 100644 --- a/pulsar-client-cpp/python/pulsar/schema/schema_avro.py +++ b/pulsar-client-cpp/python/pulsar/schema/schema_avro.py @@ -42,6 +42,13 @@ if HAS_AVRO: return x.name elif isinstance(x, Record): return self.encode_dict(x.__dict__) + elif isinstance(x, list): + arr = [] + for item in x: + arr.append(self._get_serialized_value(item)) + return arr + elif isinstance(x, dict): + return self.encode_dict(x) else: return x diff --git a/pulsar-client-cpp/python/schema_test.py b/pulsar-client-cpp/python/schema_test.py index 8cf6ff4..49b6c42 100755 --- a/pulsar-client-cpp/python/schema_test.py +++ b/pulsar-client-cpp/python/schema_test.py @@ -460,6 +460,9 @@ class SchemaTest(TestCase): msg = consumer.receive() self.assertEqual(r, msg.value()) + + producer.close() + consumer.close() client.close() def test_string_schema(self): @@ -562,6 +565,9 @@ class SchemaTest(TestCase): msg = consumer.receive() self.assertEqual(r, msg.value()) + + producer.close() + consumer.close() client.close() def test_json_enum(self): @@ -863,17 +869,38 @@ class SchemaTest(TestCase): nb2 = Boolean() nc2 = NestedObj1() + class NestedObj3(Record): + na3 = Integer() + + class NestedObj4(Record): + na4 = String() + nb4 = Integer() + class ComplexRecord(Record): a = Integer() b = Integer() nested = NestedObj2() + mapNested = Map(NestedObj3()) + arrayNested = Array(NestedObj4()) + print('complex schema: ', ComplexRecord.schema()) self.assertEqual(ComplexRecord.schema(), { "name": "ComplexRecord", "type": "record", "fields": [ {"name": "a", "type": ["null", "int"]}, + {'name': 'arrayNested', 'type': ['null', + {'type': 'array', 'items': {'name': 'NestedObj4', 'type': 'record', 'fields': [ + {'name': 'na4', 'type': ['null', 'string']}, + {'name': 'nb4', 'type': ['null', 'int']} + ]}} + ]}, {"name": "b", "type": ["null", "int"]}, + {'name': 'mapNested', 'type': ['null', {'type': 'map', 'values': + {'name': 'NestedObj3', 'type': 'record', 'fields': [ + {'name': 'na3', 'type': ['null', 'int']} + ]}} + ]}, {"name": "nested", "type": ['null', {'name': 'NestedObj2', 'type': 'record', 'fields': [ {'name': 'na2', 'type': ['null', 'int']}, {'name': 'nb2', 'type': ['null', 'boolean']}, @@ -892,7 +919,14 @@ class SchemaTest(TestCase): nested_obj1 = NestedObj1(na1='na1 value', nb1=20.5) nested_obj2 = NestedObj2(na2=22, nb2=True, nc2=nested_obj1) - r = ComplexRecord(a=1, b=2, nested=nested_obj2) + r = ComplexRecord(a=1, b=2, nested=nested_obj2, mapNested={ + 'a': NestedObj3(na3=1), + 'b': NestedObj3(na3=2), + 'c': NestedObj3(na3=3) + }, arrayNested=[ + NestedObj4(na4='value na4 1', nb4=100), + NestedObj4(na4='value na4 2', nb4=200) + ]) data_encode = data_schema.encode(r) data_decode = data_schema.decode(data_encode) @@ -904,6 +938,13 @@ class SchemaTest(TestCase): self.assertEqual(data_decode.nested.nb2, True) self.assertEqual(data_decode.nested.nc2.na1, 'na1 value') self.assertEqual(data_decode.nested.nc2.nb1, 20.5) + self.assertEqual(data_decode.mapNested['a'].na3, 1) + self.assertEqual(data_decode.mapNested['b'].na3, 2) + self.assertEqual(data_decode.mapNested['c'].na3, 3) + self.assertEqual(data_decode.arrayNested[0].na4, 'value na4 1') + self.assertEqual(data_decode.arrayNested[0].nb4, 100) + self.assertEqual(data_decode.arrayNested[1].na4, 'value na4 2') + self.assertEqual(data_decode.arrayNested[1].nb4, 200) print('Encode and decode complex schema finish. schema_type: ', schema_type) encode_and_decode('avro') @@ -919,10 +960,19 @@ class SchemaTest(TestCase): nb2 = Boolean() nc2 = NestedObj1() + class NestedObj3(Record): + na3 = Integer() + + class NestedObj4(Record): + na4 = String() + nb4 = Integer() + class ComplexRecord(Record): a = Integer() b = Integer() nested = NestedObj2() + mapNested = Map(NestedObj3()) + arrayNested = Array(NestedObj4()) client = pulsar.Client(self.serviceUrl) @@ -941,7 +991,14 @@ class SchemaTest(TestCase): nested_obj1 = NestedObj1(na1='na1 value', nb1=20.5) nested_obj2 = NestedObj2(na2=22, nb2=True, nc2=nested_obj1) - r = ComplexRecord(a=1, b=2, nested=nested_obj2) + r = ComplexRecord(a=1, b=2, nested=nested_obj2, mapNested={ + 'a': NestedObj3(na3=1), + 'b': NestedObj3(na3=2), + 'c': NestedObj3(na3=3) + }, arrayNested=[ + NestedObj4(na4='value na4 1', nb4=100), + NestedObj4(na4='value na4 2', nb4=200) + ]) producer.send(r) msg = consumer.receive() @@ -954,9 +1011,14 @@ class SchemaTest(TestCase): self.assertEqual(value.nested.nb2, True) self.assertEqual(value.nested.nc2.na1, 'na1 value') self.assertEqual(value.nested.nc2.nb1, 20.5) + self.assertEqual(value.mapNested['a'].na3, 1) + self.assertEqual(value.mapNested['b'].na3, 2) + self.assertEqual(value.mapNested['c'].na3, 3) + self.assertEqual(value.arrayNested[0].na4, 'value na4 1') + self.assertEqual(value.arrayNested[0].nb4, 100) + self.assertEqual(value.arrayNested[1].na4, 'value na4 2') + self.assertEqual(value.arrayNested[1].nb4, 200) - producer.close() - consumer.close() print('Produce and consume complex schema data finish. schema_type', schema_type) produce_consume_test('avro')