This is an automated email from the ASF dual-hosted git repository. bogong pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 9a72fe4fa137e369aa27150e46c41b67041de022 Author: congbo <39078850+congbobo...@users.noreply.github.com> AuthorDate: Mon Apr 19 23:11:36 2021 +0800 [Python schema] Support python avro schema set default value. (#10265) ## Motivation Currently the Python Avro schema does not support setting default values, which means a Python schema cannot be updated. ## Implementation 1. Add a `required` field to control whether the schema type can be `null`. 2. Add a `required_default` field to control whether or not the schema has a `default` attribute. 3. Add a `default` field to control the default value of the schema. (cherry picked from commit d6d0e3a88e5569b16b4c0b9cdbd845d5c07268e0) --- .../python/pulsar/schema/definition.py | 89 ++++++++--- pulsar-client-cpp/python/pulsar/schema/schema.py | 4 + pulsar-client-cpp/python/schema_test.py | 169 ++++++++++++++++++++- 3 files changed, 234 insertions(+), 28 deletions(-) diff --git a/pulsar-client-cpp/python/pulsar/schema/definition.py b/pulsar-client-cpp/python/pulsar/schema/definition.py index 7853d07..d46cf3c 100644 --- a/pulsar-client-cpp/python/pulsar/schema/definition.py +++ b/pulsar-client-cpp/python/pulsar/schema/definition.py @@ -56,10 +56,10 @@ class RecordMeta(type): class Record(with_metaclass(RecordMeta, object)): - def __init__(self, *args, **kwargs): - if args: - # Only allow keyword args - raise TypeError('Non-keyword arguments not allowed when initializing Records') + def __init__(self, default=None, required_default=False, required=False, *args, **kwargs): + self._required_default = required_default + self._default = default + self._required = required for k, value in self._fields.items(): if k in kwargs: @@ -85,18 +85,30 @@ class Record(with_metaclass(RecordMeta, object)): field_type = field.schema() if field._required else ['null', field.schema()] schema['fields'].append({ 'name': name, - 'type': field_type + 'type': field_type, + 'default': field.default() + }) if field.required_default() else schema['fields'].append({ + 'name': name, 
'type': field_type, }) + return schema def __setattr__(self, key, value): - if key not in self._fields: - raise AttributeError('Cannot set undeclared field ' + key + ' on record') + if key == '_default': + super(Record, self).__setattr__(key, value) + elif key == '_required_default': + super(Record, self).__setattr__(key, value) + elif key == '_required': + super(Record, self).__setattr__(key, value) + else: + if key not in self._fields: + raise AttributeError('Cannot set undeclared field ' + key + ' on record') - # Check that type of value matches the field type - field = self._fields[key] - value = field.validate_type(key, value) - super(Record, self).__setattr__(key, value) + # Check that type of value matches the field type + field = self._fields[key] + value = field.validate_type(key, value) + super(Record, self).__setattr__(key, value) def __eq__(self, other): for field in self._fields: @@ -116,12 +128,22 @@ class Record(with_metaclass(RecordMeta, object)): type(val), name, self.__class__)) return val + def default(self): + if self._default is not None: + return self._default + else: + return None + + def required_default(self): + return self._required_default + class Field(object): - def __init__(self, default=None, required=False): + def __init__(self, default=None, required=False, required_default=False): if default is not None: default = self.validate_type('default', default) self._default = default + self._required_default = required_default self._required = required @abstractmethod @@ -144,6 +166,10 @@ class Field(object): def default(self): return self._default + def required_default(self): + return self._required_default + + # All types @@ -185,7 +211,7 @@ class Integer(Field): if self._default is not None: return self._default else: - return 0 + return None class Long(Field): @@ -199,7 +225,7 @@ class Long(Field): if self._default is not None: return self._default else: - return 0 + return None class Float(Field): @@ -213,7 +239,7 @@ class 
Float(Field): if self._default is not None: return self._default else: - return 0.0 + return None class Double(Field): @@ -227,7 +253,7 @@ class Double(Field): if self._default is not None: return self._default else: - return 0.0 + return None class Bytes(Field): @@ -241,7 +267,7 @@ class Bytes(Field): if self._default is not None: return self._default else: - return bytes('') + return None class String(Field): @@ -261,7 +287,8 @@ class String(Field): if self._default is not None: return self._default else: - return str('') + return None + # Complex types @@ -309,12 +336,18 @@ class _Enum(Field): 'symbols': [x.name for x in self.enum_type] } + def default(self): + if self._default is not None: + return self._default + else: + return None + class Array(Field): - def __init__(self, array_type): + def __init__(self, array_type, default=None, required=False, required_default=False): _check_record_or_field(array_type) self.array_type = array_type - super(Array, self).__init__() + super(Array, self).__init__(default=default, required=required, required_default=required_default) def type(self): return 'array' @@ -338,12 +371,18 @@ class Array(Field): else self.array_type.type() } + def default(self): + if self._default is not None: + return self._default + else: + return None + class Map(Field): - def __init__(self, value_type): + def __init__(self, value_type, default=None, required=False, required_default=False): _check_record_or_field(value_type) self.value_type = value_type - super(Map, self).__init__() + super(Map, self).__init__(default=default, required=required, required_default=required_default) def type(self): return 'map' @@ -370,6 +409,12 @@ class Map(Field): else self.value_type.type() } + def default(self): + if self._default is not None: + return self._default + else: + return None + # Python3 has no `unicode` type, so here we use a tricky way to check if the type of `x` is `unicode` in Python2 # and also make it work well with Python3. 
diff --git a/pulsar-client-cpp/python/pulsar/schema/schema.py b/pulsar-client-cpp/python/pulsar/schema/schema.py index 5f69ea2..d0da91a 100644 --- a/pulsar-client-cpp/python/pulsar/schema/schema.py +++ b/pulsar-client-cpp/python/pulsar/schema/schema.py @@ -87,6 +87,10 @@ class JsonSchema(Schema): def encode(self, obj): self._validate_object_type(obj) + del obj.__dict__['_default'] + del obj.__dict__['_required'] + del obj.__dict__['_required_default'] + return json.dumps(obj.__dict__, default=self._get_serialized_value, indent=True).encode('utf-8') def decode(self, data): diff --git a/pulsar-client-cpp/python/schema_test.py b/pulsar-client-cpp/python/schema_test.py index a86824c..a0d60c0 100755 --- a/pulsar-client-cpp/python/schema_test.py +++ b/pulsar-client-cpp/python/schema_test.py @@ -180,9 +180,6 @@ class SchemaTest(TestCase): # Expected pass - try: - Record('xyz', a=1, b=2) - self.fail('Should have failed') except TypeError: # Expected pass @@ -410,7 +407,7 @@ class SchemaTest(TestCase): r = Example() self.assertEqual(r.a, 5) - self.assertEqual(r.b, 0) + self.assertEqual(r.b, None) self.assertEqual(r.c, 'hello') #### @@ -667,11 +664,171 @@ class SchemaTest(TestCase): client.close() + def test_avro_required_default(self): + class MySubRecord(Record): + x = Integer() + y = Long() + z = String() + + class Example(Record): + a = Integer() + b = Boolean(required=True) + c = Long() + d = Float() + e = Double() + f = String() + g = Bytes() + h = Array(String()) + i = Map(String()) + j = MySubRecord() + + class ExampleRequiredDefault(Record): + a = Integer(required_default=True) + b = Boolean(required=True, required_default=True) + c = Long(required_default=True) + d = Float(required_default=True) + e = Double(required_default=True) + f = String(required_default=True) + g = Bytes(required_default=True) + h = Array(String(), required_default=True) + i = Map(String(), required_default=True) + j = MySubRecord(required_default=True) + 
self.assertEqual(ExampleRequiredDefault.schema(), { + "name": "ExampleRequiredDefault", + "type": "record", + "fields": [ + { + "name": "a", + "type": [ + "null", + "int" + ], + "default": None + }, + { + "name": "b", + "type": "boolean", + "default": False + }, + { + "name": "c", + "type": [ + "null", + "long" + ], + "default": None + }, + { + "name": "d", + "type": [ + "null", + "float" + ], + "default": None + }, + { + "name": "e", + "type": [ + "null", + "double" + ], + "default": None + }, + { + "name": "f", + "type": [ + "null", + "string" + ], + "default": None + }, + { + "name": "g", + "type": [ + "null", + "bytes" + ], + "default": None + }, + { + "name": "h", + "type": [ + "null", + { + "type": "array", + "items": "string" + } + ], + "default": None + }, + { + "name": "i", + "type": [ + "null", + { + "type": "map", + "values": "string" + } + ], + "default": None + }, + { + "name": "j", + "type": [ + "null", + { + "name": "MySubRecord", + "type": "record", + "fields": [ + { + "name": "x", + "type": [ + "null", + "int" + ] + }, + { + "name": "y", + "type": [ + "null", + "long" + ], + }, + { + "name": "z", + "type": [ + "null", + "string" + ] + } + ] + } + ], + "default": None + } + ] + }) + + client = pulsar.Client(self.serviceUrl) + producer = client.create_producer( + 'my-avro-python-default-topic', + schema=AvroSchema(Example)) + + producer_default = client.create_producer( + 'my-avro-python-default-topic', + schema=AvroSchema(ExampleRequiredDefault)) + + producer.close() + producer_default.close() + + client.close() + + def test_default_value(self): class MyRecord(Record): A = Integer() B = String() - C = Boolean() + C = Boolean(default=True, required=True) D = Double(default=6.4) topic = "my-default-value-topic" @@ -689,7 +846,7 @@ class SchemaTest(TestCase): msg = consumer.receive() self.assertEqual(msg.value().A, 5) self.assertEqual(msg.value().B, u'text') - self.assertEqual(msg.value().C, False) + self.assertEqual(msg.value().C, True) 
self.assertEqual(msg.value().D, 6.4) producer.close()