ferrirW commented on code in PR #63:
URL:
https://github.com/apache/rocketmq-schema-registry/pull/63#discussion_r978248261
##########
client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/AvroSerializer.java:
##########
@@ -62,10 +65,19 @@ public byte[] serialize(
if (record == null) {
return null;
}
+ String purposeSchema;
+ if (record instanceof GenericRecord) {
+ purposeSchema = ((GenericContainer) record).getSchema().toString();
+ } else {
+ purposeSchema =
SpecificData.get().getSchema(record.getClass()).toString();
+ }
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
BinaryEncoder encoder = encoderFactory.directBinaryEncoder(out,
null);
- GetSchemaResponse response = getSchemaBySubject(subject);
+ GetSchemaResponse response =
schemaRegistry.getTargetSchema(subject, purposeSchema);
+ if (response == null) {
Review Comment:
Same as above.
##########
client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/AvroDeserializer.java:
##########
@@ -69,38 +72,58 @@ public T deserialize(String subject, byte[] payload)
}
try {
- GetSchemaResponse response =
schemaRegistry.getSchemaBySubject(subject);
- Schema schema = new Schema.Parser().parse(response.getIdl());
- return avroDecode(payload, schema);
- } catch (RestClientException | IOException e) {
- throw new RuntimeException(e);
+ ByteArrayInputStream bais = new ByteArrayInputStream(payload);
+ BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bais,
null);
+
+ ByteBuffer buffer = ByteBuffer.allocate(16);
+ try {
+ decoder.readBytes(buffer);
+ } catch (Exception e) {
+ log.error("read bytes error: ", e);
+ }
+ long schemaRecordId = buffer.getLong();
+ GetSchemaResponse response =
schemaRegistry.getSchemaByRecordId(subject, schemaRecordId);
+ if (response == null) {
+ throw new SerializationException("there's no version schema
from service can deserialize this object");
+ }
+ Schema writerSchema = new Schema.Parser().parse(response.getIdl());
+ if (readerSchema == null) {
+ readerSchema = getReaderSchema(writerSchema);
+ }
+
+ DatumReader<T> datumReader = getDatumReader(writerSchema,
readerSchema);
+ return datumReader.read(null, decoder);
+ } catch (RestClientException e) {
+ throw new SerializationException("get schema by record id failed,
maybe the schema storage service not available now", e);
+ } catch (IOException e) {
Review Comment:
Please merge the inner try-catch with the outer one.
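A minimal sketch of what the merged form might look like, using only the JDK so it compiles on its own. The names `MergedTryCatchSketch`, `SchemaLookup`, `LookupException`, and the nested `SerializationException` are hypothetical stand-ins for the PR's types, and the sketch assumes the record ID is a plain 8-byte big-endian prefix, which may not match the PR's actual wire format.

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;

class MergedTryCatchSketch {
    // Hypothetical stand-ins for the PR's types, defined here so the sketch is self-contained.
    static class SerializationException extends RuntimeException {
        SerializationException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    static class LookupException extends Exception {
        LookupException(String message) {
            super(message);
        }
    }

    interface SchemaLookup {
        String findSchema(long recordId) throws LookupException;
    }

    // One try block covers both reading the ID and resolving the schema;
    // each failure mode gets its own catch clause instead of a nested try.
    static String resolveSchema(byte[] payload, SchemaLookup lookup) {
        try {
            // Assumption: the ID is the first 8 bytes of the payload, big-endian.
            long recordId = ByteBuffer.wrap(payload).getLong();
            return lookup.findSchema(recordId);
        } catch (BufferUnderflowException e) {
            throw new SerializationException("payload too short to hold a schema record id", e);
        } catch (LookupException e) {
            throw new SerializationException("schema lookup failed for this record id", e);
        }
    }
}
```

With this shape, a short payload and a failed lookup both surface as the same exception type, and nothing is logged and then ignored.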
##########
client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/AvroDeserializer.java:
##########
@@ -69,38 +72,58 @@ public T deserialize(String subject, byte[] payload)
}
try {
- GetSchemaResponse response =
schemaRegistry.getSchemaBySubject(subject);
- Schema schema = new Schema.Parser().parse(response.getIdl());
- return avroDecode(payload, schema);
- } catch (RestClientException | IOException e) {
- throw new RuntimeException(e);
+ ByteArrayInputStream bais = new ByteArrayInputStream(payload);
+ BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bais,
null);
+
+ ByteBuffer buffer = ByteBuffer.allocate(16);
+ try {
+ decoder.readBytes(buffer);
+ } catch (Exception e) {
+ log.error("read bytes error: ", e);
+ }
+ long schemaRecordId = buffer.getLong();
+ GetSchemaResponse response =
schemaRegistry.getSchemaByRecordId(subject, schemaRecordId);
+ if (response == null) {
Review Comment:
This can't return null here, can it?
##########
client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/AvroDeserializer.java:
##########
@@ -69,38 +72,58 @@ public T deserialize(String subject, byte[] payload)
}
try {
- GetSchemaResponse response =
schemaRegistry.getSchemaBySubject(subject);
- Schema schema = new Schema.Parser().parse(response.getIdl());
- return avroDecode(payload, schema);
- } catch (RestClientException | IOException e) {
- throw new RuntimeException(e);
+ ByteArrayInputStream bais = new ByteArrayInputStream(payload);
+ BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bais,
null);
+
+ ByteBuffer buffer = ByteBuffer.allocate(16);
+ try {
+ decoder.readBytes(buffer);
+ } catch (Exception e) {
+ log.error("read bytes error: ", e);
Review Comment:
This exception needs to be handled, not just logged.
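To illustrate the concern, here is a small JDK-only sketch of the same pattern: the read failure is swallowed and the code goes on to call `getLong()` on a buffer that was never filled. `SwallowedReadSketch` and `readIdIgnoringFailure` are hypothetical names, and `ByteBuffer.put` stands in for the Avro decoder call, so this is an analogy rather than the PR's exact behavior.

```java
import java.nio.ByteBuffer;

class SwallowedReadSketch {
    // Mimics the reviewed pattern: the read failure is caught and dropped,
    // and execution continues with whatever the buffer happens to contain.
    static long readIdIgnoringFailure(byte[] payload) {
        ByteBuffer buffer = ByteBuffer.allocate(16); // zero-filled on allocation
        try {
            // Throws IndexOutOfBoundsException when payload has fewer than 8 bytes.
            buffer.put(payload, 0, 8);
            buffer.flip();
        } catch (RuntimeException e) {
            // Swallowed, as in the hunk above (which only logs the error).
        }
        // After a failed read this returns 0, which is indistinguishable from a real ID of 0.
        return buffer.getLong();
    }
}
```

A truncated payload therefore yields record ID 0 and the failure only shows up later, at schema lookup, with a misleading cause.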
##########
client/src/main/java/org/apache/rocketmq/schema/registry/client/serde/avro/AvroDeserializer.java:
##########
@@ -59,7 +58,11 @@ public void configure(Map<String, Object> configs) {
}
@Override
- public T deserialize(String subject, byte[] payload)
+ public T deserialize(String subject, byte[] payload) {
+ return this.deserialize(subject, payload, null);
+ }
Review Comment:
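I don't quite understand why the Streams scenario can't require the data to always carry a recordId. The point of adding schemas is to guarantee type safety along the pipeline by imposing constraints; if we drop this constraint for the sake of flexibility, it doesn't feel much different from the old version.
That said, from an implementation standpoint, I think adding a branch that doesn't depend on the Schema server is fine. @humkum could consider adding it first.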