This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 86a9f43  Support defining and validating schema on Python client 
(#3391)
86a9f43 is described below

commit 86a9f434f50b2008de4b68ad736e389c905a947e
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Jan 30 06:41:01 2019 -0800

    Support defining and validating schema on Python client (#3391)
    
    * Support defining and validating schema on Python client
    
    * Added missing license headers
    
    * Fixed metaclass declaration to work for py2 and py3
    
    * More python2 compat fixes
    
    * Added missing pip dependency on fastavro
---
 pulsar-client-cpp/python/CMakeLists.txt            |   2 +-
 pulsar-client-cpp/python/pulsar/__init__.py        |  77 +++-
 pulsar-client-cpp/python/pulsar/schema/__init__.py |  24 ++
 .../python/pulsar/schema/definition.py             | 206 +++++++++++
 pulsar-client-cpp/python/pulsar/schema/schema.py   | 104 ++++++
 pulsar-client-cpp/python/pulsar_test.py            |   2 +
 pulsar-client-cpp/python/schema_test.py            | 395 +++++++++++++++++++++
 pulsar-client-cpp/python/setup.py                  |  30 +-
 pulsar-client-cpp/python/src/config.cc             |   6 +
 pulsar-client-cpp/python/src/enums.cc              |  18 +
 pulsar-client-cpp/python/src/pulsar.cc             |   2 +
 .../python/src/{pulsar.cc => schema.cc}            |  40 +--
 12 files changed, 855 insertions(+), 51 deletions(-)

diff --git a/pulsar-client-cpp/python/CMakeLists.txt 
b/pulsar-client-cpp/python/CMakeLists.txt
index 2c51f6d..e57f583 100644
--- a/pulsar-client-cpp/python/CMakeLists.txt
+++ b/pulsar-client-cpp/python/CMakeLists.txt
@@ -22,7 +22,7 @@ INCLUDE_DIRECTORIES("${Boost_INCLUDE_DIRS}" 
"${PYTHON_INCLUDE_DIRS}")
 ADD_LIBRARY(_pulsar SHARED src/pulsar.cc src/producer.cc src/consumer.cc
                             src/config.cc src/enums.cc src/client.cc
                             src/message.cc src/authentication.cc
-                            src/reader.cc)
+                            src/reader.cc src/schema.cc)
 SET(CMAKE_SHARED_LIBRARY_PREFIX )
 SET(CMAKE_SHARED_LIBRARY_SUFFIX .so)
 
diff --git a/pulsar-client-cpp/python/pulsar/__init__.py 
b/pulsar-client-cpp/python/pulsar/__init__.py
index e03f9c6..427bd7b 100644
--- a/pulsar-client-cpp/python/pulsar/__init__.py
+++ b/pulsar-client-cpp/python/pulsar/__init__.py
@@ -103,6 +103,8 @@ from _pulsar import Result, CompressionType, ConsumerType, 
PartitionsRoutingMode
 from pulsar.functions.function import Function
 from pulsar.functions.context import Context
 from pulsar.functions.serde import SerDe, IdentitySerDe, PickleSerDe
+from pulsar import schema
+_schema = schema
 
 import re
 _retype = type(re.compile('x'))
@@ -143,10 +145,16 @@ class Message:
 
     def data(self):
         """
-        Returns object typed bytes with the content of the message.
+        Returns object typed bytes with the payload of the message.
         """
         return self._message.data()
 
+    def value(self):
+        """
+        Returns the message content de-serialized by the schema attached to
+        this message (the raw payload is the one exposed by `data()`).
+        """
+        raw_payload = self._message.data()
+        return self._schema.decode(raw_payload)
+
     def properties(self):
         """
         Return the properties attached to the message. Properties are
@@ -206,6 +214,7 @@ class Authentication:
         _check_type(str, authParamsString, 'authParamsString')
         self.auth = _pulsar.Authentication(dynamicLibPath, authParamsString)
 
+
 class AuthenticationTLS(Authentication):
     """
     TLS Authentication implementation
@@ -241,6 +250,7 @@ class AuthenticationToken(Authentication):
             raise ValueError("Argument token is expected to be of type 'str' 
or a function returning 'str'")
         self.auth = _pulsar.AuthenticationToken(token)
 
+
 class AuthenticationAthenz(Authentication):
     """
     Athenz Authentication implementation
@@ -345,6 +355,7 @@ class Client:
 
     def create_producer(self, topic,
                         producer_name=None,
+                        schema=schema.BytesSchema(),
                         initial_sequence_id=None,
                         send_timeout_millis=30000,
                         compression_type=CompressionType.NONE,
@@ -374,6 +385,12 @@ class Client:
            with `Producer.producer_name()`. When specifying a name, it is app 
to
            the user to ensure that, for a given topic, the producer name is 
unique
            across all Pulsar's clusters.
+        * `schema`:
+           Define the schema of the data that will be published by this 
producer.
+           The schema will be used for two purposes:
+             - Validate the data format against the topic defined schema
+             - Perform serialization/deserialization between data and objects
+           An example for this parameter would be to pass 
`schema=JsonSchema(MyRecordClass)`.
         * `initial_sequence_id`:
            Set the baseline for the sequence ids for messages
            published by the producer. First message will be using
@@ -405,6 +422,7 @@ class Client:
         """
         _check_type(str, topic, 'topic')
         _check_type_or_none(str, producer_name, 'producer_name')
+        _check_type(_schema.Schema, schema, 'schema')
         _check_type_or_none(int, initial_sequence_id, 'initial_sequence_id')
         _check_type(int, send_timeout_millis, 'send_timeout_millis')
         _check_type(CompressionType, compression_type, 'compression_type')
@@ -436,12 +454,16 @@ class Client:
             for k, v in properties.items():
                 conf.property(k, v)
 
+        conf.schema(schema.schema_info())
+
         p = Producer()
         p._producer = self._client.create_producer(topic, conf)
+        p._schema = schema
         return p
 
     def subscribe(self, topic, subscription_name,
                   consumer_type=ConsumerType.Exclusive,
+                  schema=schema.BytesSchema(),
                   message_listener=None,
                   receiver_queue_size=1000,
                   max_total_receiver_queue_size_across_partitions=50000,
@@ -468,6 +490,8 @@ class Client:
 
         * `consumer_type`:
           Select the subscription type to be used when subscribing to the 
topic.
+        * `schema`:
+           Define the schema of the data that will be received by this 
consumer.
         * `message_listener`:
           Sets a message listener for the consumer. When the listener is set,
           the application will receive messages through it. Calls to
@@ -515,6 +539,7 @@ class Client:
         """
         _check_type(str, subscription_name, 'subscription_name')
         _check_type(ConsumerType, consumer_type, 'consumer_type')
+        _check_type(_schema.Schema, schema, 'schema')
         _check_type(int, receiver_queue_size, 'receiver_queue_size')
         _check_type(int, max_total_receiver_queue_size_across_partitions,
                     'max_total_receiver_queue_size_across_partitions')
@@ -528,7 +553,7 @@ class Client:
         conf.consumer_type(consumer_type)
         conf.read_compacted(is_read_compacted)
         if message_listener:
-            conf.message_listener(message_listener)
+            conf.message_listener(_listener_wrapper(message_listener, schema))
         conf.receiver_queue_size(receiver_queue_size)
         
conf.max_total_receiver_queue_size_across_partitions(max_total_receiver_queue_size_across_partitions)
         if consumer_name:
@@ -540,6 +565,8 @@ class Client:
             for k, v in properties.items():
                 conf.property(k, v)
 
+        conf.schema(schema.schema_info())
+
         c = Consumer()
         if isinstance(topic, str):
             # Single topic
@@ -554,10 +581,12 @@ class Client:
             raise ValueError("Argument 'topic' is expected to be of a type 
between (str, list, re.pattern)")
 
         c._client = self
+        c._schema = schema
         self._consumers.append(c)
         return c
 
     def create_reader(self, topic, start_message_id,
+                      schema=schema.BytesSchema(),
                       reader_listener=None,
                       receiver_queue_size=1000,
                       reader_name=None,
@@ -587,6 +616,8 @@ class Client:
 
         **Options**
 
+        * `schema`:
+           Define the schema of the data that will be received by this reader.
         * `reader_listener`:
           Sets a message listener for the reader. When the listener is set,
           the application will receive messages through it. Calls to
@@ -610,21 +641,25 @@ class Client:
         """
         _check_type(str, topic, 'topic')
         _check_type(_pulsar.MessageId, start_message_id, 'start_message_id')
+        _check_type(_schema.Schema, schema, 'schema')
         _check_type(int, receiver_queue_size, 'receiver_queue_size')
         _check_type_or_none(str, reader_name, 'reader_name')
         _check_type_or_none(str, subscription_role_prefix, 
'subscription_role_prefix')
 
         conf = _pulsar.ReaderConfiguration()
         if reader_listener:
-            conf.reader_listener(reader_listener)
+            conf.reader_listener(_listener_wrapper(reader_listener, schema))
         conf.receiver_queue_size(receiver_queue_size)
         if reader_name:
             conf.reader_name(reader_name)
         if subscription_role_prefix:
             conf.subscription_role_prefix(subscription_role_prefix)
+        conf.schema(schema.schema_info())
+
         c = Reader()
         c._reader = self._client.create_reader(topic, start_message_id, conf)
         c._client = self
+        c._schema = schema
         self._consumers.append(c)
         return c
 
@@ -781,7 +816,9 @@ class Producer:
 
     def _build_msg(self, content, properties, partition_key, sequence_id,
                    replication_clusters, disable_replication, event_timestamp):
-        _check_type(bytes, content, 'content')
+        data = self._schema.encode(content)
+
+        _check_type(bytes, data, 'data')
         _check_type_or_none(dict, properties, 'properties')
         _check_type_or_none(str, partition_key, 'partition_key')
         _check_type_or_none(int, sequence_id, 'sequence_id')
@@ -790,7 +827,7 @@ class Producer:
         _check_type_or_none(int, event_timestamp, 'event_timestamp')
 
         mb = _pulsar.MessageBuilder()
-        mb.content(content)
+        mb.content(data)
         if properties:
             for k, v in properties.items():
                 mb.property(k, v)
@@ -850,10 +887,15 @@ class Consumer:
           available within the timeout.
         """
         if timeout_millis is None:
-            return self._consumer.receive()
+            msg = self._consumer.receive()
         else:
             _check_type(int, timeout_millis, 'timeout_millis')
-            return self._consumer.receive(timeout_millis)
+            msg = self._consumer.receive(timeout_millis)
+
+        m = Message()
+        m._message = msg
+        m._schema = self._schema
+        return m
 
     def acknowledge(self, message):
         """
@@ -957,10 +999,15 @@ class Reader:
           available within the timeout.
         """
         if timeout_millis is None:
-            return self._reader.read_next()
+            msg = self._reader.read_next()
         else:
             _check_type(int, timeout_millis, 'timeout_millis')
-            return self._reader.read_next(timeout_millis)
+            msg = self._reader.read_next(timeout_millis)
+
+        m = Message()
+        m._message = msg
+        m._schema = self._schema
+        return m
 
     def has_message_available(self):
         """
@@ -978,10 +1025,20 @@ class Reader:
 
 def _check_type(var_type, var, name):
     if not isinstance(var, var_type):
-        raise ValueError("Argument %s is expected to be of type '%s'" % (name, 
var_type.__name__))
+        raise ValueError("Argument %s is expected to be of type '%s' and not 
'%s'"
+                         % (name, var_type.__name__, type(var).__name__))
 
 
 def _check_type_or_none(var_type, var, name):
     if var is not None and not isinstance(var, var_type):
         raise ValueError("Argument %s is expected to be either None or of type 
'%s'"
                          % (name, var_type.__name__))
+
+
+def _listener_wrapper(listener, schema):
+    """
+    Build a listener callback that wraps the raw message in a `Message`
+    carrying `schema` before handing it over to the user `listener`.
+    """
+    def wrapper(c, msg):
+        wrapped = Message()
+        wrapped._schema = schema
+        wrapped._message = msg
+        listener(c, wrapped)
+
+    return wrapper
diff --git a/pulsar-client-cpp/python/pulsar/schema/__init__.py 
b/pulsar-client-cpp/python/pulsar/schema/__init__.py
new file mode 100644
index 0000000..096e64a
--- /dev/null
+++ b/pulsar-client-cpp/python/pulsar/schema/__init__.py
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+from .definition import *
+from .schema import *
+
+
diff --git a/pulsar-client-cpp/python/pulsar/schema/definition.py 
b/pulsar-client-cpp/python/pulsar/schema/definition.py
new file mode 100644
index 0000000..4658c03
--- /dev/null
+++ b/pulsar-client-cpp/python/pulsar/schema/definition.py
@@ -0,0 +1,206 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from abc import abstractmethod, ABCMeta
+from enum import Enum, EnumMeta
+from collections import OrderedDict
+from six import with_metaclass
+
+
+def _check_record_or_field(x):
+    """
+    Validate that `x` can be used as the element type of an `Array` or `Map`.
+
+    Accepted values are `Field` instances, `Record` instances and `Record`
+    subclasses: these are the objects on which `schema()` can be called
+    without arguments.
+
+    Raises:
+        Exception: if `x` is anything else.
+    """
+    if isinstance(x, (Record, Field)):
+        return
+    if isinstance(x, type) and issubclass(x, Record):
+        return
+    # str() is needed: `x` is typically a class or an arbitrary object, so a
+    # plain string concatenation would raise TypeError and hide this message
+    raise Exception('Argument ' + str(x) + ' is not a Record or a Field')
+
+
+class RecordMeta(type):
+    """
+    Metaclass that collects the schema fields declared in the body of a
+    record class and stores them in its `_fields` class attribute.
+    """
+
+    def __new__(metacls, name, parents, dct):
+        # Do not apply this logic to the base class itself
+        is_base_class = (name == 'Record')
+        if not is_base_class:
+            dct['_fields'] = RecordMeta._get_fields(dct)
+        return type.__new__(metacls, name, parents, dct)
+
+    @classmethod
+    def _get_fields(cls, dct):
+        """
+        Return an `OrderedDict` mapping attribute names to the `Field` or
+        `Record` instances found in the class body `dct`. Only the attributes
+        present in `dct` are inspected.
+        """
+        declared = OrderedDict()
+        for attr_name, attr_value in dct.items():
+            if issubclass(type(attr_value), EnumMeta):
+                # Python enum classes are wrapped into an `_Enum` field
+                attr_value = _Enum(attr_value)
+            elif type(attr_value) == RecordMeta:
+                # We expect an instance of a record rather than the class itself
+                attr_value = attr_value()
+
+            if not isinstance(attr_value, (Record, Field)):
+                # Anything else declared in the class body is not a schema field
+                continue
+            declared[attr_name] = attr_value
+        return declared
+
+
+class Record(with_metaclass(RecordMeta, object)):
+    """
+    Base class for user-defined records.
+
+    Subclasses declare their fields as class attributes (`Field` instances,
+    Python enums or other records); `RecordMeta` gathers them in `_fields`.
+    Instances only accept attributes whose name is a declared field.
+    """
+
+    def __init__(self, *args, **kwargs):
+        """
+        Initialize every declared field, either from the matching keyword
+        argument or from the default of the field.
+
+        Raises:
+            TypeError: if positional arguments are passed.
+        """
+        if args:
+            # Only allow keyword args
+            raise TypeError('Non-keyword arguments not allowed when initializing Records')
+
+        for k, value in self._fields.items():
+            if k in kwargs:
+                # Value was overridden at constructor
+                self.__setattr__(k, kwargs[k])
+            elif isinstance(value, Field):
+                # Set field to default value
+                self.__setattr__(k, value.default())
+            else:
+                # Nested record declarations do not expose default()
+                self.__setattr__(k, None)
+
+    @classmethod
+    def schema(cls):
+        """
+        Return the schema definition of this record as a dict with the
+        'name', 'type' and 'fields' keys, the latter listing the schema of
+        each declared field.
+        """
+        return {
+            'name': str(cls.__name__),
+            'type': 'record',
+            'fields': [{'name': name, 'type': value.schema()}
+                       for name, value in cls._fields.items()]
+        }
+
+    def __setattr__(self, key, value):
+        """
+        Set a declared field.
+
+        Raises:
+            AttributeError: if `key` is not a declared field.
+        """
+        if key not in self._fields:
+            raise AttributeError('Cannot set undeclared field ' + key + ' on record')
+        super(Record, self).__setattr__(key, value)
+
+    def __eq__(self, other):
+        """
+        Two records are equal when `other` holds the same value for each
+        field declared on `self`.
+        """
+        if not isinstance(other, Record):
+            # Let Python fall back to its default comparison
+            return NotImplemented
+        missing = object()
+        for field in self._fields:
+            if getattr(self, field) != getattr(other, field, missing):
+                return False
+        return True
+
+    def __ne__(self, other):
+        # Python 2 does not derive `!=` from `__eq__`
+        result = self.__eq__(other)
+        if result is NotImplemented:
+            return result
+        return not result
+
+    def __str__(self):
+        return str(self.__dict__)
+
+
+class Field(object):
+    """
+    Base class of the field types that can be declared on a record.
+
+    NOTE(review): `type()` is marked with `@abstractmethod`, but this class
+    does not use `ABCMeta` as metaclass, so nothing here prevents
+    instantiating `Field` directly.
+    """
+    def __init__(self, default=None):
+        # Value returned by default()
+        self._default = default
+
+    @abstractmethod
+    def type(self):
+        """Return the schema type name of the field."""
+        pass
+
+    def schema(self):
+        """Return the schema definition of the field."""
+        # For primitive types, the schema would just be the type itself
+        return self.type()
+
+    def default(self):
+        """Return the default value passed at construction (None if omitted)."""
+        return self._default
+
+# All types
+
+
+class Null(Field):
+    """Field whose schema type is 'null'."""
+    def type(self):
+        return 'null'
+
+
+class Boolean(Field):
+    """Field whose schema type is 'boolean'."""
+    def type(self):
+        return 'boolean'
+
+
+class Integer(Field):
+    """Field whose schema type is 'int'."""
+    def type(self):
+        return 'int'
+
+
+class Long(Field):
+    """Field whose schema type is 'long'."""
+    def type(self):
+        return 'long'
+
+
+class Float(Field):
+    """Field whose schema type is 'float'."""
+    def type(self):
+        return 'float'
+
+
+class Double(Field):
+    """Field whose schema type is 'double'."""
+    def type(self):
+        return 'double'
+
+
+class Bytes(Field):
+    """Field whose schema type is 'bytes'."""
+    def type(self):
+        return 'bytes'
+
+
+class String(Field):
+    """Field whose schema type is 'string'."""
+    def type(self):
+        return 'string'
+
+
+# Complex types
+
+class _Enum(Field):
+    """
+    Field wrapping a Python `Enum` class. Its schema lists the names of the
+    enum members as symbols.
+    """
+    def __init__(self, enum_type):
+        """
+        Raises:
+            Exception: if `enum_type` is not a subclass of `Enum`.
+        """
+        if not (isinstance(enum_type, type) and issubclass(enum_type, Enum)):
+            # str() is needed: concatenating the class itself raises TypeError
+            raise Exception(str(enum_type) + " is not a valid Enum type")
+        # Initialize the base state so that default() returns None instead of
+        # failing on the missing `_default` attribute
+        super(_Enum, self).__init__()
+        self.enum_type = enum_type
+
+    def type(self):
+        return 'enum'
+
+    def schema(self):
+        return {
+            'type': self.type(),
+            'name': self.enum_type.__name__,
+            'symbols': [x.name for x in self.enum_type]
+        }
+
+
+class Array(Field):
+    """
+    Field holding a sequence of items; `array_type` describes the items and
+    provides the 'items' entry of the schema through its `schema()`.
+    """
+    def __init__(self, array_type):
+        _check_record_or_field(array_type)
+        # Initialize the base state so that default() returns None instead of
+        # failing on the missing `_default` attribute
+        super(Array, self).__init__()
+        self.array_type = array_type
+
+    def type(self):
+        return 'array'
+
+    def schema(self):
+        return {
+            'type': self.type(),
+            'items': self.array_type.schema()
+        }
+
+
+class Map(Field):
+    """
+    Field holding a mapping; `value_type` describes the values and provides
+    the 'values' entry of the schema through its `schema()`.
+    """
+    def __init__(self, value_type):
+        _check_record_or_field(value_type)
+        # Initialize the base state so that default() returns None instead of
+        # failing on the missing `_default` attribute
+        super(Map, self).__init__()
+        self.value_type = value_type
+
+    def type(self):
+        return 'map'
+
+    def schema(self):
+        return {
+            'type': self.type(),
+            'values': self.value_type.schema()
+        }
+
diff --git a/pulsar-client-cpp/python/pulsar/schema/schema.py 
b/pulsar-client-cpp/python/pulsar/schema/schema.py
new file mode 100644
index 0000000..91250a8
--- /dev/null
+++ b/pulsar-client-cpp/python/pulsar/schema/schema.py
@@ -0,0 +1,104 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+from abc import abstractmethod
+import json
+import fastavro
+import _pulsar
+import io
+
+
+class Schema(object):
+    """
+    Base class of the schemas accepted by producers, consumers and readers.
+
+    It keeps the Python type of the handled objects and the
+    `_pulsar.SchemaInfo` built from the schema type, name and definition.
+    """
+
+    def __init__(self, record_cls, schema_type, schema_definition, schema_name):
+        self._record_cls = record_cls
+        # The definition is passed to SchemaInfo in its JSON text form
+        definition_json = json.dumps(schema_definition, indent=True)
+        self._schema_info = _pulsar.SchemaInfo(schema_type, schema_name, definition_json)
+
+    @abstractmethod
+    def encode(self, obj):
+        """Convert `obj` into the payload to publish."""
+        pass
+
+    @abstractmethod
+    def decode(self, data):
+        """Convert a received payload back into an object."""
+        pass
+
+    def schema_info(self):
+        """Return the `_pulsar.SchemaInfo` created at construction."""
+        return self._schema_info
+
+    def _validate_object_type(self, obj):
+        """
+        Raises:
+            TypeError: if `obj` is not an instance of the schema's type.
+        """
+        if isinstance(obj, self._record_cls):
+            return
+        raise TypeError('Invalid record obj of type ' + str(type(obj))
+                        + ' - expected type is ' + str(self._record_cls))
+
+
+class BytesSchema(Schema):
+    """Schema passing `bytes` payloads through unchanged."""
+
+    def __init__(self):
+        schema_type = _pulsar.SchemaType.BYTES
+        super(BytesSchema, self).__init__(bytes, schema_type, None, 'BYTES')
+
+    def encode(self, data):
+        # Only the type is checked, the content is published as is
+        self._validate_object_type(data)
+        return data
+
+    def decode(self, data):
+        return data
+
+
+class StringSchema(Schema):
+    """Schema converting between `str` objects and UTF-8 payloads."""
+
+    def __init__(self):
+        schema_type = _pulsar.SchemaType.STRING
+        super(StringSchema, self).__init__(str, schema_type, None, 'STRING')
+
+    def encode(self, obj):
+        self._validate_object_type(obj)
+        encoded = obj.encode('utf-8')
+        return encoded
+
+    def decode(self, data):
+        text = data.decode('utf-8')
+        return text
+
+
+class JsonSchema(Schema):
+    """
+    Schema serializing records as UTF-8 encoded JSON documents.
+
+    `record_cls` provides the schema definition through `schema()` and is
+    the type instantiated by `decode()`.
+    """
+
+    def __init__(self, record_cls):
+        super(JsonSchema, self).__init__(record_cls, _pulsar.SchemaType.JSON,
+                                         record_cls.schema(), 'JSON')
+
+    def encode(self, obj):
+        """
+        Raises:
+            TypeError: if `obj` is not an instance of the record class.
+        """
+        self._validate_object_type(obj)
+        return json.dumps(obj.__dict__, indent=True).encode('utf-8')
+
+    def decode(self, data):
+        """Build a record from the JSON document contained in `data`."""
+        if isinstance(data, bytes):
+            # json.loads() accepts bytes only from Python 3.6 on: decode
+            # explicitly so that older interpreters work as well
+            data = data.decode('utf-8')
+        return self._record_cls(**json.loads(data))
+
+
+class AvroSchema(Schema):
+    """
+    Schema serializing records with the `fastavro` schemaless writer and
+    reader, using the definition returned by `record_cls.schema()`.
+    """
+
+    def __init__(self, record_cls):
+        super(AvroSchema, self).__init__(record_cls, _pulsar.SchemaType.AVRO,
+                                         record_cls.schema(), 'AVRO')
+        # Definition handed to fastavro on each encode/decode
+        self._schema = record_cls.schema()
+
+    def encode(self, obj):
+        self._validate_object_type(obj)
+        out = io.BytesIO()
+        fields = obj.__dict__
+        fastavro.schemaless_writer(out, self._schema, fields)
+        return out.getvalue()
+
+    def decode(self, data):
+        source = io.BytesIO(data)
+        fields = fastavro.schemaless_reader(source, self._schema)
+        return self._record_cls(**fields)
diff --git a/pulsar-client-cpp/python/pulsar_test.py 
b/pulsar-client-cpp/python/pulsar_test.py
index 3cb4829..9f2aa89 100755
--- a/pulsar-client-cpp/python/pulsar_test.py
+++ b/pulsar-client-cpp/python/pulsar_test.py
@@ -28,6 +28,8 @@ from pulsar import Client, MessageId, \
 
 from _pulsar import ProducerConfiguration, ConsumerConfiguration
 
+from schema_test import *
+
 try:
     # For Python 3.0 and later
     from urllib.request import urlopen, Request
diff --git a/pulsar-client-cpp/python/schema_test.py 
b/pulsar-client-cpp/python/schema_test.py
new file mode 100755
index 0000000..5bbd077
--- /dev/null
+++ b/pulsar-client-cpp/python/schema_test.py
@@ -0,0 +1,395 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from unittest import TestCase, main
+import pulsar
+from pulsar.schema import *
+from enum import Enum
+import json
+
+
+class SchemaTest(TestCase):
+    """
+    Tests for the record/field definitions and the schema classes.
+
+    The last four tests create a `pulsar.Client` on `serviceUrl` and exchange
+    messages through it.
+    """
+
+    serviceUrl = 'pulsar://localhost:6650'
+
+    def test_simple(self):
+        """Schema of a record using primitive, enum, array and map fields."""
+        class Color(Enum):
+            red = 1
+            green = 2
+            blue = 3
+
+        class Example(Record):
+            a = String()
+            b = Integer()
+            c = Array(String())
+            d = Color
+            e = Boolean()
+            f = Float()
+            g = Double()
+            h = Bytes()
+            i = Map(String())
+
+        self.assertEqual(Example.schema(), {
+            "name": "Example",
+            "type": "record",
+            "fields": [
+                {"name": "a", "type": "string"},
+                {"name": "b", "type": "int"},
+                {"name": "c", "type": {
+                    "type": "array",
+                    "items": "string"}
+                },
+                {"name": "d",
+                 "type": {
+                    "type": "enum",
+                    "name": "Color",
+                    "symbols": ["red", "green", "blue"]}
+                 },
+                {"name": "e", "type": "boolean"},
+                {"name": "f", "type": "float"},
+                {"name": "g", "type": "double"},
+                {"name": "h", "type": "bytes"},
+                {"name": "i", "type": {
+                    "type": "map",
+                    "values": "string"}
+                 },
+            ]
+        })
+
+    def test_complex(self):
+        """Schema of a record nesting another record, as class and as instance."""
+        class MySubRecord(Record):
+            x = Integer()
+            y = Long()
+            z = String()
+
+        class Example(Record):
+            a = String()
+            sub = MySubRecord     # Test with class
+            sub2 = MySubRecord()  # Test with instance
+
+        self.assertEqual(Example.schema(), {
+            "name": "Example",
+            "type": "record",
+            "fields": [
+                {"name": "a", "type": "string"},
+                {"name": "sub",
+                 "type": {
+                     "name": "MySubRecord",
+                     "type": "record",
+                     "fields": [{"name": "x", "type": "int"},
+                                {"name": "y", "type": "long"},
+                                {"name": "z", "type": "string"}]
+                 }
+                 },
+                 {"name": "sub2",
+                  "type": {
+                     "name": "MySubRecord",
+                     "type": "record",
+                     "fields": [{"name": "x", "type": "int"},
+                                {"name": "y", "type": "long"},
+                                {"name": "z", "type": "string"}]
+                 }
+                 }
+            ]
+        })
+
+    def test_invalid_enum(self):
+        """A plain class (not an `Enum`) declared on a record is not a field."""
+        class Color:
+            red = 1
+            green = 2
+            blue = 3
+
+        class InvalidEnum(Record):
+            a = Integer()
+            b = Color
+
+        # Enum will be ignored
+        self.assertEqual(InvalidEnum.schema(),
+                         {'name': 'InvalidEnum', 'type': 'record',
+                          'fields': [{'name': 'a', 'type': 'int'}]})
+
+    def test_initialization(self):
+        """Keyword initialization, attribute update and rejected operations."""
+        class Example(Record):
+            a = Integer()
+            b = Integer()
+
+        r = Example(a=1, b=2)
+        self.assertEqual(r.a, 1)
+        self.assertEqual(r.b, 2)
+
+        r.b = 5
+
+        self.assertEqual(r.b, 5)
+
+        # Setting non-declared field should fail
+        with self.assertRaises(AttributeError):
+            r.c = 3
+
+        # NOTE(review): the two checks below instantiate the base `Record`
+        # rather than `Example`; the base class has no `_fields`, which is
+        # what raises AttributeError here - confirm this is the intent
+        with self.assertRaises(AttributeError):
+            Record(a=1, c=8)
+
+        with self.assertRaises(TypeError):
+            Record('xyz', a=1, b=2)
+
+    def test_serialize_json(self):
+        """Round trip of a record through `JsonSchema`."""
+        class Example(Record):
+            a = Integer()
+            b = Integer()
+
+        self.assertEqual(Example.schema(), {
+            "name": "Example",
+            "type": "record",
+            "fields": [
+                {"name": "a", "type": "int"},
+                {"name": "b", "type": "int"},
+            ]
+        })
+
+        s = JsonSchema(Example)
+        r = Example(a=1, b=2)
+        data = s.encode(r)
+        self.assertEqual(json.loads(data), {'a': 1, 'b': 2})
+
+        r2 = s.decode(data)
+        self.assertEqual(r2.__class__.__name__, 'Example')
+        self.assertEqual(r2, r)
+
+    def test_serialize_avro(self):
+        """Round trip of a record through `AvroSchema`."""
+        class Example(Record):
+            a = Integer()
+            b = Integer()
+
+        self.assertEqual(Example.schema(), {
+            "name": "Example",
+            "type": "record",
+            "fields": [
+                {"name": "a", "type": "int"},
+                {"name": "b", "type": "int"},
+            ]
+        })
+
+        s = AvroSchema(Example)
+        r = Example(a=1, b=2)
+        data = s.encode(r)
+
+        r2 = s.decode(data)
+        self.assertEqual(r2.__class__.__name__, 'Example')
+        self.assertEqual(r2, r)
+
+    def test_serialize_wrong_types(self):
+        """Encoding an object of another type raises TypeError."""
+        class Example(Record):
+            a = Integer()
+            b = Integer()
+
+        class Foo(Record):
+            x = Integer()
+            y = Integer()
+
+        s = JsonSchema(Example)
+        with self.assertRaises(TypeError):
+            s.encode(Foo(x=1, y=2))
+
+        with self.assertRaises(TypeError):
+            s.encode('hello')
+
+    def test_defaults(self):
+        """Fields not passed to the constructor take the field default."""
+        class Example(Record):
+            a = Integer(default=5)
+            b = Integer()
+            c = String(default='hello')
+
+        r = Example()
+        self.assertEqual(r.a, 5)
+        self.assertEqual(r.b, None)
+        self.assertEqual(r.c, 'hello')
+
+    #### Tests below open a client connection to `serviceUrl`
+
+    def test_json_schema(self):
+        """Publish and receive a record on a topic using `JsonSchema`."""
+
+        class Example(Record):
+            a = Integer()
+            b = Integer()
+
+        # Incompatible variation of the class
+        class BadExample(Record):
+            a = String()
+            b = Integer()
+
+        client = pulsar.Client(self.serviceUrl)
+        producer = client.create_producer(
+                        'my-json-python-topic',
+                        schema=JsonSchema(Example))
+
+        # Validate that incompatible schema is rejected.
+        # assertRaises is used instead of try/self.fail()/except Exception:
+        # the AssertionError raised by fail() would be caught by the handler
+        # and the check could never fail
+        with self.assertRaises(Exception):
+            client.subscribe('my-json-python-topic', 'sub-1',
+                             schema=JsonSchema(BadExample))
+
+        with self.assertRaises(Exception):
+            # StringSchema takes no record class: passing one would raise
+            # TypeError before the subscription is even attempted
+            client.subscribe('my-json-python-topic', 'sub-1',
+                             schema=StringSchema())
+
+        with self.assertRaises(Exception):
+            client.subscribe('my-json-python-topic', 'sub-1',
+                             schema=AvroSchema(BadExample))
+
+        consumer = client.subscribe('my-json-python-topic', 'sub-1',
+                                    schema=JsonSchema(Example))
+
+        r = Example(a=1, b=2)
+        producer.send(r)
+
+        msg = consumer.receive()
+
+        self.assertEqual(r, msg.value())
+        client.close()
+
+    def test_string_schema(self):
+        """Publish and receive a string on a topic using `StringSchema`."""
+        class Example(Record):
+            a = Integer()
+            b = Integer()
+
+        client = pulsar.Client(self.serviceUrl)
+        producer = client.create_producer(
+                        'my-string-python-topic',
+                        schema=StringSchema())
+
+        # Validate that incompatible schema is rejected (see test_json_schema
+        # for why assertRaises is used)
+        with self.assertRaises(Exception):
+            client.create_producer('my-string-python-topic',
+                                   schema=JsonSchema(Example))
+
+        consumer = client.subscribe('my-string-python-topic', 'sub-1',
+                                    schema=StringSchema())
+
+        producer.send("Hello")
+
+        msg = consumer.receive()
+
+        self.assertEqual("Hello", msg.value())
+        self.assertEqual(b"Hello", msg.data())
+        client.close()
+
+    def test_bytes_schema(self):
+        """Publish and receive bytes on a topic using `BytesSchema`."""
+        class Example(Record):
+            a = Integer()
+            b = Integer()
+
+        client = pulsar.Client(self.serviceUrl)
+        producer = client.create_producer(
+                        'my-bytes-python-topic',
+                        schema=BytesSchema())
+
+        # Validate that incompatible schema is rejected (see test_json_schema
+        # for why assertRaises is used)
+        with self.assertRaises(Exception):
+            client.create_producer('my-bytes-python-topic',
+                                   schema=JsonSchema(Example))
+
+        consumer = client.subscribe('my-bytes-python-topic', 'sub-1',
+                                    schema=BytesSchema())
+
+        producer.send(b"Hello")
+
+        msg = consumer.receive()
+
+        self.assertEqual(b"Hello", msg.value())
+        client.close()
+
+    def test_avro_schema(self):
+        """Publish and receive a record on a topic using `AvroSchema`."""
+
+        class Example(Record):
+            a = Integer()
+            b = Integer()
+
+        # Incompatible variation of the class
+        class BadExample(Record):
+            a = String()
+            b = Integer()
+
+        client = pulsar.Client(self.serviceUrl)
+        producer = client.create_producer(
+                        'my-avro-python-topic',
+                        schema=AvroSchema(Example))
+
+        # Validate that incompatible schema is rejected (see test_json_schema
+        # for why assertRaises is used)
+        with self.assertRaises(Exception):
+            client.subscribe('my-avro-python-topic', 'sub-1',
+                             schema=AvroSchema(BadExample))
+
+        with self.assertRaises(Exception):
+            client.subscribe('my-avro-python-topic', 'sub-2',
+                             schema=JsonSchema(Example))
+
+        consumer = client.subscribe('my-avro-python-topic', 'sub-3',
+                                    schema=AvroSchema(Example))
+
+        r = Example(a=1, b=2)
+        producer.send(r)
+
+        msg = consumer.receive()
+
+        self.assertEqual(r, msg.value())
+        client.close()
+
+if __name__ == '__main__':
+    main()
diff --git a/pulsar-client-cpp/python/setup.py 
b/pulsar-client-cpp/python/setup.py
index ce8d259..db165ae 100644
--- a/pulsar-client-cpp/python/setup.py
+++ b/pulsar-client-cpp/python/setup.py
@@ -20,6 +20,7 @@
 from setuptools import setup
 from distutils.core import Extension
 import subprocess
+import sys
 
 from distutils.command import build_ext
 
@@ -41,6 +42,10 @@ def get_version():
 
 VERSION = get_version()
 
+if sys.version_info[0] == 2:
+    PY2 = True
+else:
+    PY2 = False
 
 # This is a workaround to have setuptools to include
 # the already compiled _pulsar.so library
@@ -57,10 +62,24 @@ class my_build_ext(build_ext.build_ext):
         shutil.copyfile('_pulsar.so', self.get_ext_fullpath(ext.name))
 
 
+dependencies = [
+    'grpcio', 'protobuf',
+    'six',
+    'fastavro',
+
+    # functions dependencies
+    "prometheus_client",
+    "ratelimit"
+]
+
+if PY2:
+    # Python 2 compat dependencies
+    dependencies += ['enum34']
+
 setup(
     name="pulsar-client",
     version=VERSION,
-    packages=['pulsar', 'pulsar.functions'],
+    packages=['pulsar', 'pulsar.schema', 'pulsar.functions'],
     cmdclass={'build_ext': my_build_ext},
     ext_modules=[Extension('_pulsar', [])],
 
@@ -68,11 +87,6 @@ setup(
     author_email="[email protected]",
     description="Apache Pulsar Python client library",
     license="Apache License v2.0",
-    url="http://pulsar.apache.org/",
-    install_requires=[
-        'grpcio', 'protobuf',
-        # functions dependencies
-        "prometheus_client",
-        "ratelimit"
-    ],
+    url="https://pulsar.apache.org/",
+    install_requires=dependencies,
 )
diff --git a/pulsar-client-cpp/python/src/config.cc 
b/pulsar-client-cpp/python/src/config.cc
index c1bfeef..9b1d6d0 100644
--- a/pulsar-client-cpp/python/src/config.cc
+++ b/pulsar-client-cpp/python/src/config.cc
@@ -100,6 +100,8 @@ void export_config() {
     class_<ProducerConfiguration>("ProducerConfiguration")
             .def("producer_name", &ProducerConfiguration::getProducerName, 
return_value_policy<copy_const_reference>())
             .def("producer_name", &ProducerConfiguration::setProducerName, 
return_self<>())
+            .def("schema", &ProducerConfiguration::getSchema, 
return_value_policy<copy_const_reference>())
+            .def("schema", &ProducerConfiguration::setSchema, return_self<>())
             .def("send_timeout_millis", &ProducerConfiguration::getSendTimeout)
             .def("send_timeout_millis", 
&ProducerConfiguration::setSendTimeout, return_self<>())
             .def("initial_sequence_id", 
&ProducerConfiguration::getInitialSequenceId)
@@ -128,6 +130,8 @@ void export_config() {
     class_<ConsumerConfiguration>("ConsumerConfiguration")
             .def("consumer_type", &ConsumerConfiguration::getConsumerType)
             .def("consumer_type", &ConsumerConfiguration::setConsumerType, 
return_self<>())
+            .def("schema", &ConsumerConfiguration::getSchema, 
return_value_policy<copy_const_reference>())
+            .def("schema", &ConsumerConfiguration::setSchema, return_self<>())
             .def("message_listener", 
&ConsumerConfiguration_setMessageListener, return_self<>())
             .def("receiver_queue_size", 
&ConsumerConfiguration::getReceiverQueueSize)
             .def("receiver_queue_size", 
&ConsumerConfiguration::setReceiverQueueSize)
@@ -148,6 +152,8 @@ void export_config() {
 
     class_<ReaderConfiguration>("ReaderConfiguration")
             .def("message_listener", &ReaderConfiguration_setReaderListener, 
return_self<>())
+            .def("schema", &ReaderConfiguration::getSchema, 
return_value_policy<copy_const_reference>())
+            .def("schema", &ReaderConfiguration::setSchema, return_self<>())
             .def("receiver_queue_size", 
&ReaderConfiguration::getReceiverQueueSize)
             .def("receiver_queue_size", 
&ReaderConfiguration::setReceiverQueueSize)
             .def("reader_name", &ReaderConfiguration::getReaderName, 
return_value_policy<copy_const_reference>())
diff --git a/pulsar-client-cpp/python/src/enums.cc 
b/pulsar-client-cpp/python/src/enums.cc
index 704be25..3c0a6b9 100644
--- a/pulsar-client-cpp/python/src/enums.cc
+++ b/pulsar-client-cpp/python/src/enums.cc
@@ -76,4 +76,22 @@ void export_enums() {
             .value("UnsupportedVersionError", ResultUnsupportedVersionError)
             ;
 
+    enum_<SchemaType>("SchemaType", "Supported schema types")
+            .value("NONE", NONE)
+            .value("STRING", STRING)
+            .value("INT8", INT8)
+            .value("INT16", INT16)
+            .value("INT32", INT32)
+            .value("INT64", INT64)
+            .value("FLOAT", FLOAT)
+            .value("DOUBLE", DOUBLE)
+            .value("BYTES", BYTES)
+            .value("JSON", JSON)
+            .value("PROTOBUF", PROTOBUF)
+            .value("AVRO", AVRO)
+            .value("AUTO_CONSUME", AUTO_CONSUME)
+            .value("AUTO_PUBLISH", AUTO_PUBLISH)
+            .value("KEY_VALUE", KEY_VALUE)
+            ;
+
 }
diff --git a/pulsar-client-cpp/python/src/pulsar.cc 
b/pulsar-client-cpp/python/src/pulsar.cc
index f3ceefd..b26a252 100644
--- a/pulsar-client-cpp/python/src/pulsar.cc
+++ b/pulsar-client-cpp/python/src/pulsar.cc
@@ -26,6 +26,7 @@ void export_reader();
 void export_config();
 void export_enums();
 void export_authentication();
+void export_schema();
 
 
 static void translateException(const PulsarException& ex) {
@@ -51,4 +52,5 @@ BOOST_PYTHON_MODULE(_pulsar)
     export_config();
     export_enums();
     export_authentication();
+    export_schema();
 }
diff --git a/pulsar-client-cpp/python/src/pulsar.cc 
b/pulsar-client-cpp/python/src/schema.cc
similarity index 51%
copy from pulsar-client-cpp/python/src/pulsar.cc
copy to pulsar-client-cpp/python/src/schema.cc
index f3ceefd..397ec65 100644
--- a/pulsar-client-cpp/python/src/pulsar.cc
+++ b/pulsar-client-cpp/python/src/schema.cc
@@ -18,37 +18,13 @@
  */
 #include "utils.h"
 
-void export_client();
-void export_message();
-void export_producer();
-void export_consumer();
-void export_reader();
-void export_config();
-void export_enums();
-void export_authentication();
+void export_schema() {
+    using namespace boost::python;
 
-
-static void translateException(const PulsarException& ex) {
-    std::string err = "Pulsar error: ";
-    err += strResult(ex._result);
-
-    PyErr_SetString(PyExc_Exception, err.c_str());
-}
-
-BOOST_PYTHON_MODULE(_pulsar)
-{
-    py::register_exception_translator<PulsarException>(translateException);
-
-    // Initialize thread support so that we can grab the GIL mutex
-    // from pulsar library threads
-    PyEval_InitThreads();
-
-    export_client();
-    export_message();
-    export_producer();
-    export_consumer();
-    export_reader();
-    export_config();
-    export_enums();
-    export_authentication();
+    class_<SchemaInfo>("SchemaInfo",
+            init<SchemaType, const std::string& , const std::string&>())
+            .def("schema_type", &SchemaInfo::getSchemaType)
+            .def("name", &SchemaInfo::getName, 
return_value_policy<copy_const_reference>())
+            .def("schema", &SchemaInfo::getSchema, 
return_value_policy<copy_const_reference>())
+            ;
 }

Reply via email to