Hi,

How can I get an Avro Record schema to embed an Avro Record? Thank you for
any help you can provide.

Example:

class Embedded(Record):
    m = String()
    n = Integer()

class Example(Record):
    a = String()
    b = Integer()
    c = Boolean()
    d = Embedded()

This is the error I get when running the simple example below with Apache
Pulsar 2.5.2:

$ python3 test_pulsar.py
2020-07-02 03:14:02.792 INFO  ConnectionPool:85 | Created connection for pulsar://localhost:6650
2020-07-02 03:14:02.792 INFO  ClientConnection:330 | [127.0.0.1:49098 -> 127.0.0.1:6650] Connected to broker
2020-07-02 03:14:02.796 INFO  HandlerBase:53 | [persistent://public/default/my-topic, ] Getting connection from pool
2020-07-02 03:14:02.820 INFO  ProducerImpl:153 | [persistent://public/default/my-topic, ] Created producer on broker [127.0.0.1:49098 -> 127.0.0.1:6650]
2020-07-02 03:14:02.821 INFO  Client:88 | Subscribing on Topic :my-topic
2020-07-02 03:14:02.822 INFO  HandlerBase:53 | [persistent://public/default/my-topic, my-subscription, 0] Getting connection from pool
2020-07-02 03:14:02.839 INFO  ConsumerImpl:175 | [persistent://public/default/my-topic, my-subscription, 0] Created consumer on broker [127.0.0.1:49098 -> 127.0.0.1:6650]
Traceback (most recent call last):
  File "test_pulsar.py", line 26, in <module>
    producer.send(Example(a='Hello', b=1, c=True, d=Embedded(m='m', n=2)))
  File "/home/xxxx/.local/lib/python3.6/site-packages/pulsar/__init__.py", line 841, in send
    replication_clusters, disable_replication, event_timestamp)
  File "/home/xxxx/.local/lib/python3.6/site-packages/pulsar/__init__.py", line 914, in _build_msg
    data = self._schema.encode(content)
  File "/home/xxxx/.local/lib/python3.6/site-packages/pulsar/schema/schema.py", line 112, in encode
    fastavro.schemaless_writer(buffer, self._schema, m)
  File "fastavro/_write.pyx", line 635, in fastavro._write.schemaless_writer
  File "fastavro/_write.pyx", line 335, in fastavro._write.write_data
  File "fastavro/_write.pyx", line 285, in fastavro._write.write_record
  File "fastavro/_write.pyx", line 333, in fastavro._write.write_data
  File "fastavro/_write.pyx", line 249, in fastavro._write.write_union
ValueError: <__main__.Embedded object at 0x7f392dd28630> (type <class '__main__.Embedded'>) do not match ['null', {'type': 'record', 'name': 'Embedded', 'fields': [{'name': 'm', 'type': ['null', 'string']}, {'name': 'n', 'type': ['null', 'int']}]}]
2020-07-02 03:14:02.927 INFO  ProducerImpl:476 | Producer - [persistent://public/default/my-topic, standalone-2-53] , [batching  = off]
2020-07-02 03:14:02.927 INFO  ClientConnection:1349 | [127.0.0.1:49098 -> 127.0.0.1:6650] Connection closed
2020-07-02 03:14:02.927 INFO  ClientConnection:235 | [127.0.0.1:49098 -> 127.0.0.1:6650] Destroyed connection


import pulsar

from pulsar.schema import *

class Embedded(Record):
    m = String()
    n = Integer()

class Example(Record):
    a = String()
    b = Integer()
    c = Boolean()
    d = Embedded()

client = pulsar.Client('pulsar://localhost:6650')

producer = client.create_producer(
    topic='my-topic',
    schema=AvroSchema(Example))

consumer = client.subscribe(
    topic='my-topic',
    subscription_name='my-subscription',
    schema=AvroSchema(Example))

producer.send(Example(a='Hello', b=1, c=True, d=Embedded(m='m', n=2)))

while True:
    msg = consumer.receive()
    ex = msg.value()
    try:
        print("Received message a={} b={} c={} d.m={} d.n={}".format(ex.a, ex.b,
              ex.c, ex.d.m, ex.d.n))
        consumer.acknowledge(msg)
    except Exception:
        consumer.negative_acknowledge(msg)
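
My guess from the traceback is that the top-level Example reaches fastavro
as a dict, while the nested Embedded is still an object, so fastavro cannot
match it against the record branch of the ['null', record] union. I have not
confirmed this in the client source. Below is a minimal sketch of that guess
using plain stand-in classes (not pulsar.schema.Record) and a hypothetical
to_plain_dict helper that is not part of the Pulsar API:

```python
# Stand-in classes: NOT pulsar.schema.Record, just plain objects used to
# illustrate the shape of the data that would be handed to the Avro encoder.
class Embedded:
    def __init__(self, m, n):
        self.m = m
        self.n = n


class Example:
    def __init__(self, a, b, c, d):
        self.a = a
        self.b = b
        self.c = c
        self.d = d


def to_plain_dict(value):
    """Hypothetical helper: recursively convert objects into plain dicts."""
    if isinstance(value, dict):
        return {k: to_plain_dict(v) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return [to_plain_dict(v) for v in value]
    if hasattr(value, '__dict__'):
        return {k: to_plain_dict(v) for k, v in vars(value).items()}
    return value


ex = Example(a='Hello', b=1, c=True, d=Embedded(m='m', n=2))

# Shallow view: only the outer object becomes a dict, 'd' stays an object.
shallow = vars(ex)

# Deep view: nested objects become dicts too, which is the shape I would
# expect an Avro record writer to accept for a nested record field.
deep = to_plain_dict(ex)
```

If that guess is right, the shallow form is what triggers the ValueError
above, and the deep form is what the encoder would need to receive.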
