This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new e7389ed [Python Schema] Python schema support custom Avro configurations for Enum type (#12642) e7389ed is described below commit e7389ed965cb712058c4ed1e8931d7563ca10bc8 Author: ran <gaoran...@126.com> AuthorDate: Sun Nov 7 21:24:33 2021 +0800 [Python Schema] Python schema support custom Avro configurations for Enum type (#12642) ### Motivation Currently, the Python client does not support setting the `required`, `default`, and `required_default` configurations for an Enum-type field in a Record. ### Modifications Rename the internal `_Enum` class to `CustomEnum` and extend its constructor to accept these options. `_Enum` was not exposed to users; the new `CustomEnum` class is exposed, so users can set the Avro definition configurations `required`, `default`, and `required_default`. ### How to use ``` class Color(Enum): red = 1 green = 2 blue = 3 class NestedObj(Record): a = Integer() color = CustomEnum(Color, required_default=True, default=Color.blue) ``` The schema definition will look like this: ``` { 'type': 'record', 'name': 'NestedObj', 'fields': [ {'name': 'a', 'type': ['null', 'int']}, {'name': 'color', 'default': 'blue', 'type': ['null', {'type': 'enum', 'name': 'Color', 'symbols': ['red', 'green', 'blue']}]} ] } ``` The old usage is still supported: declaring a plain Python `Enum` class directly as a Record field. This feature is compatible with the Java client. 
--- pulsar-client-cpp/python/examples/company.avsc | 4 +- pulsar-client-cpp/python/pulsar/schema/__init__.py | 2 +- .../python/pulsar/schema/definition.py | 18 ++-- pulsar-client-cpp/python/schema_test.py | 118 ++++++++++----------- 4 files changed, 74 insertions(+), 68 deletions(-) diff --git a/pulsar-client-cpp/python/examples/company.avsc b/pulsar-client-cpp/python/examples/company.avsc index cdb595f..5fb1860 100644 --- a/pulsar-client-cpp/python/examples/company.avsc +++ b/pulsar-client-cpp/python/examples/company.avsc @@ -14,6 +14,8 @@ {"name": "age", "type": ["null", "int"]} ] }}]}, - {"name": "labels", "type": ["null", {"type": "map", "values": "string"}]} + {"name": "labels", "type": ["null", {"type": "map", "values": "string"}]}, + {"name": "companyType", "type": ["null", {"type": "enum", "name": "CompanyType", "symbols": + ["companyType1", "companyType2", "companyType3"]}]} ] } \ No newline at end of file diff --git a/pulsar-client-cpp/python/pulsar/schema/__init__.py b/pulsar-client-cpp/python/pulsar/schema/__init__.py index 150629d..efa6806 100644 --- a/pulsar-client-cpp/python/pulsar/schema/__init__.py +++ b/pulsar-client-cpp/python/pulsar/schema/__init__.py @@ -18,7 +18,7 @@ # from .definition import Record, Field, Null, Boolean, Integer, Long, \ - Float, Double, Bytes, String, Array, Map + Float, Double, Bytes, String, Array, Map, CustomEnum from .schema import Schema, BytesSchema, StringSchema, JsonSchema from .schema_avro import AvroSchema diff --git a/pulsar-client-cpp/python/pulsar/schema/definition.py b/pulsar-client-cpp/python/pulsar/schema/definition.py index fd778f3..9b6c861 100644 --- a/pulsar-client-cpp/python/pulsar/schema/definition.py +++ b/pulsar-client-cpp/python/pulsar/schema/definition.py @@ -44,8 +44,7 @@ class RecordMeta(type): fields = OrderedDict() for name, value in dct.items(): if issubclass(type(value), EnumMeta): - # Wrap Python enums - value = _Enum(value) + value = CustomEnum(value) elif type(value) == RecordMeta: # We 
expect an instance of a record rather than the class itself value = value() @@ -125,6 +124,12 @@ class Record(with_metaclass(RecordMeta, object)): schema['namespace'] = cls._avro_namespace schema['fields'] = [] + def get_filed_default_value(value): + if isinstance(value, Enum): + return value.name + else: + return value + if cls._sorted_fields: fields = sorted(cls._fields.keys()) else: @@ -135,7 +140,7 @@ class Record(with_metaclass(RecordMeta, object)): if field._required else ['null', field.schema_info(defined_names)] schema['fields'].append({ 'name': name, - 'default': field.default(), + 'default': get_filed_default_value(field.default()), 'type': field_type }) if field.required_default() else schema['fields'].append({ 'name': name, @@ -360,15 +365,16 @@ class String(Field): # Complex types -class _Enum(Field): - def __init__(self, enum_type): + +class CustomEnum(Field): + def __init__(self, enum_type, default=None, required=False, required_default=False): if not issubclass(enum_type, Enum): raise Exception(enum_type + " is not a valid Enum type") self.enum_type = enum_type self.values = {} for x in enum_type.__members__.values(): self.values[x.value] = x - super(_Enum, self).__init__() + super(CustomEnum, self).__init__(default, required, required_default) def type(self): return 'enum' diff --git a/pulsar-client-cpp/python/schema_test.py b/pulsar-client-cpp/python/schema_test.py index d2554da..077f2bb 100755 --- a/pulsar-client-cpp/python/schema_test.py +++ b/pulsar-client-cpp/python/schema_test.py @@ -49,6 +49,7 @@ class SchemaTest(TestCase): g = Double() h = Bytes() i = Map(String()) + j = CustomEnum(Color) fastavro.parse_schema(Example.schema()) self.assertEqual(Example.schema(), { @@ -74,16 +75,23 @@ class SchemaTest(TestCase): {"name": "i", "type": ["null", { "type": "map", "values": "string"}] - }, + }, + {"name": "j", "type": ["null", "Color"]} ] }) def test_complex(self): + class Color(Enum): + red = 1 + green = 2 + blue = 3 + class MySubRecord(Record): 
_sorted_fields = True x = Integer() y = Long() z = String() + color = CustomEnum(Color) class Example(Record): _sorted_fields = True @@ -101,9 +109,12 @@ class SchemaTest(TestCase): "type": ["null", { "name": "MySubRecord", "type": "record", - "fields": [{"name": "x", "type": ["null", "int"]}, - {"name": "y", "type": ["null", "long"]}, - {"name": "z", "type": ["null", "string"]}] + "fields": [ + {'name': 'color', 'type': ['null', {'type': 'enum', 'name': 'Color', 'symbols': + ['red', 'green', 'blue']}]}, + {"name": "x", "type": ["null", "int"]}, + {"name": "y", "type": ["null", "long"]}, + {"name": "z", "type": ["null", "string"]}] }] }, {"name": "sub2", @@ -630,6 +641,8 @@ class SchemaTest(TestCase): class Example(Record): name = String() v = MyEnum + w = CustomEnum(MyEnum) + x = CustomEnum(MyEnum, required=True, default=MyEnum.A, required_default=True) topic = 'my-json-enum-topic' @@ -641,13 +654,15 @@ class SchemaTest(TestCase): consumer = client.subscribe(topic, 'test', schema=JsonSchema(Example)) - r = Example(name='test', v=MyEnum.C) + r = Example(name='test', v=MyEnum.C, w=MyEnum.B) producer.send(r) msg = consumer.receive() self.assertEqual('test', msg.value().name) self.assertEqual(MyEnum.C, MyEnum(msg.value().v)) + self.assertEqual(MyEnum.B, MyEnum(msg.value().w)) + self.assertEqual(MyEnum.A, MyEnum(msg.value().x)) client.close() def test_avro_enum(self): @@ -659,6 +674,8 @@ class SchemaTest(TestCase): class Example(Record): name = String() v = MyEnum + w = CustomEnum(MyEnum) + x = CustomEnum(MyEnum, required=True, default=MyEnum.B, required_default=True) topic = 'my-avro-enum-topic' @@ -670,12 +687,14 @@ class SchemaTest(TestCase): consumer = client.subscribe(topic, 'test', schema=AvroSchema(Example)) - r = Example(name='test', v=MyEnum.C) + r = Example(name='test', v=MyEnum.C, w=MyEnum.A) producer.send(r) msg = consumer.receive() msg.value() self.assertEqual(MyEnum.C, msg.value().v) + self.assertEqual(MyEnum.A, MyEnum(msg.value().w)) + 
self.assertEqual(MyEnum.B, MyEnum(msg.value().x)) client.close() def test_avro_map_array(self): @@ -913,6 +932,11 @@ class SchemaTest(TestCase): client.close() def test_serialize_schema_complex(self): + class Color(Enum): + red = 1 + green = 2 + blue = 3 + class NestedObj1(Record): _sorted_fields = True na1 = String() @@ -925,6 +949,8 @@ class SchemaTest(TestCase): nc2 = NestedObj1() class NestedObj3(Record): + _sorted_fields = True + color = CustomEnum(Color) na3 = Integer() class NestedObj4(Record): @@ -933,11 +959,6 @@ class SchemaTest(TestCase): na4 = String() nb4 = Integer() - class Color(Enum): - red = 1 - green = 2 - blue = 3 - class ComplexRecord(Record): _avro_namespace = 'xxx.xxx' _sorted_fields = True @@ -945,6 +966,7 @@ class SchemaTest(TestCase): b = Integer() color = Color color2 = Color + color3 = CustomEnum(Color, required=True, default=Color.red, required_default=True) nested = NestedObj2() nested2 = NestedObj2() mapNested = Map(NestedObj3()) @@ -970,8 +992,10 @@ class SchemaTest(TestCase): {'name': 'color', 'type': ['null', {'type': 'enum', 'name': 'Color', 'symbols': [ 'red', 'green', 'blue']}]}, {'name': 'color2', 'type': ['null', 'Color']}, + {'name': 'color3', 'default': 'red', 'type': 'Color'}, {'name': 'mapNested', 'type': ['null', {'type': 'map', 'values': {'name': 'NestedObj3', 'type': 'record', 'fields': [ + {'name': 'color', 'type': ['null', 'Color']}, {'name': 'na3', 'type': ['null', 'int']} ]}} ]}, @@ -998,12 +1022,12 @@ class SchemaTest(TestCase): r = ComplexRecord(a=1, b=2, color=Color.red, color2=Color.blue, nested=nested_obj2, nested2=nested_obj2, mapNested={ - 'a': NestedObj3(na3=1), + 'a': NestedObj3(na3=1, color=Color.green), 'b': NestedObj3(na3=2), - 'c': NestedObj3(na3=3) + 'c': NestedObj3(na3=3, color=Color.red) }, mapNested2={ - 'd': NestedObj3(na3=4), - 'e': NestedObj3(na3=5), + 'd': NestedObj3(na3=4, color=Color.red), + 'e': NestedObj3(na3=5, color=Color.blue), 'f': NestedObj3(na3=6) }, arrayNested=[ NestedObj4(na4='value 
na4 1', nb4=100), @@ -1017,32 +1041,9 @@ class SchemaTest(TestCase): data_decode = data_schema.decode(data_encode) self.assertEqual(data_decode.__class__.__name__, 'ComplexRecord') self.assertEqual(data_decode, r) - self.assertEqual(data_decode.a, 1) - self.assertEqual(data_decode.b, 2) - self.assertEqual(data_decode.color, Color.red) - self.assertEqual(data_decode.color2, Color.blue) - self.assertEqual(data_decode.nested.na2, 22) - self.assertEqual(data_decode.nested.nb2, True) - self.assertEqual(data_decode.nested.nc2.na1, 'na1 value') - self.assertEqual(data_decode.nested.nc2.nb1, 20.5) - self.assertEqual(data_decode.nested2.na2, 22) - self.assertEqual(data_decode.nested2.nb2, True) - self.assertEqual(data_decode.nested2.nc2.na1, 'na1 value') - self.assertEqual(data_decode.nested2.nc2.nb1, 20.5) - self.assertEqual(data_decode.mapNested['a'].na3, 1) - self.assertEqual(data_decode.mapNested['b'].na3, 2) - self.assertEqual(data_decode.mapNested['c'].na3, 3) - self.assertEqual(data_decode.mapNested2['d'].na3, 4) - self.assertEqual(data_decode.mapNested2['e'].na3, 5) - self.assertEqual(data_decode.mapNested2['f'].na3, 6) - self.assertEqual(data_decode.arrayNested[0].na4, 'value na4 1') - self.assertEqual(data_decode.arrayNested[0].nb4, 100) - self.assertEqual(data_decode.arrayNested[1].na4, 'value na4 2') - self.assertEqual(data_decode.arrayNested[1].nb4, 200) - self.assertEqual(data_decode.arrayNested2[0].na4, 'value na4 3') - self.assertEqual(data_decode.arrayNested2[0].nb4, 300) - self.assertEqual(data_decode.arrayNested2[1].na4, 'value na4 4') - self.assertEqual(data_decode.arrayNested2[1].nb4, 400) + self.assertEqual(r.color3, Color.red) + self.assertEqual(r.mapNested['a'].color, Color.green) + self.assertEqual(r.mapNested['b'].color, None) print('Encode and decode complex schema finish. 
schema_type: ', schema_type) encode_and_decode('avro') @@ -1069,8 +1070,12 @@ class SchemaTest(TestCase): self.assertEqual(data_decode.na2, 1) self.assertTrue(data_decode.nb2) - def test_produce_and_consume_complex_schema_data(self): + class Color(Enum): + red = 1 + green = 2 + blue = 3 + class NestedObj1(Record): na1 = String() nb1 = Double() @@ -1082,6 +1087,7 @@ class SchemaTest(TestCase): class NestedObj3(Record): na3 = Integer() + color = CustomEnum(Color, required=True, required_default=True, default=Color.blue) class NestedObj4(Record): na4 = String() @@ -1090,6 +1096,7 @@ class SchemaTest(TestCase): class ComplexRecord(Record): a = Integer() b = Integer() + color = CustomEnum(Color) nested = NestedObj2() mapNested = Map(NestedObj3()) arrayNested = Array(NestedObj4()) @@ -1112,8 +1119,8 @@ class SchemaTest(TestCase): nested_obj1 = NestedObj1(na1='na1 value', nb1=20.5) nested_obj2 = NestedObj2(na2=22, nb2=True, nc2=nested_obj1) r = ComplexRecord(a=1, b=2, nested=nested_obj2, mapNested={ - 'a': NestedObj3(na3=1), - 'b': NestedObj3(na3=2), + 'a': NestedObj3(na3=1, color=Color.red), + 'b': NestedObj3(na3=2, color=Color.green), 'c': NestedObj3(na3=3) }, arrayNested=[ NestedObj4(na4='value na4 1', nb4=100), @@ -1125,19 +1132,6 @@ class SchemaTest(TestCase): value = msg.value() self.assertEqual(value.__class__.__name__, 'ComplexRecord') self.assertEqual(value, r) - self.assertEqual(value.a, 1) - self.assertEqual(value.b, 2) - self.assertEqual(value.nested.na2, 22) - self.assertEqual(value.nested.nb2, True) - self.assertEqual(value.nested.nc2.na1, 'na1 value') - self.assertEqual(value.nested.nc2.nb1, 20.5) - self.assertEqual(value.mapNested['a'].na3, 1) - self.assertEqual(value.mapNested['b'].na3, 2) - self.assertEqual(value.mapNested['c'].na3, 3) - self.assertEqual(value.arrayNested[0].na4, 'value na4 1') - self.assertEqual(value.arrayNested[0].nb4, 100) - self.assertEqual(value.arrayNested[1].na4, 'value na4 2') - self.assertEqual(value.arrayNested[1].nb4, 200) 
print('Produce and consume complex schema data finish. schema_type', schema_type) @@ -1163,7 +1157,8 @@ class SchemaTest(TestCase): "industry": "software", "scale": ">100", "funds": "1000000.0" - } + }, + "companyType": "companyType1" } data = avro_schema.encode(company) company_decode = avro_schema.decode(data) @@ -1185,7 +1180,9 @@ class SchemaTest(TestCase): {'name': 'age', 'type': ['null', 'int']} ] }}]}, - {'name': 'labels', 'type': ['null', {'type': 'map', 'values': 'string'}]} + {'name': 'labels', 'type': ['null', {'type': 'map', 'values': 'string'}]}, + {'name': 'companyType', 'type': ['null', {'type': 'enum', 'name': 'CompanyType', 'symbols': + ['companyType1', 'companyType2', 'companyType3']}]} ] } encode_and_decode(schema_definition) @@ -1218,7 +1215,8 @@ class SchemaTest(TestCase): "industry": "software" + str(i), "scale": ">100", "funds": "1000000.0" - } + }, + "companyType": "companyType" + str((i % 3) + 1) } producer.send(company)