momo-jun commented on code in PR #18434:
URL: https://github.com/apache/pulsar/pull/18434#discussion_r1045725158
##########
site2/docs/schema-get-started.md:
##########
@@ -228,88 +329,350 @@ Producer<SensorReading> producer =
client.newProducer(AvroSchema.of(SensorReadin
.create();
```
-### Avro-based schema using Java
-
-The following schema formats are currently available for Java:
+</TabItem>
+<TabItem value="C++">
-* No schema or the byte array schema (which can be applied using
`Schema.BYTES`):
+ ```cpp
+ // Send messages
+ static const std::string exampleSchema =
+ "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\","
+
"\"fields\":[{\"name\":\"a\",\"type\":\"int\"},{\"name\":\"b\",\"type\":\"int\"}]}";
+ Producer producer;
+ ProducerConfiguration producerConf;
+ producerConf.setSchema(SchemaInfo(AVRO, "Avro", exampleSchema));
+ client.createProducer("topic-avro", producerConf, producer);
- ```java
- Producer<byte[]> bytesProducer = client.newProducer(Schema.BYTES)
- .topic("some-raw-bytes-topic")
- .create();
+ // Receive messages
+ static const std::string exampleSchema =
+ "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\","
+
"\"fields\":[{\"name\":\"a\",\"type\":\"int\"},{\"name\":\"b\",\"type\":\"int\"}]}";
+ ConsumerConfiguration consumerConf;
+ Consumer consumer;
+ consumerConf.setSchema(SchemaInfo(AVRO, "Avro", exampleSchema));
+ client.subscribe("topic-avro", "sub-2", consumerConf, consumer);
```
- Or, equivalently:
+</TabItem>
+<TabItem value="Python">
- ```java
- Producer<byte[]> bytesProducer = client.newProducer()
- .topic("some-raw-bytes-topic")
- .create();
- ```
+You can declare an `AvroSchema` using Python through one of the following
methods.
-* `String` for normal UTF-8-encoded string data. Apply the schema using
`Schema.STRING`:
+**Method 1: Record**
- ```java
- Producer<String> stringProducer = client.newProducer(Schema.STRING)
- .topic("some-string-topic")
- .create();
- ```
+Declare an `AvroSchema` by passing a class that inherits from
`pulsar.schema.Record` and defines the fields as class variables.
-* Create JSON schemas for POJOs using `Schema.JSON`. The following is an
example.
+```python
+class Example(Record):
+ a = Integer()
+ b = Integer()
- ```java
- Producer<MyPojo> pojoProducer = client.newProducer(Schema.JSON(MyPojo.class))
- .topic("some-pojo-topic")
- .create();
- ```
+producer = client.create_producer(
+ 'avro-schema-topic',
+ schema=AvroSchema(Example))
+r = Example(a=1, b=2)
+producer.send(r)
-* Generate Protobuf schemas using `Schema.PROTOBUF`. The following example
shows how to create the Protobuf schema and use it to instantiate a new
producer:
+consumer = client.subscribe(
+ 'avro-schema-topic',
+ 'sub',
+ schema=AvroSchema(Example))
+msg = consumer.receive()
+e = msg.value()
+```
- ```java
- Producer<MyProtobuf> protobufProducer =
client.newProducer(Schema.PROTOBUF(MyProtobuf.class))
- .topic("some-protobuf-topic")
- .create();
- ```
+**Method 2: JSON definition**
+
+1. Define the Avro schema in a JSON file.
+
+ Below is an example of `AvroSchema` defined using a JSON file
(`company.avsc`).
+
+ ```json
+ {
+ "doc": "this is doc",
+ "namespace": "example.avro",
+ "type": "record",
+ "name": "Company",
+ "fields": [
+ {"name": "name", "type": ["null", "string"]},
+ {"name": "address", "type": ["null", "string"]},
+ {"name": "employees", "type": ["null", {"type": "array", "items": {
+ "type": "record",
+ "name": "Employee",
+ "fields": [
+ {"name": "name", "type": ["null", "string"]},
+ {"name": "age", "type": ["null", "int"]}
+ ]
+ }}]},
+ {"name": "labels", "type": ["null", {"type": "map", "values":
"string"}]}
+ ]
+ }
+ ```
-* Define Avro schemas with `Schema.AVRO`. The following code snippet
demonstrates how to create and use Avro schema.
+2. Load a schema definition from a file by using
[`avro.schema`](https://avro.apache.org/docs/current/getting-started-python/)
or
[`fastavro.schema`](https://fastavro.readthedocs.io/en/latest/schema.html#fastavro._schema_py.load_schema).
+
+ If you use the [JSON definition](#method-2-json-definition) method to
declare an `AvroSchema`, you need to:
+ - Use a [Python dict](https://developers.google.com/edu/python/dict-files) to
produce and consume messages, which is different from using the
[Record](#method-1-record) method.
+ - Set the value of the `_record_cls` parameter to `None` when generating an
`AvroSchema` object.
+
+ **Example**
+
+ ```python
+ from fastavro.schema import load_schema
+ from pulsar.schema import *
+ schema_definition = load_schema("examples/company.avsc")
+ avro_schema = AvroSchema(None, schema_definition=schema_definition)
+ producer = client.create_producer(
+ topic=topic,
+ schema=avro_schema)
+ consumer = client.subscribe(topic, 'test', schema=avro_schema)
+ company = {
+ "name": "company-name" + str(i),
+ "address": 'xxx road xxx street ' + str(i),
+ "employees": [
+ {"name": "user" + str(i), "age": 20 + i},
+ {"name": "user" + str(i), "age": 30 + i},
+ {"name": "user" + str(i), "age": 35 + i},
+ ],
+ "labels": {
+ "industry": "software" + str(i),
+ "scale": ">100",
+ "funds": "1000000.0"
+ }
+ }
+ producer.send(company)
+ msg = consumer.receive()
+ # `value()` returns the message payload as a dict.
+ msg.value()
+ ```
- ```java
- Producer<MyAvro> avroProducer = client.newProducer(Schema.AVRO(MyAvro.class))
- .topic("some-avro-topic")
- .create();
- ```
+</TabItem>
+<TabItem value="Go">
+Suppose you have an `avroExampleStruct` struct as follows, and you'd like to
transmit it over a Pulsar topic.
-### Avro schema using C++
+```go
+ type avroExampleStruct struct {
+ ID int
+ Name string
+}
+```
-- The following example shows how to create a producer with an Avro schema.
+1. Add an Avro schema definition (`exampleSchemaDef`) like this:
- ```cpp
- static const std::string exampleSchema =
- "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\","
-
"\"fields\":[{\"name\":\"a\",\"type\":\"int\"},{\"name\":\"b\",\"type\":\"int\"}]}";
- Producer producer;
- ProducerConfiguration producerConf;
- producerConf.setSchema(SchemaInfo(AVRO, "Avro", exampleSchema));
- client.createProducer("topic-avro", producerConf, producer);
- ```
+ ```go
+ var (
+ exampleSchemaDef =
"{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," +
+
"\"fields\":[{\"name\":\"ID\",\"type\":\"int\"},{\"name\":\"Name\",\"type\":\"string\"}]}"
+ )
+ ```
-- The following example shows how to create a consumer with an Avro schema.
+2. Create a producer and a consumer to send and receive messages:
+
+ ```go
+ //Create producer and send message
+ producer, err := client.CreateProducer(pulsar.ProducerOptions{
+ Topic: "my-topic",
+ Schema: pulsar.NewAvroSchema(exampleSchemaDef, nil),
+ })
+
+ msgId, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
+ Value: avroExampleStruct{
+ ID: 10,
+ Name: "avroExampleStruct",
+ },
+ })
+
+ //Create Consumer and receive message
+ consumer, err := client.Subscribe(pulsar.ConsumerOptions{
+ Topic: "my-topic",
+ Schema: pulsar.NewAvroSchema(exampleSchemaDef, nil),
+ SubscriptionName: "my-sub",
+ Type: pulsar.Shared,
+ })
+ message, err := consumer.Receive(context.Background())
+ ```
- ```cpp
- static const std::string exampleSchema =
- "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\","
-
"\"fields\":[{\"name\":\"a\",\"type\":\"int\"},{\"name\":\"b\",\"type\":\"int\"}]}";
- ConsumerConfiguration consumerConf;
- Consumer consumer;
- consumerConf.setSchema(SchemaInfo(AVRO, "Avro", exampleSchema));
- client.subscribe("topic-avro", "sub-2", consumerConf, consumer)
- ```
+</TabItem>
+</Tabs>
+````
+
+### JSON
+
+````mdx-code-block
+<Tabs groupId="api-choice"
+ defaultValue="Java"
+
values={[{"label":"Java","value":"Java"},{"label":"C++","value":"C++"},{"label":"Python","value":"Python"},{"label":"Go","value":"Go"}]}>
+
+<TabItem value="Java">
+
+As with `AvroSchema`, you can declare a `JsonSchema` by passing a
class. The only difference is that you use `JsonSchema` instead of `AvroSchema` when
defining the schema type, as shown below. To learn how to use `AvroSchema` with a
record, see [Method 1 - Record](#method-1-record).
+
+```java
+static class SchemaDema {
+ public String name;
+ public int age;
+}
+
+Producer<SchemaDema> producer =
pulsarClient.newProducer(Schema.JSON(SchemaDema.class))
+ .topic("my-topic")
+ .create();
+Consumer<SchemaDema> consumer =
pulsarClient.newConsumer(Schema.JSON(SchemaDema.class))
+ .topic("my-topic")
+ .subscriptionName("my-sub")
+ .subscribe();
+
+SchemaDema schemaDema = new SchemaDema();
+schemaDema.name = "pulsar";
+schemaDema.age = 20;
+producer.newMessage().value(schemaDema).send();
+
+Message<SchemaDema> message = consumer.receive(5, TimeUnit.SECONDS);
Review Comment:
Good question. @liangyepianzhou could you please take a look? I'm not sure
whether it is intentional.
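
Side note on the Python "Method 2" example in the hunk above: it sends a plain dict whose keys mirror the top-level fields declared in `company.avsc`. Below is a minimal sketch of how that correspondence could be checked with the standard library only (no Pulsar client involved). `top_level_fields` and `key_mismatch` are hypothetical helper names, not part of the PR or of `pulsar.schema`, and `i = 0` is an assumed value because the snippet in the diff leaves `i` undefined.

```python
import json

# Schema text copied from the `company.avsc` example quoted in the diff.
COMPANY_AVSC = """
{
  "doc": "this is doc",
  "namespace": "example.avro",
  "type": "record",
  "name": "Company",
  "fields": [
    {"name": "name", "type": ["null", "string"]},
    {"name": "address", "type": ["null", "string"]},
    {"name": "employees", "type": ["null", {"type": "array", "items": {
      "type": "record",
      "name": "Employee",
      "fields": [
        {"name": "name", "type": ["null", "string"]},
        {"name": "age", "type": ["null", "int"]}
      ]
    }}]},
    {"name": "labels", "type": ["null", {"type": "map", "values": "string"}]}
  ]
}
"""


def top_level_fields(schema_text):
    """Return the top-level field names of an Avro record schema given as JSON text."""
    schema = json.loads(schema_text)
    return [field["name"] for field in schema["fields"]]


def key_mismatch(payload, schema_text):
    """Hypothetical helper: return (missing, extra) keys of a dict versus the schema."""
    expected = set(top_level_fields(schema_text))
    actual = set(payload)
    return sorted(expected - actual), sorted(actual - expected)


i = 0  # assumed value; the snippet in the diff does not define `i`
company = {
    "name": "company-name" + str(i),
    "address": "xxx road xxx street " + str(i),
    "employees": [
        {"name": "user" + str(i), "age": 20 + i},
        {"name": "user" + str(i), "age": 30 + i},
        {"name": "user" + str(i), "age": 35 + i},
    ],
    "labels": {"industry": "software" + str(i), "scale": ">100", "funds": "1000000.0"},
}

print(top_level_fields(COMPANY_AVSC))
print(key_mismatch(company, COMPANY_AVSC))
```

This only compares key names at the top level; it does not validate types or nested records the way an Avro library would.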