sijie commented on a change in pull request #4211: 
[issue#4155][pulsar-clients]Support key value schema versioning
URL: https://github.com/apache/pulsar/pull/4211#discussion_r281460761
 
 

 ##########
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
 ##########
 @@ -242,18 +242,42 @@ public boolean isExpired(int messageTTLInSeconds) {
 
     @Override
     public T getValue() {
-        // check if the schema passed in from client supports schema 
versioning or not
-        // this is an optimization to only get schema version when necessary
-        byte [] schemaVersion = getSchemaVersion();
-        if (schema.supportSchemaVersioning() && schemaVersion != null) {
-            return schema.decode(getData(), schemaVersion);
-        } else if (schema.getSchemaInfo() != null && SchemaType.KEY_VALUE == 
schema.getSchemaInfo().getType()) {
-            KeyValueSchema kvSchema = (KeyValueSchema) schema;
-            if (kvSchema.getKeyValueEncodingType() == 
KeyValueEncodingType.SEPARATED) {
-                return schema.decode(getKeyBytes(), getData());
+        if (SchemaType.KEY_VALUE == schema.getSchemaInfo().getType()) {
+            if (schema.supportSchemaVersioning()) {
+                return getKeyValueBySchemaVersion();
+            } else {
+                return getKeyValue();
+            }
+        } else {
+            // check if the schema passed in from client supports schema 
versioning or not
+            // this is an optimization to only get schema version when 
necessary
+            if (schema.supportSchemaVersioning()) {
+                byte[] schemaVersion = getSchemaVersion();
+                if (null == schemaVersion) {
+                    return schema.decode(getData());
+                } else {
+                    return schema.decode(getData(), schemaVersion);
+                }
             } else {
                 return schema.decode(getData());
             }
+        }
+    }
+
+    private T getKeyValueBySchemaVersion() {
+        KeyValueSchema kvSchema = (KeyValueSchema) schema;
+        byte[] schemaVersion = getSchemaVersion();
+        if (kvSchema.getKeyValueEncodingType() == 
KeyValueEncodingType.SEPARATED) {
+            return (T)kvSchema.decode(getKeyBytes(), getData(), schemaVersion);
+        } else {
+            return schema.decode(getData(), schemaVersion);
+        }
+    }
+
+    private T getKeyValue() {
+        KeyValueSchema kvSchema = (KeyValueSchema) schema;
+        if (kvSchema.getKeyValueEncodingType() == 
KeyValueEncodingType.SEPARATED) {
+            return (T)kvSchema.decode(getKeyBytes(), getData(), null);
 
 Review comment:
   ```suggestion
               return (T) kvSchema.decode(getKeyBytes(), getData(), null);
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to