sijie commented on a change in pull request #3876: Add the multi version schema support
URL: https://github.com/apache/pulsar/pull/3876#discussion_r275129946
 
 

 ##########
 File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
 ##########
 @@ -38,39 +50,98 @@
  * {@link org.apache.pulsar.common.schema.SchemaType#JSON},
  * and {@link org.apache.pulsar.common.schema.SchemaType#PROTOBUF}.
  */
-abstract class StructSchema<T> implements Schema<T> {
+public abstract class StructSchema<T> implements Schema<T> {
+
+    protected static final Logger LOG = 
LoggerFactory.getLogger(StructSchema.class);
 
-    protected final org.apache.avro.Schema schema;
+    public final org.apache.avro.Schema schema;
     protected final SchemaInfo schemaInfo;
+    protected SchemaReader<T> reader;
+    protected SchemaWriter<T> writer;
+    protected SchemaInfoProvider schemaInfoProvider;
+    private final LoadingCache<byte[], SchemaReader<T>> readerCache = 
CacheBuilder.newBuilder().maximumSize(100000)
+            .expireAfterAccess(30, TimeUnit.MINUTES).build(new 
CacheLoader<byte[], SchemaReader<T>>() {
+                @Override
+                public SchemaReader<T> load(byte[] schemaVersion) {
+                    return loadReader(schemaVersion);
+                }
+            });
 
-    protected StructSchema(SchemaType schemaType,
-                           org.apache.avro.Schema schema,
-                           Map<String, String> properties) {
-        this.schema = schema;
-        this.schemaInfo = new SchemaInfo();
-        this.schemaInfo.setName("");
-        this.schemaInfo.setType(schemaType);
-        this.schemaInfo.setSchema(this.schema.toString().getBytes(UTF_8));
-        this.schemaInfo.setProperties(properties);
+    protected StructSchema(SchemaInfo schemaInfo) {
+        this.schema = parseAvroSchema(new String(schemaInfo.getSchema(), 
UTF_8));
+        this.schemaInfo = schemaInfo;
     }
 
-    protected org.apache.avro.Schema getAvroSchema() {
+    public org.apache.avro.Schema getAvroSchema() {
         return schema;
     }
 
+    @Override
+    public byte[] encode(T message) {
+        return writer.write(message);
+    }
+
+    @Override
+    public T decode(byte[] bytes) {
+        return reader.read(bytes);
+    }
+
+    @Override
+    public T decode(byte[] bytes, byte[] schemaVersion) {
+        try {
+            return readerCache.get(schemaVersion).read(bytes);
+        } catch (ExecutionException e) {
+            LOG.error("Can't get generic schema for topic {} schema version 
{}",
+                    schemaInfoProvider.getTopicName(), 
Hex.encodeHexString(schemaVersion), e);
+            return null;
+        }
+    }
+
     @Override
     public SchemaInfo getSchemaInfo() {
         return this.schemaInfo;
     }
 
     protected static org.apache.avro.Schema createAvroSchema(SchemaDefinition 
schemaDefinition) {
         Class pojo = schemaDefinition.getPojo();
-        return schemaDefinition.getAlwaysAllowNull() ? 
ReflectData.AllowNull.get().getSchema(pojo) : ReflectData.get().getSchema(pojo);
+        if (pojo != null) {
+            return schemaDefinition.getAlwaysAllowNull() ? 
ReflectData.AllowNull.get().getSchema(pojo) : ReflectData.get().getSchema(pojo);
+        } else if (StringUtils.isNotBlank(schemaDefinition.getJsonDef())) {
+            return parseAvroSchema(schemaDefinition.getJsonDef());
+        } else {
+            throw new RuntimeException("Schema definition must specify pojo 
class or schema json definition");
+        }
+    }
+
+    public static org.apache.avro.Schema parseAvroSchema(String schemaJson) {
 
 Review comment:
   nit: protected (i.e. declare `parseAvroSchema` as `protected` rather than `public`)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to