sijie commented on a change in pull request #4211: [issue#4155][pulsar-clients]Support key value schema versioning URL: https://github.com/apache/pulsar/pull/4211#discussion_r281460761
########## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java ########## @@ -242,18 +242,42 @@ public boolean isExpired(int messageTTLInSeconds) { @Override public T getValue() { - // check if the schema passed in from client supports schema versioning or not - // this is an optimization to only get schema version when necessary - byte [] schemaVersion = getSchemaVersion(); - if (schema.supportSchemaVersioning() && schemaVersion != null) { - return schema.decode(getData(), schemaVersion); - } else if (schema.getSchemaInfo() != null && SchemaType.KEY_VALUE == schema.getSchemaInfo().getType()) { - KeyValueSchema kvSchema = (KeyValueSchema) schema; - if (kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) { - return schema.decode(getKeyBytes(), getData()); + if (SchemaType.KEY_VALUE == schema.getSchemaInfo().getType()) { + if (schema.supportSchemaVersioning()) { + return getKeyValueBySchemaVersion(); + } else { + return getKeyValue(); + } + } else { + // check if the schema passed in from client supports schema versioning or not + // this is an optimization to only get schema version when necessary + if (schema.supportSchemaVersioning()) { + byte[] schemaVersion = getSchemaVersion(); + if (null == schemaVersion) { + return schema.decode(getData()); + } else { + return schema.decode(getData(), schemaVersion); + } } else { return schema.decode(getData()); } + } + } + + private T getKeyValueBySchemaVersion() { + KeyValueSchema kvSchema = (KeyValueSchema) schema; + byte[] schemaVersion = getSchemaVersion(); + if (kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) { + return (T)kvSchema.decode(getKeyBytes(), getData(), schemaVersion); + } else { + return schema.decode(getData(), schemaVersion); + } + } + + private T getKeyValue() { + KeyValueSchema kvSchema = (KeyValueSchema) schema; + if (kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) { + return (T)kvSchema.decode(getKeyBytes(), getData(), null); Review comment: ```suggestion return (T) kvSchema.decode(getKeyBytes(), getData(), null); ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services