gaoran10 commented on code in PR #24328:
URL: https://github.com/apache/pulsar/pull/24328#discussion_r2153516617
##########
pip/pip-420.md:
##########
@@ -0,0 +1,253 @@
+# PIP-420: Provide ability for Pulsar clients to integrate with third-party schema registry service
+
+# Background knowledge
+
+Schema is an important feature for messaging systems. Pulsar integrates a schema manager into the Pulsar broker.
+The current implementation in Pulsar clients couples schema management with some protocol commands (creating a producer, adding a consumer subscription).
+This increases the complexity of the Pulsar protocol, and users can't leverage third-party schema registry services in the Pulsar client.
+
+# Motivation
+
+The Pulsar client should be able to access a third-party schema registry service to manage schemas (register schema,
+get schema, validate schema, etc.). The schema registry can be an independent service. When a third-party schema registry service is used,
+the Pulsar broker doesn't need to care about the schema of the messages.
+
+# Goals
+
+## In Scope
+
+- Provide the ability to leverage a third-party schema registry service in the Pulsar client.
+
+## Out of Scope
+
+This PIP will not include the implementation for accessing third-party schema systems.
+
+# High Level Design
+
+This PIP only provides the following abilities:
+
+- Decouple schema management from the create-producer and add-consumer-subscription commands.
+- Provide a way to build clients for external schema management systems and integrate them with Pulsar clients.
+
+# Detailed Design
+
+## Design & Implementation Details
+
+This PIP's goal is to decouple schema management from the Pulsar messaging protocols,
+and to give the Pulsar client the ability to leverage an external schema registry service to manage schemas.
+The external schema registry is responsible for managing the schema; the broker doesn't care about the message schema.
+The Pulsar client should ignore the schema information when creating a producer or adding a consumer subscription.
+
+Users can implement the `SchemaInfoProvider` and `Schema` interfaces to access an external schema registry service.
+The `Schema` interface has two main methods, `encode` and `decode`; a customized schema can register or retrieve the schema inside these methods.
+The encoded message format depends on the external schema system. The Pulsar broker just treats the message as bytes, and the client won't change the schema version in the message metadata.
+Unlike Pulsar, which uses the schema version to identify a schema, some external schema systems use a schema ID.
+When an external schema system is used, the Pulsar message metadata will not carry the schema ID, so the customized decode method can retrieve the schema ID from the encoded data.
+
+Example usage
+```java
+public void workWithExternalSchemaRegistry() throws Exception {
+    Map<String, String> srConfig = new HashMap<>();
+    srConfig.put("schema.registry.url", "http://localhost:8001");
+
+    // KafkaSchemaInfoProviderFactory and KafkaSchemas are external implementations (out of scope for this PIP)
+    PulsarClient client = PulsarClient.builder()
+            .serviceUrl("pulsar://localhost:6650")
+            .schemaInfoProviderFactory(new KafkaSchemaInfoProviderFactory(srConfig))
+            .build();
+
+    String topic = "t1";
+    Schema<User> schema = KafkaSchemas.JSON(User.class);
+
+    Producer<User> producer = client.newProducer(schema)
+            .topic(topic)
+            .create();
+
+    Consumer<User> consumer = client.newConsumer(schema)
+            .topic(topic)
+            .subscriptionName("sub")
+            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+            .subscribe();
+
+    for (int i = 0; i < 10; i++) {
+        producer.send(new User("name-" + i, 10 + i));
+    }
+
+    for (int i = 0; i < 10; i++) {
+        Message<User> message = consumer.receive();
+        consumer.acknowledge(message);
+    }
+
+    client.close();
+}
+```
+
+Messaging protocol changes
+
+Ignore the schema info when creating a producer, and skip registering the schema before sending messages; schema management is left to the external schema registry.
+```java
+public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, ConnectionHandler.Connection {
+
+    @Override
+    public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
+        // ...
+
+        if (schema != null && schema.getSchemaInfoProvider() != null && schema.getSchemaInfoProvider().isExternal()) {
+            // Don't send schema info in the create-producer request if the schema info provider is external
+            schemaInfo = null;
+        }
+
+        // send create producer request
+    }
+
+    private void tryRegisterSchema(ClientCnx cnx, MessageImpl msg, SendCallback callback, long expectedCnxEpoch) {
+        // ...
+
+        if (schema.getSchemaInfoProvider() != null && schema.getSchemaInfoProvider().isExternal()) {
+            // Don't register the schema with the broker when an external schema registry service is used;
+            // registration can be done inside the Schema#encode method instead.
+            return;
+        }
+        // getOrCreateSchemaAsync
+    }
+
+}
+```
+
+Ignore the schema info when adding a consumer subscription.
+```java
+public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandler.Connection {
+
+    @Override
+    public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
+        // ...
+
+        // With an external schema info provider, don't send schema info in the subscribe request
+        if (schema != null && schema.getSchemaInfoProvider() != null && schema.getSchemaInfoProvider().isExternal()) {
+            si = null;
+        }
+
+    }
+
+}
+```
+
+## Public-facing Changes
+
+Add new methods to the `SchemaInfoProvider` interface.
+The `SchemaInfoProvider` provides the parameters needed to connect to the external schema registry service through the `getConfigs` method.
+If the schema info provider is external, the create-producer and subscribe commands will treat the schema as a bytes schema, and the broker will skip schema validation.
+```java
+public interface SchemaInfoProvider {
+
+    /**
+     * Returns the configs of the schema registry service, such as URL, authentication params.
+     */
+    default Map<String, String> getConfigs() {
+        return Collections.emptyMap();
+    }
+
+    /**
+     * Returns whether this SchemaInfoProvider is backed by an external schema registry.
+     */
+    default boolean isExternal() {
+        return false;
+    }
+
+}
+```
+
+Add a new interface `SchemaInfoProviderFactory`, which is used to create a `SchemaInfoProvider`; each topic has its own `SchemaInfoProvider`.
+```java
+public interface SchemaInfoProviderFactory {
+
+    SchemaInfoProvider of(String topic);
+
+}
+```
+
+The client builder supports setting the `SchemaInfoProviderFactory`.
+```java
+public interface ClientBuilder extends Serializable, Cloneable {
+
+    ClientBuilder schemaInfoProviderFactory(SchemaInfoProviderFactory schemaInfoProviderFactory);
+
+}
+```
+
+The `ClientConfigurationData` carries the `SchemaInfoProviderFactory`.
+```java
+public class ClientConfigurationData implements Serializable, Cloneable {
+
+    @JsonIgnore
+    private transient SchemaInfoProviderFactory schemaInfoProviderFactory;
+
+}
+```
+
+A customized schema can get the `SchemaInfoProvider` and retrieve the configs from it.
+```java
+public interface Schema<T> {
+
+    /**
+     * Sets the schema info provider, from which the schema can retrieve the registry configs.
+     */
+    default void setSchemaInfoProvider(SchemaInfoProvider schemaInfoProvider) {
+    }
+
+    /**
+     * Returns the schema info provider.
+     *
+     * @return a {@code SchemaInfoProvider} representing the schema info provider
+     */
+    default SchemaInfoProvider getSchemaInfoProvider() {
+        return null;
+    }
+
+}
+```
+
+# Pulsar Function

Review Comment:
   Introduce a new schema type `EXTERNAL`; schemas that use the external schema registry should use this schema type. Please check the schema type section.
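The PIP text says that, with an external registry, the schema ID is not kept in the Pulsar message metadata, so a customized `decode` has to recover it from the encoded bytes. The following rough illustration (not part of the PIP) assumes a Confluent-compatible registry, whose wire format is one magic byte `0x0`, a 4-byte big-endian schema ID, and then the serialized data. `SchemaIdFraming` and its methods are hypothetical helpers that a custom `Schema#encode`/`decode` might call. The registry lookups themselves are left out.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/**
 * Hypothetical helper, not part of Pulsar or this PIP. It frames a serialized
 * payload with a schema ID, assuming the Confluent Schema Registry wire format:
 * [magic byte 0x0][4-byte big-endian schema ID][payload].
 */
final class SchemaIdFraming {
    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_SIZE = 1 + Integer.BYTES;

    private SchemaIdFraming() {
    }

    /** Prepends the header; a custom Schema#encode would call this after registering the schema. */
    static byte[] frame(int schemaId, byte[] payload) {
        return ByteBuffer.allocate(HEADER_SIZE + payload.length)
                .put(MAGIC_BYTE)
                .putInt(schemaId) // ByteBuffer writes big-endian by default
                .put(payload)
                .array();
    }

    /** Reads the schema ID; a custom Schema#decode would use it to fetch the writer schema. */
    static int schemaId(byte[] framed) {
        checkHeader(framed);
        // Position the buffer at byte 1 and read the 4-byte ID
        return ByteBuffer.wrap(framed, 1, Integer.BYTES).getInt();
    }

    /** Returns the serialized data that follows the header. */
    static byte[] payload(byte[] framed) {
        checkHeader(framed);
        return Arrays.copyOfRange(framed, HEADER_SIZE, framed.length);
    }

    private static void checkHeader(byte[] framed) {
        if (framed.length < HEADER_SIZE || framed[0] != MAGIC_BYTE) {
            throw new IllegalArgumentException("not a schema-registry framed message");
        }
    }
}
```

Keeping the ID inside the payload is what allows the broker to treat the message as plain bytes. The trade-off is that every producer and consumer on the topic must agree on the same framing.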
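In the quoted design, `SchemaInfoProviderFactory#of(String topic)` gives each topic its own `SchemaInfoProvider`. Its `getConfigs()` exposes the registry settings, and `isExternal()` turns off broker-side schema handling. So that it stays self-contained, the following sketch does not use the real Pulsar interfaces. Instead it uses a trimmed, hypothetical stand-in (`ExternalProviderSketch`) with the two methods this PIP adds plus a topic accessor. Caching one provider per topic with `ConcurrentHashMap.computeIfAbsent` is an assumption of this sketch, not something the PIP specifies.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for SchemaInfoProvider, reduced to the methods this PIP adds plus the topic. */
interface ExternalProviderSketch {
    String topic();

    default Map<String, String> getConfigs() {
        return Collections.emptyMap();
    }

    default boolean isExternal() {
        return false;
    }
}

/** Hypothetical factory in the spirit of SchemaInfoProviderFactory: one cached provider per topic. */
final class RegistryProviderFactorySketch {
    private final Map<String, String> registryConfigs;
    private final Map<String, ExternalProviderSketch> providers = new ConcurrentHashMap<>();

    RegistryProviderFactorySketch(Map<String, String> registryConfigs) {
        // Defensive, read-only copy shared by every topic's provider
        this.registryConfigs = Collections.unmodifiableMap(new HashMap<>(registryConfigs));
    }

    /** Mirrors SchemaInfoProviderFactory#of(String topic); repeated calls return the same provider. */
    ExternalProviderSketch of(String topic) {
        return providers.computeIfAbsent(topic, t -> new ExternalProviderSketch() {
            @Override
            public String topic() {
                return t;
            }

            @Override
            public Map<String, String> getConfigs() {
                return registryConfigs;
            }

            @Override
            public boolean isExternal() {
                return true;
            }
        });
    }
}
```

Copying the configs once means a caller mutating its original map after building the client does not change what the per-topic providers see.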