eolivelli commented on a change in pull request #9481:
URL: https://github.com/apache/pulsar/pull/9481#discussion_r572676915
########## File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java ##########
@@ -196,6 +197,9 @@ private static boolean isProtobufClass(Class<?> pojoClazz) {
         String schemaTypeOrClassName = conf.getSchemaType();
         if (StringUtils.isEmpty(schemaTypeOrClassName) || DEFAULT_SERDE.equals(schemaTypeOrClassName)) {
             // No preferred schema was provided, auto-discover schema or fallback to defaults
+            if (!input && clazz.equals(GenericRecord.class)) {
+                return new AutoProduceBytesSchema();
+            }

Review comment:
@congbobo184 This method is called by `PulsarSink` in its `initializeSchema()` method, via `TopicSchema#getSchema`.

The PulsarSink is the entity that actually writes to Pulsar. When you create a Source, Pulsar IO creates a PulsarSink to write to the destination topic.

When the Source starts, it uses this special `AutoProduceBytesSchema` (it was pre-existing; I did not add it). Initially the Source does not enforce a schema on the topic; `AutoProduceBytesSchema` is what makes this possible.

When the Source passes a Record to the Pulsar runtime, the PulsarSink picks up the schema (using `Record#getSchema`) and sets it properly. Therefore, when the schema changes (in a compatible way), the runtime updates it automatically.

In fact, when the Source starts, the schema cannot be known, because it is generated dynamically by the Source itself.
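To make the flow described in the review comment concrete, here is a simplified Java model of it, offered as a sketch rather than the actual runtime code. Every type in it (`SimpleSchema`, `SimpleRecord`, `SketchSink`, `AUTO_PRODUCE_PLACEHOLDER`) is hypothetical and only mimics the roles played by `AutoProduceBytesSchema`, `Record#getSchema` and `PulsarSink`. It does not use the Pulsar API, and it skips schema-compatibility checks entirely.

```java
import java.util.ArrayList;
import java.util.List;

// HYPOTHETICAL model: none of these types are real Pulsar classes. They only
// mimic the roles of AutoProduceBytesSchema, Record#getSchema and PulsarSink.
public class SchemaPickupSketch {

    // Stand-in for a schema definition (records compare by value).
    record SimpleSchema(String definition) {}

    // Plays the role of AutoProduceBytesSchema: "no schema enforced yet".
    static final SimpleSchema AUTO_PRODUCE_PLACEHOLDER = new SimpleSchema("<auto-produce-bytes>");

    // Stand-in for a Record that carries its own schema, like Record#getSchema.
    record SimpleRecord(byte[] payload, SimpleSchema schema) {}

    // Hypothetical sink mirroring the behaviour described in the review comment.
    static final class SketchSink {
        private SimpleSchema currentSchema;
        private final List<SimpleSchema> schemaHistory = new ArrayList<>();

        // Like initializeSchema(): the Source's output schema is unknown at
        // startup, so begin with the placeholder instead of a concrete schema.
        void initializeSchema() {
            currentSchema = AUTO_PRODUCE_PLACEHOLDER;
        }

        // Called for every record emitted by the Source: adopt the record's
        // schema whenever it differs from the one currently in use.
        // (Compatibility checking is deliberately omitted in this sketch.)
        void write(SimpleRecord record) {
            SimpleSchema recordSchema = record.schema();
            if (recordSchema != null && !recordSchema.equals(currentSchema)) {
                currentSchema = recordSchema;
                schemaHistory.add(recordSchema);
            }
            // A real sink would now publish record.payload() using currentSchema.
        }

        SimpleSchema currentSchema() {
            return currentSchema;
        }

        List<SimpleSchema> schemaHistory() {
            return List.copyOf(schemaHistory);
        }
    }

    public static void main(String[] args) {
        SketchSink sink = new SketchSink();
        sink.initializeSchema();
        System.out.println(sink.currentSchema().definition()); // <auto-produce-bytes>

        SimpleSchema v1 = new SimpleSchema("{id:int}");
        SimpleSchema v2 = new SimpleSchema("{id:int,name:string?}");
        sink.write(new SimpleRecord(new byte[] {1}, v1));
        sink.write(new SimpleRecord(new byte[] {2}, v1)); // same schema: no update
        sink.write(new SimpleRecord(new byte[] {3}, v2)); // evolved schema: adopted

        System.out.println(sink.currentSchema().definition()); // {id:int,name:string?}
        System.out.println(sink.schemaHistory().size());       // 2
    }
}
```

The design point the sketch tries to capture is that the sink never needs the schema up front: it starts from a "no schema" placeholder and lets each record tell it which schema to use, so a schema that evolves while the Source runs is picked up without restarting anything.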