sijie commented on a change in pull request #3876: Add the multi version schema support
URL: https://github.com/apache/pulsar/pull/3876#discussion_r273291315
 
 

 ##########
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
 ##########
 @@ -38,26 +50,65 @@
  * {@link org.apache.pulsar.common.schema.SchemaType#JSON},
  * and {@link org.apache.pulsar.common.schema.SchemaType#PROTOBUF}.
  */
-abstract class StructSchema<T> implements Schema<T> {
+public abstract class StructSchema<T> implements Schema<T> {
+
+    protected static final Logger LOG = 
LoggerFactory.getLogger(StructSchema.class);
 
-    protected final org.apache.avro.Schema schema;
+    public final org.apache.avro.Schema schema;
     protected final SchemaInfo schemaInfo;
+    protected final SchemaReader<T> reader;
+    protected final SchemaWriter<T> writer;
+    private boolean supportSchemaVersioning;
+    protected SchemaProvider schemaProvider;
+    private final LoadingCache<byte[], SchemaReader<T>> readerCache = 
CacheBuilder.newBuilder().maximumSize(100000)
+            .expireAfterAccess(30, TimeUnit.MINUTES).build(new 
CacheLoader<byte[], SchemaReader<T>>() {
+                @Override
+                public SchemaReader<T> load(byte[] schemaVersion) throws 
Exception {
+                    return loadReader(schemaVersion);
+                }
+            });
 
     protected StructSchema(SchemaType schemaType,
                            org.apache.avro.Schema schema,
-                           Map<String, String> properties) {
+                           SchemaDefinition schemaDefinition,
+                           SchemaWriter<T> writer,
+                           SchemaReader<T> reader) {
         this.schema = schema;
         this.schemaInfo = new SchemaInfo();
         this.schemaInfo.setName("");
         this.schemaInfo.setType(schemaType);
         this.schemaInfo.setSchema(this.schema.toString().getBytes(UTF_8));
-        this.schemaInfo.setProperties(properties);
+        this.schemaInfo.setProperties(schemaDefinition.getProperties());
+        this.supportSchemaVersioning = 
schemaDefinition.getSupportSchemaVersioning();
+        this.writer = writer;
+        this.reader = reader;
     }
 
-    protected org.apache.avro.Schema getAvroSchema() {
+    public org.apache.avro.Schema getAvroSchema() {
         return schema;
     }
 
+    @Override
+    public byte[] encode(T message) {
+        return writer.write(message);
+    }
+
+    @Override
+    public T decode(byte[] bytes) {
+        return reader.read(bytes);
+    }
+
+    @Override
+    public T decode(byte[] bytes, byte[] schemaVersion) {
+        try {
+            return readerCache.get(schemaVersion).read(bytes);
+        } catch (ExecutionException e) {
+            LOG.error("Can't get generic schema for topic {} schema version 
{}",
+                    ((MultiVersionGenericSchemaProvider) 
schemaProvider).getTopic().toString(), new String(schemaVersion, 
StandardCharsets.UTF_8), e);
 
 Review comment:
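   I would prefer checking whether schemaProvider is an instance of
   MultiVersionGenericSchemaProvider before casting it directly. Otherwise this
   error-logging path can itself throw (a ClassCastException for a different
   provider type, or a NullPointerException if schemaProvider has not been set),
   which would mask the original ExecutionException.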

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to