This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.wiki.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a36b0a  Created PIP 43: producer send message with different schema 
(markdown)
0a36b0a is described below

commit 0a36b0a839b76f37295bc11bed50ed3418c88b90
Author: Sijie Guo <guosi...@gmail.com>
AuthorDate: Mon Sep 2 23:10:53 2019 -0700

    Created PIP 43: producer send message with different schema (markdown)
---
 ...-producer-send-message-with-different-schema.md | 92 ++++++++++++++++++++++
 1 file changed, 92 insertions(+)

diff --git a/PIP-43:-producer-send-message-with-different-schema.md 
b/PIP-43:-producer-send-message-with-different-schema.md
new file mode 100644
index 0000000..d36d453
--- /dev/null
+++ b/PIP-43:-producer-send-message-with-different-schema.md
@@ -0,0 +1,92 @@
+## Motivation
+
+Currently, a Pulsar producer can only produce messages of a single schema, 
which is either specified by the user when the producer is created or, if the 
`AUTO_PRODUCE_BYTES` type is specified, fetched as the latest schema version 
from the registry. The schema, however, can be updated by an external system 
after the producer has started, which would lead to inconsistency between the 
message payload and the schema version metadata. Also, some scenarios, such as 
replicating from Kafka, require a single producer for replicating messages of 
differ [...]
+
+This proposal lets each message indicate its associated schema by itself. The 
changes are described in two parts to keep them clear.
+
+## Changes:Part-1
+
+For part 1, we propose that the producer support creating a new message with 
a specified schema, specifically a schema of the same POJO type.
+
+### Interfaces
+
+For the `Producer<T>` interface, we propose a new `newMessage` overload that 
creates a message builder with the specified schema, with the following 
signature:
+
+```
+TypedMessageBuilder<T> newMessage(Schema<T> schema);
+```
+
+where the parameterized type `T` is required to be the same as the producer's.
+
+For `AutoProduceBytesSchema` specifically, the user SHOULD create the message 
with the actual schema wrapped by the auto produce bytes schema. A static 
method MAY be provided by the `Schema` interface with the following signature:
+
+```
+static Schema<byte[]> AUTO_PRODUCE_BYTES(Schema<?> schema);
+```
+
+### Wire protocols
+
+To support sending a message with a brand new schema, we also propose a new 
command to get the schema version, or create one if NOT present.
+
+```
+message CommandGetOrCreateSchema {
+    required uint64 request_id = 1;
+    required string topic      = 2;
+    required Schema schema     = 3;
+}
+
+message CommandGetOrCreateSchemaResponse {
+    required uint64 request_id      = 1;
+    optional ServerError error_code = 2;
+    optional string error_message   = 3;
+
+    optional bytes schema_version   = 4;
+}
+```
+
+### Implementation
+
+#### Client
+The current `Schema schema` field of `Producer` would be used as the default 
schema: when the producer sends a message without specifying a schema 
explicitly, the default schema is associated with that message.
+
+The producer SHOULD maintain a local map from schema to schema version, and 
check the schema version of the message's associated schema before sending. If 
the schema cannot be found, the producer SHOULD try to register the schema with 
the registry, get its version, and then insert the pair into the local map. The 
same hash of `Schema` that the registry uses CAN be used as the map key.
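
The lookup-then-register flow above can be sketched roughly as follows. This is a minimal illustration, not the Pulsar client implementation: `SchemaVersionCache`, the string schema hash, and the `registry` function (standing in for a `CommandGetOrCreateSchema` round trip) are all hypothetical names.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical client-side cache: schema hash -> schema version bytes.
class SchemaVersionCache {
    private final Map<String, byte[]> versions = new ConcurrentHashMap<>();
    // Assumption: stands in for the get-or-create request sent to the broker.
    private final Function<String, byte[]> registry;

    SchemaVersionCache(Function<String, byte[]> registry) {
        this.registry = registry;
    }

    // Returns the cached version, asking the registry only on a cache miss.
    byte[] versionFor(String schemaHash) {
        return versions.computeIfAbsent(schemaHash, registry);
    }

    int size() {
        return versions.size();
    }

    public static void main(String[] args) {
        SchemaVersionCache cache =
                new SchemaVersionCache(hash -> new byte[] {(byte) hash.length()});
        // The second call for the same hash is served from the local map.
        System.out.println(Arrays.toString(cache.versionFor("abc")));
        System.out.println(Arrays.toString(cache.versionFor("abc")));
    }
}
```

A real producer would key the map by whatever hash the registry itself uses, so that the client and the broker agree on schema identity.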
+
+The producer SHOULD also attach the actual schema version to the message 
metadata as is.
+
+Batched messages share a single metadata, so only one schema version is 
allowed per batch. Before adding a message to the batch container, the producer 
SHOULD check the schema version of the current batch; if it differs from the 
message's schema version, the producer SHOULD flush the batch and add the 
message to a new batch.
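
As a rough sketch of the batching rule above, the container below flushes whenever the incoming message carries a different schema version from the open batch. `SchemaAwareBatcher` and its string payloads are hypothetical simplifications, and "flushing" here only moves the batch into a list rather than sending it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical batch container that keeps one schema version per batch.
class SchemaAwareBatcher {
    private final List<List<String>> flushed = new ArrayList<>();
    private List<String> current = new ArrayList<>();
    private byte[] currentVersion;

    // Adds a message, flushing first if its schema version differs from the open batch.
    void add(String payload, byte[] schemaVersion) {
        if (!current.isEmpty() && !Arrays.equals(currentVersion, schemaVersion)) {
            flush();
        }
        currentVersion = schemaVersion;
        current.add(payload);
    }

    // Closes the open batch, if any; a real producer would send it here.
    void flush() {
        if (!current.isEmpty()) {
            flushed.add(current);
            current = new ArrayList<>();
        }
    }

    List<List<String>> flushedBatches() {
        return flushed;
    }

    public static void main(String[] args) {
        SchemaAwareBatcher batcher = new SchemaAwareBatcher();
        batcher.add("a", new byte[] {1});
        batcher.add("b", new byte[] {1});
        batcher.add("c", new byte[] {2}); // different version: closes the first batch
        batcher.flush();
        System.out.println(batcher.flushedBatches());
    }
}
```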
+
+To remain seamless for producers that do not require this feature, an option 
CAN be added to the producer builder to enable it, disabled by default. When 
the feature is NOT enabled, the producer's behavior SHOULD remain as it is 
today.
+
+#### Broker
+The server SHOULD handle the register schema command and put the schema into 
the registry, or just respond with the existing version if present; this SHOULD 
be built on top of the compatibility check. The registry backend has already 
implemented this interface.
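
The get-or-create behavior described for the broker can be illustrated with a small in-memory stand-in. This is only a sketch under stated assumptions: `InMemorySchemaRegistry` is hypothetical, schemas are plain strings, versions are list indexes, and the compatibility check is applied only to new schemas against the latest registered one, which the text above does not specify.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

// Hypothetical in-memory registry; the list index serves as the schema version.
class InMemorySchemaRegistry {
    private final List<String> schemas = new ArrayList<>();
    // Assumption: compatible.test(latest, candidate) decides whether to accept a new schema.
    private final BiPredicate<String, String> compatible;

    InMemorySchemaRegistry(BiPredicate<String, String> compatible) {
        this.compatible = compatible;
    }

    // Returns the existing version if the schema is known, otherwise registers it.
    synchronized long getOrCreateVersion(String schema) {
        int existing = schemas.indexOf(schema);
        if (existing >= 0) {
            return existing;
        }
        if (!schemas.isEmpty()
                && !compatible.test(schemas.get(schemas.size() - 1), schema)) {
            throw new IllegalStateException("incompatible schema: " + schema);
        }
        schemas.add(schema);
        return schemas.size() - 1;
    }

    public static void main(String[] args) {
        // Toy rule: a new schema must extend the latest one as a prefix.
        InMemorySchemaRegistry registry =
                new InMemorySchemaRegistry((latest, next) -> next.startsWith(latest));
        System.out.println(registry.getOrCreateVersion("a"));
        System.out.println(registry.getOrCreateVersion("ab"));
        System.out.println(registry.getOrCreateVersion("a"));
    }
}
```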
+
+#### Functions
+Functions MAY inherit the feature gate option and expose it in their 
configuration; the same applies to Sources.
+
+## Changes:Part-2
+
+For part 2, we propose allowing the producer to create a new message with a 
schema of a different POJO type. Note that once a producer can send different 
POJOs, the parameterized message type involved in the methods of `Producer` 
SHOULD be changed in some way. The interceptor mechanism would also be affected.
+
+### Interfaces
+
+For the `Producer<T>` interface, we propose enhancing the method proposed in 
part 1 to accept an arbitrary inner type, with the following signature:
+
+```
+<V> TypedMessageBuilder<V> newMessage(Schema<V> schema);
+```
+where the parameterized types `T` and `V` are NOT required to be the same.
+
+For the `ProducerInterceptor<T>` interface, we propose a method that indicates 
whether the message is supported by the interceptor instance:
+```
+default boolean eligible(Message message) {
+    return true;
+}
+```
+This is essential especially when a producer can send different types of 
messages. Note that the `message` parameter is the raw `Message` type and is 
not required to have the same parameterized type as the interceptor.
+
+### Implementations
+
+#### Client
+The only thing that needs to be pointed out is that when the parameterized 
types of the producer and the schema/message conflict, the message parameter is 
allowed to be declared with a different parameterized type.
+
+For `ProducerInterceptors`, check whether each interceptor is eligible before 
invoking it.
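
The eligibility check could look roughly like the sketch below. It deliberately avoids the real Pulsar types: `Interceptor` is a hypothetical stand-in for `ProducerInterceptor`, messages are plain `Object`s, and `runInterceptors` is an illustrative helper rather than the actual `ProducerInterceptors` code.

```java
import java.util.ArrayList;
import java.util.List;

class EligibilityDemo {
    // Hypothetical stand-in for ProducerInterceptor, operating on raw objects.
    interface Interceptor {
        // Accepts every message by default, mirroring the proposed default method.
        default boolean eligible(Object message) {
            return true;
        }

        Object beforeSend(Object message);
    }

    // Invokes only the interceptors that declare the message eligible.
    static Object runInterceptors(List<Interceptor> interceptors, Object message) {
        Object result = message;
        for (Interceptor interceptor : interceptors) {
            if (interceptor.eligible(result)) {
                result = interceptor.beforeSend(result);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Interceptor> chain = new ArrayList<>();
        chain.add(new Interceptor() {
            @Override
            public boolean eligible(Object message) {
                return message instanceof String; // only handles String payloads
            }

            @Override
            public Object beforeSend(Object message) {
                return ((String) message).toUpperCase();
            }
        });
        System.out.println(runInterceptors(chain, "hello"));
        System.out.println(runInterceptors(chain, 42)); // skipped by the interceptor
    }
}
```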
