sijie commented on a change in pull request #3876: Add the multi version schema support URL: https://github.com/apache/pulsar/pull/3876#discussion_r275129946
########## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java ########## @@ -38,39 +50,98 @@ * {@link org.apache.pulsar.common.schema.SchemaType#JSON}, * and {@link org.apache.pulsar.common.schema.SchemaType#PROTOBUF}. */ -abstract class StructSchema<T> implements Schema<T> { +public abstract class StructSchema<T> implements Schema<T> { + + protected static final Logger LOG = LoggerFactory.getLogger(StructSchema.class); - protected final org.apache.avro.Schema schema; + public final org.apache.avro.Schema schema; protected final SchemaInfo schemaInfo; + protected SchemaReader<T> reader; + protected SchemaWriter<T> writer; + protected SchemaInfoProvider schemaInfoProvider; + private final LoadingCache<byte[], SchemaReader<T>> readerCache = CacheBuilder.newBuilder().maximumSize(100000) + .expireAfterAccess(30, TimeUnit.MINUTES).build(new CacheLoader<byte[], SchemaReader<T>>() { + @Override + public SchemaReader<T> load(byte[] schemaVersion) { + return loadReader(schemaVersion); + } + }); - protected StructSchema(SchemaType schemaType, - org.apache.avro.Schema schema, - Map<String, String> properties) { - this.schema = schema; - this.schemaInfo = new SchemaInfo(); - this.schemaInfo.setName(""); - this.schemaInfo.setType(schemaType); - this.schemaInfo.setSchema(this.schema.toString().getBytes(UTF_8)); - this.schemaInfo.setProperties(properties); + protected StructSchema(SchemaInfo schemaInfo) { + this.schema = parseAvroSchema(new String(schemaInfo.getSchema(), UTF_8)); + this.schemaInfo = schemaInfo; } - protected org.apache.avro.Schema getAvroSchema() { + public org.apache.avro.Schema getAvroSchema() { return schema; } + @Override + public byte[] encode(T message) { + return writer.write(message); + } + + @Override + public T decode(byte[] bytes) { + return reader.read(bytes); + } + + @Override + public T decode(byte[] bytes, byte[] schemaVersion) { + try { + return readerCache.get(schemaVersion).read(bytes); + } catch 
(ExecutionException e) { + LOG.error("Can't get generic schema for topic {} schema version {}", + schemaInfoProvider.getTopicName(), Hex.encodeHexString(schemaVersion), e); + return null; + } + } + @Override public SchemaInfo getSchemaInfo() { return this.schemaInfo; } protected static org.apache.avro.Schema createAvroSchema(SchemaDefinition schemaDefinition) { Class pojo = schemaDefinition.getPojo(); - return schemaDefinition.getAlwaysAllowNull() ? ReflectData.AllowNull.get().getSchema(pojo) : ReflectData.get().getSchema(pojo); + if (pojo != null) { + return schemaDefinition.getAlwaysAllowNull() ? ReflectData.AllowNull.get().getSchema(pojo) : ReflectData.get().getSchema(pojo); + } else if (StringUtils.isNotBlank(schemaDefinition.getJsonDef())) { + return parseAvroSchema(schemaDefinition.getJsonDef()); + } else { + throw new RuntimeException("Schema definition must specify pojo class or schema json definition"); + } + } + + public static org.apache.avro.Schema parseAvroSchema(String schemaJson) { Review comment: nit: this method should be `protected` rather than `public`. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services