gaoran10 commented on code in PR #24328:
URL: https://github.com/apache/pulsar/pull/24328#discussion_r2153516617


##########
pip/pip-420.md:
##########
@@ -0,0 +1,253 @@
+# PIP-420: Provide ability for Pulsar clients to integrate with third-party 
schema registry service
+
+# Background knowledge
+
+Schema is an important feature for messaging systems. Pulsar integrates a schema manager into the Pulsar broker.
+The current implementation in Pulsar clients couples schema management with some protocol commands (creating a producer, adding a consumer subscription).
+This increases the Pulsar protocol complexity, and users can’t leverage third-party schema registry services in the Pulsar client.
+
+# Motivation
+
+The Pulsar client should be able to access a third-party schema registry service to manage schemas (register schema,
+get schema, validate schema, etc.). The schema registry service can be an independent service; when a third-party schema registry service is used,
+the Pulsar broker doesn't need to care about the schema of the messages.
+
+# Goals
+
+## In Scope
+
+- Provide an ability to leverage third-party schema registry service for 
Pulsar client.
+
+## Out of Scope
+
+This PIP will not include the implementation for accessing third-party schema 
system.
+
+# High Level Design
+
+This PIP only provides the following abilities:
+
+- Decouple schema management from the create-producer and consumer-subscribe commands.
+- Provide a way to build external schema management system clients and 
integrate with Pulsar clients.
+
+# Detailed Design
+
+## Design & Implementation Details
+
+This PIP aims to decouple schema management from the Pulsar messaging protocols
+and to let the Pulsar client leverage an external schema registry service to manage schemas.
+The external schema registry is responsible for managing the schema; the broker doesn't care about the message schema.
+The Pulsar client should ignore the schema information when creating a producer or adding a consumer subscription.
+
+Users can implement the `SchemaInfoProvider` and `Schema` interfaces to access an external schema registry service.
+The `Schema` interface has two main methods, `encode` and `decode`; customized schemas can register or fetch schemas inside these methods.
+The encoded message format depends on the external schema system. The Pulsar broker just treats the message as raw bytes and won't change the schema version recorded in the message metadata.
+Pulsar uses a schema version to identify the schema, whereas some external schema systems use a schema ID.
+When an external schema system is used, the Pulsar message metadata will not carry the schema ID; the customized decoding method can try to retrieve the schema ID from the encoded data.
+
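One way a customized `encode`/`decode` pair could carry the schema ID inside the payload is sketched below. This is an illustration only, not part of the PIP: the class `SchemaIdFraming`, its method names, and the frame layout (one marker byte followed by a 4-byte big-endian schema ID) are all assumptions chosen for the example.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical helper: frames a payload as [marker byte][4-byte schema ID][body]. */
public class SchemaIdFraming {

    // Assumed layout: a single marker byte, then a big-endian int schema ID.
    public static final byte MAGIC = 0x0;
    public static final int HEADER_SIZE = 1 + Integer.BYTES;

    /** Prepends the schema ID so a consumer can recover it without message metadata. */
    public static byte[] encode(int schemaId, byte[] body) {
        return ByteBuffer.allocate(HEADER_SIZE + body.length)
                .put(MAGIC)
                .putInt(schemaId)
                .put(body)
                .array();
    }

    /** Reads the schema ID back from the framed bytes, validating the header first. */
    public static int schemaIdOf(byte[] framed) {
        ByteBuffer buf = ByteBuffer.wrap(framed);
        if (framed.length < HEADER_SIZE || buf.get() != MAGIC) {
            throw new IllegalArgumentException("payload is not framed with a schema ID");
        }
        return buf.getInt();
    }

    /** Returns the original body with the header stripped. */
    public static byte[] bodyOf(byte[] framed) {
        schemaIdOf(framed); // throws if the header is missing or malformed
        return Arrays.copyOfRange(framed, HEADER_SIZE, framed.length);
    }

    public static void main(String[] args) {
        byte[] body = "{\"name\":\"name-0\",\"age\":10}".getBytes(StandardCharsets.UTF_8);
        byte[] framed = encode(42, body);
        System.out.println(schemaIdOf(framed));
        System.out.println(new String(bodyOf(framed), StandardCharsets.UTF_8));
    }
}
```

With a layout like this, the broker only ever sees opaque bytes, while a decoder can look up the writer schema in the external registry by the ID it reads from the first five bytes.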
+Example usage
+```java
+public void workWithExternalSchemaRegistry() throws Exception {
+    Map<String, String> srConfig = new HashedMap<>();
+    srConfig.put("schema.registry.url", "http://localhost:8001");
+
+    PulsarClient client = PulsarClient.builder()
+            .serviceUrl("pulsar://localhost:6650")
+            .schemaInfoProviderFactory(new KafkaSchemaInfoProviderFactory(srConfig))
+            .build();
+
+    String topic = "t1";
+    Schema<User> schema = KafkaSchemas.JSON(User.class);
+
+    Producer<User> producer = client.newProducer(schema)
+            .topic(topic)
+            .create();
+
+    Consumer<User> consumer = client.newConsumer(schema)
+            .topic(topic)
+            .subscriptionName("sub")
+            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+            .subscribe();
+
+    for (int i = 0; i < 10; i++) {
+        producer.send(new User("name-" + i, 10 + i));
+    }
+
+    for (int i = 0; i < 10; i++) {
+        Message<User> message = consumer.receive();
+        consumer.acknowledge(message);
+    }
+
+    client.close();
+}
+```
+
+Messaging protocols changes
+
+Ignore schema info while creating the producer and skip schema registration before sending messages; schema management is left to the external schema registry.
+```java
+public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, ConnectionHandler.Connection {
+
+    @Override
+    public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
+        // ...
+
+        if (schema != null && schema.getSchemaInfoProvider() != null
+                && schema.getSchemaInfoProvider().isExternal()) {
+            // set schemaInfo to null if the schema info provider is external
+            schemaInfo = null;
+        }
+
+        // send create producer request
+    }
+
+    private void tryRegisterSchema(ClientCnx cnx, MessageImpl msg, SendCallback callback, long expectedCnxEpoch) {
+        // ...
+
+        // getSchemaInfoProvider() returns null by default, so guard before calling isExternal()
+        if (schema != null && schema.getSchemaInfoProvider() != null
+                && schema.getSchemaInfoProvider().isExternal()) {
+            // don't register the schema with the broker when an external schema registry service is used;
+            // registration can be integrated into the message encode method
+            return;
+        }
+        // getOrCreateSchemaAsync
+    }
+
+}
+```
+
+Ignore schema info while adding consumer subscription.
+```java
+public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandler.Connection {
+
+    @Override
+    public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
+        // ...
+
+        // an external schema info provider manages the schema itself,
+        // so no schema info is set on the subscribe command
+        if (schema != null && schema.getSchemaInfoProvider() != null
+                && schema.getSchemaInfoProvider().isExternal()) {
+            si = null;
+        }
+
+    }
+
+}
+```
+
+## Public-facing Changes
+
+Add new methods to the `SchemaInfoProvider` interface.
+The `SchemaInfoProvider` exposes the parameters needed to connect to the external schema registry service through the `getConfigs` method.
+If the schema info provider is external, the new-producer and consumer-subscribe commands treat the schema as a bytes schema, and the broker skips schema validation.
+```java
+public interface SchemaInfoProvider {
+
+    /**
+     * Returns the configs of the schema registry service, such as the URL and authentication params.
+     */
+    default Map<String, String> getConfigs() {
+        return Collections.emptyMap();
+    }
+
+    /**
+     * Determines whether the SchemaInfoProvider is external.
+     */
+    default boolean isExternal() {
+        return false;
+    }
+
+}
+```
+
+Add a new interface, `SchemaInfoProviderFactory`, which is used to initialize a `SchemaInfoProvider`; each topic has its own `SchemaInfoProvider`.
+```java
+public interface SchemaInfoProviderFactory {
+
+    SchemaInfoProvider of(String topic);
+
+}
+```
+
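A factory that hands out one provider per topic could look roughly like the sketch below. It is self-contained and does not use the Pulsar API: `ProviderSketch` and `ProviderFactorySketch` are stand-ins for the interfaces in this PIP, the `topic()` accessor is hypothetical, and caching providers in a map is an assumption rather than something the PIP specifies.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class PerTopicProviderSketch {

    /** Stand-in for the methods this PIP adds to SchemaInfoProvider. */
    public interface ProviderSketch {
        Map<String, String> getConfigs();
        boolean isExternal();
        String topic(); // hypothetical accessor, only for this example
    }

    /** Stand-in for SchemaInfoProviderFactory#of(String). */
    public interface ProviderFactorySketch {
        ProviderSketch of(String topic);
    }

    /** Provider bound to a single topic that shares the registry configs. */
    static final class ExternalProvider implements ProviderSketch {
        private final String topic;
        private final Map<String, String> configs;

        ExternalProvider(String topic, Map<String, String> configs) {
            this.topic = topic;
            this.configs = configs;
        }

        @Override public Map<String, String> getConfigs() { return configs; }
        @Override public boolean isExternal() { return true; }
        @Override public String topic() { return topic; }
    }

    /** Creates each topic's provider once and reuses it afterwards (an assumption). */
    public static class ExternalProviderFactory implements ProviderFactorySketch {
        private final Map<String, String> configs;
        private final Map<String, ProviderSketch> providers = new ConcurrentHashMap<>();

        public ExternalProviderFactory(Map<String, String> configs) {
            // defensive copy so later changes to the caller's map are not visible
            this.configs = Collections.unmodifiableMap(new HashMap<>(configs));
        }

        @Override
        public ProviderSketch of(String topic) {
            return providers.computeIfAbsent(topic, t -> new ExternalProvider(t, configs));
        }
    }

    public static void main(String[] args) {
        Map<String, String> srConfig = new HashMap<>();
        srConfig.put("schema.registry.url", "http://localhost:8001");
        ProviderFactorySketch factory = new ExternalProviderFactory(srConfig);
        ProviderSketch p = factory.of("t1");
        System.out.println(p.topic() + " external=" + p.isExternal());
    }
}
```

Whether a real factory should cache providers or create a fresh one per call is a design choice the PIP leaves open; the sketch picks caching only to keep repeated lookups for the same topic cheap.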
+The client builder supports setting the `SchemaInfoProviderFactory`.
+```java
+public interface ClientBuilder extends Serializable, Cloneable {
+
+    ClientBuilder schemaInfoProviderFactory(SchemaInfoProviderFactory schemaInfoProviderFactory);
+
+}
+```
+
+The `ClientConfigurationData` supports passing the `SchemaInfoProviderFactory` to the client.
+```java
+public class ClientConfigurationData implements Serializable, Cloneable {
+
+    @JsonIgnore
+    private transient SchemaInfoProviderFactory schemaInfoProviderFactory;
+
+}
+```
+
+The customized schema can get the `SchemaInfoProvider` and retrieve the 
configs from it.
+```java
+public interface Schema {
+
+    /**
+     * When a schema info provider is set on the schema, the schema can retrieve the configs from it.
+     */
+    default void setSchemaInfoProvider(SchemaInfoProvider schemaInfoProvider) {
+    }
+
+    /**
+     * Returns the schema info provider.
+     *
+     * @return a {@code SchemaInfoProvider} representing the schema info provider
+     */
+    default SchemaInfoProvider getSchemaInfoProvider() {
+        return null;
+    }
+
+}
+```
+
+# Pulsar Function

Review Comment:
   Introduce a new schema type `EXTERNAL`; schemas that use the external schema 
registry should use this schema type. Please check the schema type section.


