eolivelli commented on a change in pull request #9481:
URL: https://github.com/apache/pulsar/pull/9481#discussion_r572676915



##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
##########
@@ -196,6 +197,9 @@ private static boolean isProtobufClass(Class<?> pojoClazz) {
         String schemaTypeOrClassName = conf.getSchemaType();
         if (StringUtils.isEmpty(schemaTypeOrClassName) || DEFAULT_SERDE.equals(schemaTypeOrClassName)) {
             // No preferred schema was provided, auto-discover schema or fallback to defaults
+            if (!input && clazz.equals(GenericRecord.class)) {
+                return new AutoProduceBytesSchema();
+            }

Review comment:
   @congbobo184
   This method is called by `PulsarSink` in its `initializeSchema()` method, via
`TopicSchema#getSchema`.
   The `PulsarSink` is the entity that actually writes to Pulsar.
   When you create a Source, Pulsar IO creates a `PulsarSink` to write to the
destination topic.

   When you start the Source, it uses this special `AutoProduceBytesSchema`
(it already existed, I did not add it).

   Initially the Source does not enforce a Schema on the topic (we achieve this
with `AutoProduceBytesSchema`).

   When the Source passes a Record to the Pulsar runtime, the `PulsarSink` picks
up the Schema (using `Record#getSchema`) and sets the schema accordingly.
   Therefore, when the schema changes (in a compatible way), the runtime
automatically updates the schema.

   In fact, when the Source starts you cannot know the schema, because the
schema is generated dynamically by the Source itself.
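
   To illustrate the flow described above, here is a minimal, self-contained
sketch. It is a simplified model, not the actual Pulsar code: `SourceRecord`,
`LazySchemaSink` and the string-based schema definitions are hypothetical
stand-ins for `Record`, `PulsarSink` and real Pulsar schemas. It only shows the
idea that the sink starts without a schema and adopts whatever schema each
record carries.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Simplified model only: these types are hypothetical stand-ins, not Pulsar classes.
public class LazySchemaSinkSketch {

    /** Hypothetical record: a payload plus the schema the source generated for it. */
    static final class SourceRecord {
        final String schemaDefinition;
        final String value;

        SourceRecord(String schemaDefinition, String value) {
            this.schemaDefinition = schemaDefinition;
            this.value = value;
        }
    }

    /** Hypothetical sink that starts with no schema and adopts the one carried by each record. */
    static final class LazySchemaSink {
        private String currentSchema; // null until the first record arrives
        private final List<String> appliedSchemas = new ArrayList<>();

        void write(SourceRecord record) {
            // Switch schema only when the incoming record carries a different one.
            if (!Objects.equals(currentSchema, record.schemaDefinition)) {
                currentSchema = record.schemaDefinition;
                appliedSchemas.add(currentSchema);
            }
            // A real sink would now send record.value using currentSchema.
        }

        String currentSchema() {
            return currentSchema;
        }

        List<String> appliedSchemas() {
            return appliedSchemas;
        }
    }

    public static void main(String[] args) {
        LazySchemaSink sink = new LazySchemaSink();
        sink.write(new SourceRecord("{name}", "alice"));      // first record fixes the schema
        sink.write(new SourceRecord("{name}", "bob"));        // same schema, nothing changes
        sink.write(new SourceRecord("{name, age}", "carol")); // schema evolved, sink follows
        System.out.println(sink.appliedSchemas());
    }
}
```

   In this model the sink never needs to know the schema at startup, which is
the point of returning `AutoProduceBytesSchema` for `GenericRecord`.
Compatibility checks between schema versions are left out of the sketch.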
   
   
   
    
   





