sijie commented on a change in pull request #3876: Add the multi version schema support URL: https://github.com/apache/pulsar/pull/3876#discussion_r273291315
########## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java ########## @@ -38,26 +50,65 @@ * {@link org.apache.pulsar.common.schema.SchemaType#JSON}, * and {@link org.apache.pulsar.common.schema.SchemaType#PROTOBUF}. */ -abstract class StructSchema<T> implements Schema<T> { +public abstract class StructSchema<T> implements Schema<T> { + + protected static final Logger LOG = LoggerFactory.getLogger(StructSchema.class); - protected final org.apache.avro.Schema schema; + public final org.apache.avro.Schema schema; protected final SchemaInfo schemaInfo; + protected final SchemaReader<T> reader; + protected final SchemaWriter<T> writer; + private boolean supportSchemaVersioning; + protected SchemaProvider schemaProvider; + private final LoadingCache<byte[], SchemaReader<T>> readerCache = CacheBuilder.newBuilder().maximumSize(100000) + .expireAfterAccess(30, TimeUnit.MINUTES).build(new CacheLoader<byte[], SchemaReader<T>>() { + @Override + public SchemaReader<T> load(byte[] schemaVersion) throws Exception { + return loadReader(schemaVersion); + } + }); protected StructSchema(SchemaType schemaType, org.apache.avro.Schema schema, - Map<String, String> properties) { + SchemaDefinition schemaDefinition, + SchemaWriter<T> writer, + SchemaReader<T> reader) { this.schema = schema; this.schemaInfo = new SchemaInfo(); this.schemaInfo.setName(""); this.schemaInfo.setType(schemaType); this.schemaInfo.setSchema(this.schema.toString().getBytes(UTF_8)); - this.schemaInfo.setProperties(properties); + this.schemaInfo.setProperties(schemaDefinition.getProperties()); + this.supportSchemaVersioning = schemaDefinition.getSupportSchemaVersioning(); + this.writer = writer; + this.reader = reader; } - protected org.apache.avro.Schema getAvroSchema() { + public org.apache.avro.Schema getAvroSchema() { return schema; } + @Override + public byte[] encode(T message) { + return writer.write(message); + } + + @Override + public T decode(byte[] bytes) { + return reader.read(bytes); + } + + @Override + public T decode(byte[] bytes, byte[] schemaVersion) { + try { + return readerCache.get(schemaVersion).read(bytes); + } catch (ExecutionException e) { + LOG.error("Can't get generic schema for topic {} schema version {}", + ((MultiVersionGenericSchemaProvider) schemaProvider).getTopic().toString(), new String(schemaVersion, StandardCharsets.UTF_8), e); Review comment: I would prefer checking if schemaProvider is an instance of MultiVersionGenericSchemaProvider before casting it directly. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services