This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 86a9f43 Support defining and validating schema on Python client
(#3391)
86a9f43 is described below
commit 86a9f434f50b2008de4b68ad736e389c905a947e
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Jan 30 06:41:01 2019 -0800
Support defining and validating schema on Python client (#3391)
* Support defining and validating schema on Python client
* Added missing license headers
* Fixed metaclass declaration to work for py2 and py3
* More python2 compat fixes
* Added missing pip dependency on fastavro
---
pulsar-client-cpp/python/CMakeLists.txt | 2 +-
pulsar-client-cpp/python/pulsar/__init__.py | 77 +++-
pulsar-client-cpp/python/pulsar/schema/__init__.py | 24 ++
.../python/pulsar/schema/definition.py | 206 +++++++++++
pulsar-client-cpp/python/pulsar/schema/schema.py | 104 ++++++
pulsar-client-cpp/python/pulsar_test.py | 2 +
pulsar-client-cpp/python/schema_test.py | 395 +++++++++++++++++++++
pulsar-client-cpp/python/setup.py | 30 +-
pulsar-client-cpp/python/src/config.cc | 6 +
pulsar-client-cpp/python/src/enums.cc | 18 +
pulsar-client-cpp/python/src/pulsar.cc | 2 +
.../python/src/{pulsar.cc => schema.cc} | 40 +--
12 files changed, 855 insertions(+), 51 deletions(-)
diff --git a/pulsar-client-cpp/python/CMakeLists.txt
b/pulsar-client-cpp/python/CMakeLists.txt
index 2c51f6d..e57f583 100644
--- a/pulsar-client-cpp/python/CMakeLists.txt
+++ b/pulsar-client-cpp/python/CMakeLists.txt
@@ -22,7 +22,7 @@ INCLUDE_DIRECTORIES("${Boost_INCLUDE_DIRS}"
"${PYTHON_INCLUDE_DIRS}")
ADD_LIBRARY(_pulsar SHARED src/pulsar.cc src/producer.cc src/consumer.cc
src/config.cc src/enums.cc src/client.cc
src/message.cc src/authentication.cc
- src/reader.cc)
+ src/reader.cc src/schema.cc)
SET(CMAKE_SHARED_LIBRARY_PREFIX )
SET(CMAKE_SHARED_LIBRARY_SUFFIX .so)
diff --git a/pulsar-client-cpp/python/pulsar/__init__.py
b/pulsar-client-cpp/python/pulsar/__init__.py
index e03f9c6..427bd7b 100644
--- a/pulsar-client-cpp/python/pulsar/__init__.py
+++ b/pulsar-client-cpp/python/pulsar/__init__.py
@@ -103,6 +103,8 @@ from _pulsar import Result, CompressionType, ConsumerType,
PartitionsRoutingMode
from pulsar.functions.function import Function
from pulsar.functions.context import Context
from pulsar.functions.serde import SerDe, IdentitySerDe, PickleSerDe
+from pulsar import schema
+_schema = schema
import re
_retype = type(re.compile('x'))
@@ -143,10 +145,16 @@ class Message:
     def data(self):
         """
-        Returns object typed bytes with the content of the message.
+        Returns the raw message payload as `bytes`, without applying the
+        schema de-serialization performed by `value()`.
         """
         return self._message.data()
+    def value(self):
+        """
+        Returns the message payload de-serialized by the schema the message
+        was received with: e.g. an instance of the record class for
+        `JsonSchema` / `AvroSchema`, a `str` for `StringSchema` and the raw
+        `bytes` for the default `BytesSchema`.
+        """
+        # `_schema` is attached by Consumer.receive(), Reader.read_next()
+        # and the listener wrapper when they build the Message.
+        return self._schema.decode(self._message.data())
+
def properties(self):
"""
Return the properties attached to the message. Properties are
@@ -206,6 +214,7 @@ class Authentication:
_check_type(str, authParamsString, 'authParamsString')
self.auth = _pulsar.Authentication(dynamicLibPath, authParamsString)
+
class AuthenticationTLS(Authentication):
"""
TLS Authentication implementation
@@ -241,6 +250,7 @@ class AuthenticationToken(Authentication):
raise ValueError("Argument token is expected to be of type 'str'
or a function returning 'str'")
self.auth = _pulsar.AuthenticationToken(token)
+
class AuthenticationAthenz(Authentication):
"""
Athenz Authentication implementation
@@ -345,6 +355,7 @@ class Client:
def create_producer(self, topic,
producer_name=None,
+ schema=schema.BytesSchema(),
initial_sequence_id=None,
send_timeout_millis=30000,
compression_type=CompressionType.NONE,
@@ -374,6 +385,12 @@ class Client:
with `Producer.producer_name()`. When specifying a name, it is app
to
the user to ensure that, for a given topic, the producer name is
unique
across all Pulsar's clusters.
+ * `schema`:
+ Define the schema of the data that will be published by this
producer.
+ The schema will be used for two purposes:
+ - Validate the data format against the topic defined schema
+ - Perform serialization/deserialization between data and objects
+ An example for this parameter would be to pass
`schema=JsonSchema(MyRecordClass)`.
* `initial_sequence_id`:
Set the baseline for the sequence ids for messages
published by the producer. First message will be using
@@ -405,6 +422,7 @@ class Client:
"""
_check_type(str, topic, 'topic')
_check_type_or_none(str, producer_name, 'producer_name')
+ _check_type(_schema.Schema, schema, 'schema')
_check_type_or_none(int, initial_sequence_id, 'initial_sequence_id')
_check_type(int, send_timeout_millis, 'send_timeout_millis')
_check_type(CompressionType, compression_type, 'compression_type')
@@ -436,12 +454,16 @@ class Client:
for k, v in properties.items():
conf.property(k, v)
+ conf.schema(schema.schema_info())
+
p = Producer()
p._producer = self._client.create_producer(topic, conf)
+ p._schema = schema
return p
def subscribe(self, topic, subscription_name,
consumer_type=ConsumerType.Exclusive,
+ schema=schema.BytesSchema(),
message_listener=None,
receiver_queue_size=1000,
max_total_receiver_queue_size_across_partitions=50000,
@@ -468,6 +490,8 @@ class Client:
* `consumer_type`:
Select the subscription type to be used when subscribing to the
topic.
+ * `schema`:
+ Define the schema of the data that will be received by this
consumer.
* `message_listener`:
Sets a message listener for the consumer. When the listener is set,
the application will receive messages through it. Calls to
@@ -515,6 +539,7 @@ class Client:
"""
_check_type(str, subscription_name, 'subscription_name')
_check_type(ConsumerType, consumer_type, 'consumer_type')
+ _check_type(_schema.Schema, schema, 'schema')
_check_type(int, receiver_queue_size, 'receiver_queue_size')
_check_type(int, max_total_receiver_queue_size_across_partitions,
'max_total_receiver_queue_size_across_partitions')
@@ -528,7 +553,7 @@ class Client:
conf.consumer_type(consumer_type)
conf.read_compacted(is_read_compacted)
if message_listener:
- conf.message_listener(message_listener)
+ conf.message_listener(_listener_wrapper(message_listener, schema))
conf.receiver_queue_size(receiver_queue_size)
conf.max_total_receiver_queue_size_across_partitions(max_total_receiver_queue_size_across_partitions)
if consumer_name:
@@ -540,6 +565,8 @@ class Client:
for k, v in properties.items():
conf.property(k, v)
+ conf.schema(schema.schema_info())
+
c = Consumer()
if isinstance(topic, str):
# Single topic
@@ -554,10 +581,12 @@ class Client:
raise ValueError("Argument 'topic' is expected to be of a type
between (str, list, re.pattern)")
c._client = self
+ c._schema = schema
self._consumers.append(c)
return c
def create_reader(self, topic, start_message_id,
+ schema=schema.BytesSchema(),
reader_listener=None,
receiver_queue_size=1000,
reader_name=None,
@@ -587,6 +616,8 @@ class Client:
**Options**
+ * `schema`:
+ Define the schema of the data that will be received by this reader.
* `reader_listener`:
Sets a message listener for the reader. When the listener is set,
the application will receive messages through it. Calls to
@@ -610,21 +641,25 @@ class Client:
"""
_check_type(str, topic, 'topic')
_check_type(_pulsar.MessageId, start_message_id, 'start_message_id')
+ _check_type(_schema.Schema, schema, 'schema')
_check_type(int, receiver_queue_size, 'receiver_queue_size')
_check_type_or_none(str, reader_name, 'reader_name')
_check_type_or_none(str, subscription_role_prefix,
'subscription_role_prefix')
conf = _pulsar.ReaderConfiguration()
if reader_listener:
- conf.reader_listener(reader_listener)
+ conf.reader_listener(_listener_wrapper(reader_listener, schema))
conf.receiver_queue_size(receiver_queue_size)
if reader_name:
conf.reader_name(reader_name)
if subscription_role_prefix:
conf.subscription_role_prefix(subscription_role_prefix)
+ conf.schema(schema.schema_info())
+
c = Reader()
c._reader = self._client.create_reader(topic, start_message_id, conf)
c._client = self
+ c._schema = schema
self._consumers.append(c)
return c
@@ -781,7 +816,9 @@ class Producer:
def _build_msg(self, content, properties, partition_key, sequence_id,
replication_clusters, disable_replication, event_timestamp):
- _check_type(bytes, content, 'content')
+ data = self._schema.encode(content)
+
+ _check_type(bytes, data, 'data')
_check_type_or_none(dict, properties, 'properties')
_check_type_or_none(str, partition_key, 'partition_key')
_check_type_or_none(int, sequence_id, 'sequence_id')
@@ -790,7 +827,7 @@ class Producer:
_check_type_or_none(int, event_timestamp, 'event_timestamp')
mb = _pulsar.MessageBuilder()
- mb.content(content)
+ mb.content(data)
if properties:
for k, v in properties.items():
mb.property(k, v)
@@ -850,10 +887,15 @@ class Consumer:
available within the timeout.
"""
if timeout_millis is None:
- return self._consumer.receive()
+ msg = self._consumer.receive()
else:
_check_type(int, timeout_millis, 'timeout_millis')
- return self._consumer.receive(timeout_millis)
+ msg = self._consumer.receive(timeout_millis)
+
+ m = Message()
+ m._message = msg
+ m._schema = self._schema
+ return m
def acknowledge(self, message):
"""
@@ -957,10 +999,15 @@ class Reader:
available within the timeout.
"""
if timeout_millis is None:
- return self._reader.read_next()
+ msg = self._reader.read_next()
else:
_check_type(int, timeout_millis, 'timeout_millis')
- return self._reader.read_next(timeout_millis)
+ msg = self._reader.read_next(timeout_millis)
+
+ m = Message()
+ m._message = msg
+ m._schema = self._schema
+ return m
def has_message_available(self):
"""
@@ -978,10 +1025,20 @@ class Reader:
def _check_type(var_type, var, name):
if not isinstance(var, var_type):
- raise ValueError("Argument %s is expected to be of type '%s'" % (name,
var_type.__name__))
+ raise ValueError("Argument %s is expected to be of type '%s' and not
'%s'"
+ % (name, var_type.__name__, type(var).__name__))
def _check_type_or_none(var_type, var, name):
if var is not None and not isinstance(var, var_type):
raise ValueError("Argument %s is expected to be either None or of type
'%s'"
% (name, var_type.__name__))
+
+
+def _listener_wrapper(listener, schema):
+    """
+    Adapt a user-supplied consumer/reader listener so that it is invoked with
+    a `Message` bound to `schema` (so that `msg.value()` works) instead of the
+    message object handed over by the `_pulsar` configuration callback.
+    """
+    def _wrapped_listener(source, native_msg):
+        wrapped = Message()
+        wrapped._message = native_msg
+        wrapped._schema = schema
+        listener(source, wrapped)
+
+    return _wrapped_listener
diff --git a/pulsar-client-cpp/python/pulsar/schema/__init__.py
b/pulsar-client-cpp/python/pulsar/schema/__init__.py
new file mode 100644
index 0000000..096e64a
--- /dev/null
+++ b/pulsar-client-cpp/python/pulsar/schema/__init__.py
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+from .definition import *
+from .schema import *
+
+
diff --git a/pulsar-client-cpp/python/pulsar/schema/definition.py
b/pulsar-client-cpp/python/pulsar/schema/definition.py
new file mode 100644
index 0000000..4658c03
--- /dev/null
+++ b/pulsar-client-cpp/python/pulsar/schema/definition.py
@@ -0,0 +1,206 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import copy
+from abc import abstractmethod, ABCMeta
+from collections import OrderedDict
+from enum import Enum, EnumMeta
+from six import with_metaclass
+
+
+def _check_record_or_field(x):
+    """
+    Validate the item/value type given to `Array` or `Map`.
+
+    Accepted values are `Field` instances, `Record` instances and `Record`
+    subclasses (whose `schema()` classmethod is used to build the schema).
+
+    Raises:
+        TypeError: if `x` is none of the above.
+    """
+    if isinstance(x, (Field, Record)):
+        return
+    if isinstance(x, type) and issubclass(x, Record):
+        return
+    # str() is required: `x` is typically a class, and concatenating it
+    # directly to a string would raise an unrelated TypeError instead.
+    raise TypeError('Argument ' + str(x) + ' is not a Record or a Field')
+
+
+class RecordMeta(type):
+    """
+    Metaclass of `Record`.
+
+    For every `Record` subclass it collects the declared fields (including
+    the ones inherited from parent records) into an ordered `_fields`
+    mapping, used to build the schema and to validate attribute assignment.
+    """
+
+    @classmethod
+    def __prepare__(metacls, name, bases, **kwargs):
+        # Python 3 only (ignored by Python 2): keep the class body in
+        # declaration order. Plain dicts are unordered before Python 3.6, so
+        # the schema field order (and hence the Avro binary layout) could
+        # otherwise differ between processes.
+        return OrderedDict()
+
+    def __new__(metacls, name, parents, dct):
+        # Do not apply this logic to the base `Record` class itself, i.e. to
+        # a class that does not derive from another record class.
+        if any(isinstance(parent, RecordMeta) for parent in parents):
+            fields = OrderedDict()
+            # Fields declared on parent records come first...
+            for parent in parents:
+                fields.update(getattr(parent, '_fields', {}))
+            # ...followed by the ones declared (or overridden) here.
+            fields.update(RecordMeta._get_fields(dct))
+            dct['_fields'] = fields
+        return type.__new__(metacls, name, parents, dct)
+
+    @classmethod
+    def _get_fields(cls, dct):
+        # Build an ordered mapping of the fields declared in a class body
+        fields = OrderedDict()
+        for name, value in dct.items():
+            if issubclass(type(value), EnumMeta):
+                # Wrap Python enums
+                value = _Enum(value)
+            elif type(value) == RecordMeta:
+                # We expect an instance of a record rather than the class itself
+                value = value()
+
+            if isinstance(value, Record) or isinstance(value, Field):
+                fields[name] = value
+        return fields
+
+
+class Record(with_metaclass(RecordMeta, object)):
+    """
+    Base class for user-defined records.
+
+    Subclasses declare their fields as class attributes (`Field` instances,
+    nested `Record` classes/instances or Python `Enum` classes). Instances
+    are built with keyword arguments only, e.g. `Example(a=1, b=2)`.
+    """
+
+    def __init__(self, *args, **kwargs):
+        if args:
+            # Only allow keyword args
+            raise TypeError('Non-keyword arguments not allowed when initializing Records')
+
+        # Keyword arguments that do not match a declared field are ignored
+        # (e.g. extra keys in a decoded JSON payload).
+        for k, value in self._fields.items():
+            if k in kwargs:
+                # Value was overridden at constructor
+                self.__setattr__(k, kwargs[k])
+            elif isinstance(value, Record):
+                # Nested record: records have no default(), so use a private
+                # copy of the declared sub-record, so that mutating it does
+                # not leak into other instances or the class declaration.
+                self.__setattr__(k, copy.deepcopy(value))
+            else:
+                # Set field to default value
+                self.__setattr__(k, value.default())
+
+    @classmethod
+    def schema(cls):
+        schema = {
+            'name': str(cls.__name__),
+            'type': 'record',
+            'fields': []
+        }
+
+        for name, value in cls._fields.items():
+            schema['fields'].append({
+                'name': name,
+                'type': value.schema()
+            })
+        return schema
+
+    def __setattr__(self, key, value):
+        if key not in self._fields:
+            raise AttributeError('Cannot set undeclared field ' + key + ' on record')
+        super(Record, self).__setattr__(key, value)
+
+    def __eq__(self, other):
+        if not isinstance(other, Record):
+            return NotImplemented
+        # Records with different field sets are never equal (keeps == symmetric)
+        if set(self._fields) != set(other._fields):
+            return False
+        for field in self._fields:
+            if getattr(self, field) != getattr(other, field):
+                return False
+        return True
+
+    def __ne__(self, other):
+        # Python 2 does not derive `!=` from `__eq__`.
+        equal = self.__eq__(other)
+        if equal is NotImplemented:
+            return equal
+        return not equal
+
+    def __str__(self):
+        return str(self.__dict__)
+
+
+class Field(with_metaclass(ABCMeta, object)):
+    """
+    Base class of all the field types that can be declared on a `Record`.
+
+    Subclasses must implement `type()`. `ABCMeta` is what makes the
+    `@abstractmethod` decorator effective; without it the decorator is a no-op.
+    """
+    def __init__(self, default=None):
+        self._default = default
+
+    @abstractmethod
+    def type(self):
+        """Return the Avro type name of this field (e.g. 'int', 'array')."""
+        pass
+
+    def schema(self):
+        # For primitive types, the schema would just be the type itself
+        return self.type()
+
+    def default(self):
+        """Return the value assigned to the field when none is provided."""
+        return self._default
+
+# Primitive types: each one's schema is just the Avro type name returned
+# by type().
+
+
+class Null(Field):
+    """Field that only holds `None` (Avro `null`)."""
+
+    def type(self):
+        return 'null'
+
+
+class Boolean(Field):
+    """Avro `boolean` field."""
+
+    def type(self):
+        return 'boolean'
+
+
+class Integer(Field):
+    """Avro `int` field."""
+
+    def type(self):
+        return 'int'
+
+
+class Long(Field):
+    """Avro `long` field."""
+
+    def type(self):
+        return 'long'
+
+
+class Float(Field):
+    """Avro `float` field."""
+
+    def type(self):
+        return 'float'
+
+
+class Double(Field):
+    """Avro `double` field."""
+
+    def type(self):
+        return 'double'
+
+
+class Bytes(Field):
+    """Avro `bytes` field."""
+
+    def type(self):
+        return 'bytes'
+
+
+class String(Field):
+    """Avro `string` field."""
+
+    def type(self):
+        return 'string'
+
+# Complex types
+
+class _Enum(Field):
+    """
+    Field wrapping a Python `Enum` class. `RecordMeta` creates it when an
+    `Enum` class is declared as a record attribute.
+    """
+    def __init__(self, enum_type, default=None):
+        # Check for a class first: issubclass() raises its own TypeError for
+        # non-classes, and the class must be str()-ed before concatenation.
+        if not (isinstance(enum_type, type) and issubclass(enum_type, Enum)):
+            raise TypeError(str(enum_type) + " is not a valid Enum type")
+        # Initialize the base class so that default() works: Record.__init__
+        # calls it for every field not passed to the constructor.
+        super(_Enum, self).__init__(default)
+        self.enum_type = enum_type
+
+    def type(self):
+        return 'enum'
+
+    def schema(self):
+        return {
+            'type': self.type(),
+            'name': self.enum_type.__name__,
+            'symbols': [x.name for x in self.enum_type]
+        }
+
+
+class Array(Field):
+    """Avro `array` field whose items are described by `array_type`."""
+    def __init__(self, array_type, default=None):
+        _check_record_or_field(array_type)
+        # Initialize the base class so that default() works: Record.__init__
+        # calls it for every field not passed to the constructor.
+        super(Array, self).__init__(default)
+        self.array_type = array_type
+
+    def type(self):
+        return 'array'
+
+    def schema(self):
+        return {
+            'type': self.type(),
+            'items': self.array_type.schema()
+        }
+
+
+class Map(Field):
+    """Avro `map` field whose values are described by `value_type`."""
+    def __init__(self, value_type, default=None):
+        _check_record_or_field(value_type)
+        # Initialize the base class so that default() works: Record.__init__
+        # calls it for every field not passed to the constructor.
+        super(Map, self).__init__(default)
+        self.value_type = value_type
+
+    def type(self):
+        return 'map'
+
+    def schema(self):
+        return {
+            'type': self.type(),
+            'values': self.value_type.schema()
+        }
+
diff --git a/pulsar-client-cpp/python/pulsar/schema/schema.py
b/pulsar-client-cpp/python/pulsar/schema/schema.py
new file mode 100644
index 0000000..91250a8
--- /dev/null
+++ b/pulsar-client-cpp/python/pulsar/schema/schema.py
@@ -0,0 +1,104 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+from abc import abstractmethod
+import json
+import fastavro
+import _pulsar
+import io
+
+
+class Schema(object):
+    """
+    Base class of the schemas accepted by producers, consumers and readers.
+
+    It carries the `_pulsar.SchemaInfo` attached to the client configuration
+    and converts between Python objects and payload bytes.
+    NOTE(review): the class does not use ABCMeta, so @abstractmethod below is
+    documentation only and is not enforced.
+    """
+    def __init__(self, record_cls, schema_type, schema_definition, schema_name):
+        self._record_cls = record_cls
+        definition_json = json.dumps(schema_definition, indent=True)
+        self._schema_info = _pulsar.SchemaInfo(schema_type, schema_name, definition_json)
+
+    @abstractmethod
+    def encode(self, obj):
+        """Serialize `obj` into the payload bytes."""
+        pass
+
+    @abstractmethod
+    def decode(self, data):
+        """Build an object from the payload bytes."""
+        pass
+
+    def schema_info(self):
+        return self._schema_info
+
+    def _validate_object_type(self, obj):
+        if isinstance(obj, self._record_cls):
+            return
+        raise TypeError('Invalid record obj of type ' + str(type(obj))
+                        + ' - expected type is ' + str(self._record_cls))
+
+
+class BytesSchema(Schema):
+    """Pass-through schema: payloads are published and received as `bytes`."""
+    def __init__(self):
+        super(BytesSchema, self).__init__(record_cls=bytes,
+                                          schema_type=_pulsar.SchemaType.BYTES,
+                                          schema_definition=None,
+                                          schema_name='BYTES')
+
+    def encode(self, data):
+        # Only type-check: the bytes are already the payload
+        self._validate_object_type(data)
+        return data
+
+    def decode(self, data):
+        # The payload is handed back untouched
+        return data
+
+
+class StringSchema(Schema):
+    """Schema for `str` payloads, transported as UTF-8 encoded bytes."""
+    def __init__(self):
+        super(StringSchema, self).__init__(record_cls=str,
+                                           schema_type=_pulsar.SchemaType.STRING,
+                                           schema_definition=None,
+                                           schema_name='STRING')
+
+    def encode(self, obj):
+        self._validate_object_type(obj)
+        encoded = obj.encode('utf-8')
+        return encoded
+
+    def decode(self, data):
+        text = data.decode('utf-8')
+        return text
+
+
+class JsonSchema(Schema):
+    """Schema that serializes record instances as UTF-8 encoded JSON."""
+
+    def __init__(self, record_cls):
+        super(JsonSchema, self).__init__(record_cls, _pulsar.SchemaType.JSON,
+                                         record_cls.schema(), 'JSON')
+
+    def encode(self, obj):
+        self._validate_object_type(obj)
+        return json.dumps(obj.__dict__, indent=True).encode('utf-8')
+
+    def decode(self, data):
+        # Payloads arrive as bytes, but json.loads() only accepts bytes from
+        # Python 3.6 onwards; decode explicitly so that Python 2.7 and older
+        # Python 3 releases work as well.
+        if isinstance(data, bytes):
+            data = data.decode('utf-8')
+        return self._record_cls(**json.loads(data))
+
+
+class AvroSchema(Schema):
+    """
+    Schema that serializes record instances with fastavro's schemaless
+    writer/reader, using the record class's schema on both sides.
+    """
+    def __init__(self, record_cls):
+        avro_definition = record_cls.schema()
+        super(AvroSchema, self).__init__(record_cls, _pulsar.SchemaType.AVRO,
+                                         avro_definition, 'AVRO')
+        self._schema = avro_definition
+
+    def encode(self, obj):
+        self._validate_object_type(obj)
+        out = io.BytesIO()
+        fastavro.schemaless_writer(out, self._schema, obj.__dict__)
+        return out.getvalue()
+
+    def decode(self, data):
+        fields = fastavro.schemaless_reader(io.BytesIO(data), self._schema)
+        return self._record_cls(**fields)
diff --git a/pulsar-client-cpp/python/pulsar_test.py
b/pulsar-client-cpp/python/pulsar_test.py
index 3cb4829..9f2aa89 100755
--- a/pulsar-client-cpp/python/pulsar_test.py
+++ b/pulsar-client-cpp/python/pulsar_test.py
@@ -28,6 +28,8 @@ from pulsar import Client, MessageId, \
from _pulsar import ProducerConfiguration, ConsumerConfiguration
+from schema_test import *
+
try:
# For Python 3.0 and later
from urllib.request import urlopen, Request
diff --git a/pulsar-client-cpp/python/schema_test.py
b/pulsar-client-cpp/python/schema_test.py
new file mode 100755
index 0000000..5bbd077
--- /dev/null
+++ b/pulsar-client-cpp/python/schema_test.py
@@ -0,0 +1,395 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from unittest import TestCase, main
+import pulsar
+from pulsar.schema import *
+from enum import Enum
+import json
+
+
+class SchemaTest(TestCase):
+
+    serviceUrl = 'pulsar://localhost:6650'
+
+    def _client(self):
+        # Create a client that is closed even when an assertion fails.
+        client = pulsar.Client(self.serviceUrl)
+        self.addCleanup(client.close)
+        return client
+
+    def test_simple(self):
+        class Color(Enum):
+            red = 1
+            green = 2
+            blue = 3
+
+        class Example(Record):
+            a = String()
+            b = Integer()
+            c = Array(String())
+            d = Color
+            e = Boolean()
+            f = Float()
+            g = Double()
+            h = Bytes()
+            i = Map(String())
+
+        self.assertEqual(Example.schema(), {
+            "name": "Example",
+            "type": "record",
+            "fields": [
+                {"name": "a", "type": "string"},
+                {"name": "b", "type": "int"},
+                {"name": "c", "type": {
+                    "type": "array",
+                    "items": "string"}
+                },
+                {"name": "d",
+                 "type": {
+                     "type": "enum",
+                     "name": "Color",
+                     "symbols": ["red", "green", "blue"]}
+                },
+                {"name": "e", "type": "boolean"},
+                {"name": "f", "type": "float"},
+                {"name": "g", "type": "double"},
+                {"name": "h", "type": "bytes"},
+                {"name": "i", "type": {
+                    "type": "map",
+                    "values": "string"}
+                },
+            ]
+        })
+
+    def test_complex(self):
+        class MySubRecord(Record):
+            x = Integer()
+            y = Long()
+            z = String()
+
+        class Example(Record):
+            a = String()
+            sub = MySubRecord  # Test with class
+            sub2 = MySubRecord()  # Test with instance
+
+        self.assertEqual(Example.schema(), {
+            "name": "Example",
+            "type": "record",
+            "fields": [
+                {"name": "a", "type": "string"},
+                {"name": "sub",
+                 "type": {
+                     "name": "MySubRecord",
+                     "type": "record",
+                     "fields": [{"name": "x", "type": "int"},
+                                {"name": "y", "type": "long"},
+                                {"name": "z", "type": "string"}]
+                 }
+                },
+                {"name": "sub2",
+                 "type": {
+                     "name": "MySubRecord",
+                     "type": "record",
+                     "fields": [{"name": "x", "type": "int"},
+                                {"name": "y", "type": "long"},
+                                {"name": "z", "type": "string"}]
+                 }
+                }
+            ]
+        })
+
+    def test_invalid_enum(self):
+        class Color:
+            red = 1
+            green = 2
+            blue = 3
+
+        class InvalidEnum(Record):
+            a = Integer()
+            b = Color
+
+        # Enum will be ignored
+        self.assertEqual(InvalidEnum.schema(),
+                         {'name': 'InvalidEnum', 'type': 'record',
+                          'fields': [{'name': 'a', 'type': 'int'}]})
+
+    def test_initialization(self):
+        class Example(Record):
+            a = Integer()
+            b = Integer()
+
+        r = Example(a=1, b=2)
+        self.assertEqual(r.a, 1)
+        self.assertEqual(r.b, 2)
+
+        r.b = 5
+
+        self.assertEqual(r.b, 5)
+
+        # Setting non-declared field should fail
+        with self.assertRaises(AttributeError):
+            r.c = 3
+
+        with self.assertRaises(AttributeError):
+            Record(a=1, c=8)
+
+        with self.assertRaises(TypeError):
+            Record('xyz', a=1, b=2)
+
+    def test_serialize_json(self):
+        class Example(Record):
+            a = Integer()
+            b = Integer()
+
+        self.assertEqual(Example.schema(), {
+            "name": "Example",
+            "type": "record",
+            "fields": [
+                {"name": "a", "type": "int"},
+                {"name": "b", "type": "int"},
+            ]
+        })
+
+        s = JsonSchema(Example)
+        r = Example(a=1, b=2)
+        data = s.encode(r)
+        # json.loads() rejects bytes before Python 3.6
+        self.assertEqual(json.loads(data.decode('utf-8')), {'a': 1, 'b': 2})
+
+        r2 = s.decode(data)
+        self.assertEqual(r2.__class__.__name__, 'Example')
+        self.assertEqual(r2, r)
+
+    def test_serialize_avro(self):
+        class Example(Record):
+            a = Integer()
+            b = Integer()
+
+        self.assertEqual(Example.schema(), {
+            "name": "Example",
+            "type": "record",
+            "fields": [
+                {"name": "a", "type": "int"},
+                {"name": "b", "type": "int"},
+            ]
+        })
+
+        s = AvroSchema(Example)
+        r = Example(a=1, b=2)
+        data = s.encode(r)
+
+        r2 = s.decode(data)
+        self.assertEqual(r2.__class__.__name__, 'Example')
+        self.assertEqual(r2, r)
+
+    def test_serialize_wrong_types(self):
+        class Example(Record):
+            a = Integer()
+            b = Integer()
+
+        class Foo(Record):
+            x = Integer()
+            y = Integer()
+
+        s = JsonSchema(Example)
+        with self.assertRaises(TypeError):
+            s.encode(Foo(x=1, y=2))
+
+        with self.assertRaises(TypeError):
+            s.encode('hello')
+
+    def test_defaults(self):
+        class Example(Record):
+            a = Integer(default=5)
+            b = Integer()
+            c = String(default='hello')
+
+        r = Example()
+        self.assertEqual(r.a, 5)
+        self.assertEqual(r.b, None)
+        self.assertEqual(r.c, 'hello')
+
+    ####
+
+    # Note: the broker-side checks below use assertRaises(Exception) rather
+    # than `try: ...; self.fail(); except Exception: pass`, because the
+    # latter also swallows the AssertionError raised by self.fail() and so
+    # can never fail.
+
+    def test_json_schema(self):
+
+        class Example(Record):
+            a = Integer()
+            b = Integer()
+
+        # Incompatible variation of the class
+        class BadExample(Record):
+            a = String()
+            b = Integer()
+
+        client = self._client()
+        producer = client.create_producer(
+            'my-json-python-topic',
+            schema=JsonSchema(Example))
+
+        # Validate that incompatible schema is rejected
+        with self.assertRaises(Exception):
+            client.subscribe('my-json-python-topic', 'sub-1',
+                             schema=JsonSchema(BadExample))
+
+        with self.assertRaises(Exception):
+            client.subscribe('my-json-python-topic', 'sub-1',
+                             schema=StringSchema())
+
+        with self.assertRaises(Exception):
+            client.subscribe('my-json-python-topic', 'sub-1',
+                             schema=AvroSchema(BadExample))
+
+        consumer = client.subscribe('my-json-python-topic', 'sub-1',
+                                    schema=JsonSchema(Example))
+
+        r = Example(a=1, b=2)
+        producer.send(r)
+
+        msg = consumer.receive()
+
+        self.assertEqual(r, msg.value())
+
+    def test_string_schema(self):
+        class Example(Record):
+            a = Integer()
+            b = Integer()
+
+        client = self._client()
+        producer = client.create_producer(
+            'my-string-python-topic',
+            schema=StringSchema())
+
+        # Validate that incompatible schema is rejected
+        with self.assertRaises(Exception):
+            client.create_producer('my-string-python-topic',
+                                   schema=JsonSchema(Example))
+
+        consumer = client.subscribe('my-string-python-topic', 'sub-1',
+                                    schema=StringSchema())
+
+        producer.send("Hello")
+
+        msg = consumer.receive()
+
+        self.assertEqual("Hello", msg.value())
+        self.assertEqual(b"Hello", msg.data())
+
+    def test_bytes_schema(self):
+        class Example(Record):
+            a = Integer()
+            b = Integer()
+
+        client = self._client()
+        producer = client.create_producer(
+            'my-bytes-python-topic',
+            schema=BytesSchema())
+
+        # Validate that incompatible schema is rejected
+        with self.assertRaises(Exception):
+            client.create_producer('my-bytes-python-topic',
+                                   schema=JsonSchema(Example))
+
+        consumer = client.subscribe('my-bytes-python-topic', 'sub-1',
+                                    schema=BytesSchema())
+
+        producer.send(b"Hello")
+
+        msg = consumer.receive()
+
+        self.assertEqual(b"Hello", msg.value())
+
+    def test_avro_schema(self):
+
+        class Example(Record):
+            a = Integer()
+            b = Integer()
+
+        # Incompatible variation of the class
+        class BadExample(Record):
+            a = String()
+            b = Integer()
+
+        client = self._client()
+        producer = client.create_producer(
+            'my-avro-python-topic',
+            schema=AvroSchema(Example))
+
+        # Validate that incompatible schema is rejected
+        with self.assertRaises(Exception):
+            client.subscribe('my-avro-python-topic', 'sub-1',
+                             schema=AvroSchema(BadExample))
+
+        with self.assertRaises(Exception):
+            client.subscribe('my-avro-python-topic', 'sub-2',
+                             schema=JsonSchema(Example))
+
+        consumer = client.subscribe('my-avro-python-topic', 'sub-3',
+                                    schema=AvroSchema(Example))
+
+        r = Example(a=1, b=2)
+        producer.send(r)
+
+        msg = consumer.receive()
+
+        self.assertEqual(r, msg.value())
+
+
+if __name__ == '__main__':
+    main()
diff --git a/pulsar-client-cpp/python/setup.py
b/pulsar-client-cpp/python/setup.py
index ce8d259..db165ae 100644
--- a/pulsar-client-cpp/python/setup.py
+++ b/pulsar-client-cpp/python/setup.py
@@ -20,6 +20,7 @@
from setuptools import setup
from distutils.core import Extension
import subprocess
+import sys
from distutils.command import build_ext
@@ -41,6 +42,10 @@ def get_version():
VERSION = get_version()
+# True when setup.py runs under a Python 2 interpreter
+PY2 = sys.version_info[0] == 2
# This is a workaround to have setuptools to include
# the already compiled _pulsar.so library
@@ -57,10 +62,24 @@ class my_build_ext(build_ext.build_ext):
shutil.copyfile('_pulsar.so', self.get_ext_fullpath(ext.name))
+# Runtime requirements of the pure-Python part of the package
+dependencies = [
+    'grpcio',
+    'protobuf',
+    'six',
+    'fastavro',
+
+    # functions dependencies
+    'prometheus_client',
+    'ratelimit',
+]
+
+if PY2:
+    # Python 2 compat dependencies: backport of the `enum` module
+    # imported by pulsar.schema.definition
+    dependencies.append('enum34')
+
setup(
name="pulsar-client",
version=VERSION,
- packages=['pulsar', 'pulsar.functions'],
+ packages=['pulsar', 'pulsar.schema', 'pulsar.functions'],
cmdclass={'build_ext': my_build_ext},
ext_modules=[Extension('_pulsar', [])],
@@ -68,11 +87,6 @@ setup(
author_email="[email protected]",
description="Apache Pulsar Python client library",
license="Apache License v2.0",
- url="http://pulsar.apache.org/",
- install_requires=[
- 'grpcio', 'protobuf',
- # functions dependencies
- "prometheus_client",
- "ratelimit"
- ],
+ url="https://pulsar.apache.org/",
+ install_requires=dependencies,
)
diff --git a/pulsar-client-cpp/python/src/config.cc
b/pulsar-client-cpp/python/src/config.cc
index c1bfeef..9b1d6d0 100644
--- a/pulsar-client-cpp/python/src/config.cc
+++ b/pulsar-client-cpp/python/src/config.cc
@@ -100,6 +100,8 @@ void export_config() {
class_<ProducerConfiguration>("ProducerConfiguration")
.def("producer_name", &ProducerConfiguration::getProducerName,
return_value_policy<copy_const_reference>())
.def("producer_name", &ProducerConfiguration::setProducerName,
return_self<>())
+ .def("schema", &ProducerConfiguration::getSchema,
return_value_policy<copy_const_reference>())
+ .def("schema", &ProducerConfiguration::setSchema, return_self<>())
.def("send_timeout_millis", &ProducerConfiguration::getSendTimeout)
.def("send_timeout_millis",
&ProducerConfiguration::setSendTimeout, return_self<>())
.def("initial_sequence_id",
&ProducerConfiguration::getInitialSequenceId)
@@ -128,6 +130,8 @@ void export_config() {
class_<ConsumerConfiguration>("ConsumerConfiguration")
.def("consumer_type", &ConsumerConfiguration::getConsumerType)
.def("consumer_type", &ConsumerConfiguration::setConsumerType,
return_self<>())
+ .def("schema", &ConsumerConfiguration::getSchema,
return_value_policy<copy_const_reference>())
+ .def("schema", &ConsumerConfiguration::setSchema, return_self<>())
.def("message_listener",
&ConsumerConfiguration_setMessageListener, return_self<>())
.def("receiver_queue_size",
&ConsumerConfiguration::getReceiverQueueSize)
.def("receiver_queue_size",
&ConsumerConfiguration::setReceiverQueueSize)
@@ -148,6 +152,8 @@ void export_config() {
class_<ReaderConfiguration>("ReaderConfiguration")
.def("message_listener", &ReaderConfiguration_setReaderListener,
return_self<>())
+ .def("schema", &ReaderConfiguration::getSchema,
return_value_policy<copy_const_reference>())
+ .def("schema", &ReaderConfiguration::setSchema, return_self<>())
.def("receiver_queue_size",
&ReaderConfiguration::getReceiverQueueSize)
.def("receiver_queue_size",
&ReaderConfiguration::setReceiverQueueSize)
.def("reader_name", &ReaderConfiguration::getReaderName,
return_value_policy<copy_const_reference>())
diff --git a/pulsar-client-cpp/python/src/enums.cc
b/pulsar-client-cpp/python/src/enums.cc
index 704be25..3c0a6b9 100644
--- a/pulsar-client-cpp/python/src/enums.cc
+++ b/pulsar-client-cpp/python/src/enums.cc
@@ -76,4 +76,22 @@ void export_enums() {
.value("UnsupportedVersionError", ResultUnsupportedVersionError)
;
+ enum_<SchemaType>("SchemaType", "Supported schema types")
+ .value("NONE", NONE)
+ .value("STRING", STRING)
+ .value("INT8", INT8)
+ .value("INT16", INT16)
+ .value("INT32", INT32)
+ .value("INT64", INT64)
+ .value("FLOAT", FLOAT)
+ .value("DOUBLE", DOUBLE)
+ .value("BYTES", BYTES)
+ .value("JSON", JSON)
+ .value("PROTOBUF", PROTOBUF)
+ .value("AVRO", AVRO)
+ .value("AUTO_CONSUME", AUTO_CONSUME)
+ .value("AUTO_PUBLISH", AUTO_PUBLISH)
+ .value("KEY_VALUE", KEY_VALUE)
+ ;
+
}
diff --git a/pulsar-client-cpp/python/src/pulsar.cc
b/pulsar-client-cpp/python/src/pulsar.cc
index f3ceefd..b26a252 100644
--- a/pulsar-client-cpp/python/src/pulsar.cc
+++ b/pulsar-client-cpp/python/src/pulsar.cc
@@ -26,6 +26,7 @@ void export_reader();
void export_config();
void export_enums();
void export_authentication();
+void export_schema();
static void translateException(const PulsarException& ex) {
@@ -51,4 +52,5 @@ BOOST_PYTHON_MODULE(_pulsar)
export_config();
export_enums();
export_authentication();
+ export_schema();
}
diff --git a/pulsar-client-cpp/python/src/pulsar.cc
b/pulsar-client-cpp/python/src/schema.cc
similarity index 51%
copy from pulsar-client-cpp/python/src/pulsar.cc
copy to pulsar-client-cpp/python/src/schema.cc
index f3ceefd..397ec65 100644
--- a/pulsar-client-cpp/python/src/pulsar.cc
+++ b/pulsar-client-cpp/python/src/schema.cc
@@ -18,37 +18,13 @@
*/
#include "utils.h"
-void export_client();
-void export_message();
-void export_producer();
-void export_consumer();
-void export_reader();
-void export_config();
-void export_enums();
-void export_authentication();
+/**
+ * Registers the `SchemaInfo` class in the `_pulsar` Python module.
+ *
+ * Called from the `_pulsar` module initializer in pulsar.cc. The Python layer
+ * (pulsar/schema/schema.py) builds instances as
+ * `SchemaInfo(schema_type, schema_name, schema_definition_json)` and passes
+ * them to the producer/consumer/reader configurations via `conf.schema(...)`.
+ */
+void export_schema() {
+    using namespace boost::python;
-
-static void translateException(const PulsarException& ex) {
-    std::string err = "Pulsar error: ";
-    err += strResult(ex._result);
-
-    PyErr_SetString(PyExc_Exception, err.c_str());
-}
-
-BOOST_PYTHON_MODULE(_pulsar)
-{
-    py::register_exception_translator<PulsarException>(translateException);
-
-    // Initialize thread support so that we can grab the GIL mutex
-    // from pulsar library threads
-    PyEval_InitThreads();
-
-    export_client();
-    export_message();
-    export_producer();
-    export_consumer();
-    export_reader();
-    export_config();
-    export_enums();
-    export_authentication();
+    // Constructor arguments: (schema type, schema name, schema definition string)
+    class_<SchemaInfo>("SchemaInfo",
+            init<SchemaType, const std::string& , const std::string&>())
+        .def("schema_type", &SchemaInfo::getSchemaType)
+        .def("name", &SchemaInfo::getName, return_value_policy<copy_const_reference>())
+        .def("schema", &SchemaInfo::getSchema, return_value_policy<copy_const_reference>())
+        ;
 }