[GitHub] [pulsar] sijie commented on a change in pull request #5123: Modify the schema decode method can decode ByteBuf

2019-09-13 Thread GitBox
sijie commented on a change in pull request #5123: Modify the schema decode 
method can decode ByteBuf
URL: https://github.com/apache/pulsar/pull/5123#discussion_r324246381
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java
 ##
 @@ -68,6 +77,22 @@ public ByteBuffer decode(byte[] data) {
 }
 }
 
+@Override
+public ByteBuffer decode(ByteBuf byteBuf) {
+if (null == byteBuf) {
+return null;
+} else {
+int size = byteBuf.readableBytes();
 
 Review comment:
   @congbobo184 I think this change still returns the underlying buffer. We 
need to copy the bytes out of the ByteBuf.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [pulsar] sijie commented on a change in pull request #5123: Modify the schema decode method can decode ByteBuf

2019-09-13 Thread GitBox
sijie commented on a change in pull request #5123: Modify the schema decode 
method can decode ByteBuf
URL: https://github.com/apache/pulsar/pull/5123#discussion_r324245684
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java
 ##
 @@ -93,6 +102,22 @@ public String decode(byte[] bytes) {
 }
 }
 
+public String decode(ByteBuf byteBuf) {
+if (null == byteBuf) {
+return null;
+} else {
+int size = byteBuf.readableBytes();
+byte[] bytes = tmpBuffer.get();
+if (size > bytes.length) {
+bytes = new byte[size * 2];
+tmpBuffer.set(bytes);
+}
+byteBuf.readBytes(bytes, 0, size);
+
+return new String(bytes, 0, size, charset);
 
 Review comment:
   @congbobo184 You are right. It was my mistake.
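
Assuming the point conceded here is that reusing the scratch array is safe for `String` decoding, a minimal JDK-only sketch of why: `new String(byte[], int, int, Charset)` decodes into storage owned by the `String`, so overwriting the array afterwards does not change the result. `ScratchStringDemo` and `decodeThenClobber` are hypothetical names, not part of the pull request.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical demo class; not part of the pull request.
final class ScratchStringDemo {
    // Decodes the first `size` bytes of `scratch`, then overwrites the array
    // to simulate the shared buffer being reused for the next message.
    static String decodeThenClobber(byte[] scratch, int size) {
        String decoded = new String(scratch, 0, size, StandardCharsets.UTF_8);
        Arrays.fill(scratch, (byte) 'x'); // reuse of the shared buffer
        return decoded;                   // not affected by the fill above
    }

    public static void main(String[] args) {
        byte[] scratch = new byte[16];
        byte[] payload = "pulsar".getBytes(StandardCharsets.UTF_8);
        System.arraycopy(payload, 0, scratch, 0, payload.length);
        System.out.println(decodeThenClobber(scratch, payload.length)); // pulsar
    }
}
```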




[GitHub] [pulsar] sijie commented on a change in pull request #5123: Modify the schema decode method can decode ByteBuf

2019-09-09 Thread GitBox
sijie commented on a change in pull request #5123: Modify the schema decode 
method can decode ByteBuf
URL: https://github.com/apache/pulsar/pull/5123#discussion_r322407811
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonReader.java
 ##
 @@ -48,9 +48,9 @@ public GenericJsonReader(byte[] schemaVersion, List fields){
 this.schemaVersion = schemaVersion;
 }
 @Override
-public GenericJsonRecord read(byte[] bytes) {
+public GenericJsonRecord read(byte[] bytes, int offset, int length) {
 try {
-JsonNode jn = objectMapper.readTree(new String(bytes, UTF_8));
+JsonNode jn = objectMapper.readTree(new String(bytes, 0, length, UTF_8));
 
 Review comment:
   ```suggestion
   JsonNode jn = objectMapper.readTree(new String(bytes, offset, length, UTF_8));
   ```
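
To illustrate why the offset matters, here is a small JDK-only sketch (the class and method names are made up for this example). When the payload does not start at index 0 of the array, decoding from `0` yields the wrong region, while decoding from `offset` yields the intended one.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical demo class; not part of the pull request.
final class OffsetDecodeDemo {
    // Decodes `length` bytes starting at `offset`, as in the suggested change.
    static String decodeRegion(byte[] bytes, int offset, int length) {
        return new String(bytes, offset, length, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Two bytes of unrelated data, followed by a 7-byte JSON payload.
        byte[] buf = "xx{\"a\":1}".getBytes(StandardCharsets.UTF_8);
        System.out.println(decodeRegion(buf, 2, 7));                       // {"a":1}
        System.out.println(new String(buf, 0, 7, StandardCharsets.UTF_8)); // xx{"a":
    }
}
```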




[GitHub] [pulsar] sijie commented on a change in pull request #5123: Modify the schema decode method can decode ByteBuf

2019-09-09 Thread GitBox
sijie commented on a change in pull request #5123: Modify the schema decode 
method can decode ByteBuf
URL: https://github.com/apache/pulsar/pull/5123#discussion_r322405968
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java
 ##
 @@ -93,6 +102,22 @@ public String decode(byte[] bytes) {
 }
 }
 
+public String decode(ByteBuf byteBuf) {
+if (null == byteBuf) {
+return null;
+} else {
+int size = byteBuf.readableBytes();
+byte[] bytes = tmpBuffer.get();
+if (size > bytes.length) {
+bytes = new byte[size * 2];
+tmpBuffer.set(bytes);
+}
+byteBuf.readBytes(bytes, 0, size);
+
+return new String(bytes, 0, size, charset);
 
 Review comment:
   I think we should create a new buffer for the `String`, because the 
passed-in byte[] will be referenced by the `String` object.




[GitHub] [pulsar] sijie commented on a change in pull request #5123: Modify the schema decode method can decode ByteBuf

2019-09-09 Thread GitBox
sijie commented on a change in pull request #5123: Modify the schema decode 
method can decode ByteBuf
URL: https://github.com/apache/pulsar/pull/5123#discussion_r322404253
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java
 ##
 @@ -68,6 +77,22 @@ public ByteBuffer decode(byte[] data) {
 }
 }
 
+@Override
+public ByteBuffer decode(ByteBuf byteBuf) {
+if (null == byteBuf) {
+return null;
+} else {
+int size = byteBuf.readableBytes();
 
 Review comment:
   Since we are returning a `ByteBuffer` to the user, we shouldn't back it with 
a shared byte[]. You have to copy the bytes out of the ByteBuf and create the 
ByteBuffer from the copied bytes.
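
A minimal sketch of the aliasing problem described above, using only JDK types as a stand-in for the Netty `ByteBuf` and the thread-local scratch array (the class and method names are hypothetical). A `ByteBuffer` wrapped around the shared array changes when the array is reused; one wrapped around a private copy does not.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical demo class; not part of the pull request.
final class CopyOnDecodeDemo {
    // Unsafe variant: the returned ByteBuffer is a view over the shared array.
    static ByteBuffer wrapShared(byte[] shared, int size) {
        return ByteBuffer.wrap(shared, 0, size);
    }

    // Safe variant: copy the readable region first, then wrap the private copy.
    static ByteBuffer wrapCopy(byte[] shared, int size) {
        return ByteBuffer.wrap(Arrays.copyOf(shared, size));
    }

    public static void main(String[] args) {
        byte[] shared = {1, 2, 3, 4};
        ByteBuffer view = wrapShared(shared, 4);
        ByteBuffer copy = wrapCopy(shared, 4);
        shared[0] = 99; // the shared buffer is reused for the next message
        System.out.println(view.get(0)); // 99: the caller's data changed
        System.out.println(copy.get(0)); // 1: the copy is isolated
    }
}
```

The copy costs one allocation per message, which seems an acceptable price for handing the caller a buffer that nothing else will mutate.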




[GitHub] [pulsar] sijie commented on a change in pull request #5123: Modify the schema decode method can decode ByteBuf

2019-09-09 Thread GitBox
sijie commented on a change in pull request #5123: Modify the schema decode 
method can decode ByteBuf
URL: https://github.com/apache/pulsar/pull/5123#discussion_r322405151
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FloatSchema.java
 ##
 @@ -45,6 +45,13 @@ public void validate(byte[] message) {
 }
 }
 
+@Override
+public void validate(ByteBuf message) {
+if (message.readableBytes()!= 4) {
 
 Review comment:
   ```suggestion
   if (message.readableBytes() != 4) {
   ```




[GitHub] [pulsar] sijie commented on a change in pull request #5123: Modify the schema decode method can decode ByteBuf

2019-09-09 Thread GitBox
sijie commented on a change in pull request #5123: Modify the schema decode 
method can decode ByteBuf
URL: https://github.com/apache/pulsar/pull/5123#discussion_r322407669
 
 

 ##
 File path: 
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaReader.java
 ##
 @@ -26,5 +26,17 @@
  * @param bytes the data
  * @return the serialized object
  */
-T read(byte[] bytes);
+default T read(byte[] bytes) {
+return read(bytes, 0, bytes.length);
+}
+
+/**
+ * serialize bytes convert pojo
+ *
+ * @param bytes the data
+ * @param offset the byte[] initial position
+ * @param length the byte[] read length
+ * @return the serialized object
+ */
+T read(byte[] bytes, int offset, int length);
 
 Review comment:
   Also add a method to read from an `InputStream`.
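
One possible shape for such an interface, sketched with hypothetical names (`StreamReader`, `Utf8StreamReader`) and JDK types only; it is not the actual `SchemaReader` API. The stream-based method is the single abstract one, and the byte[] overloads delegate to it through `ByteArrayInputStream`, which wraps the array without copying.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical demo entry point; not part of the pull request.
final class StreamReaderDemo {
    public static void main(String[] args) {
        StreamReader<String> reader = new Utf8StreamReader();
        byte[] buf = "__hello__".getBytes(StandardCharsets.UTF_8);
        System.out.println(reader.read(buf, 2, 5)); // hello
    }
}

// Hypothetical interface; the names are assumptions, not Pulsar's API.
interface StreamReader<T> {
    // Decode an object from a stream.
    T read(InputStream in);

    // Decode from a region of a byte[] by viewing it as a stream.
    default T read(byte[] bytes, int offset, int length) {
        return read(new ByteArrayInputStream(bytes, offset, length));
    }

    // Decode from a whole byte[].
    default T read(byte[] bytes) {
        return read(bytes, 0, bytes.length);
    }
}

// Trivial implementation that decodes the stream as UTF-8 text (Java 9+).
final class Utf8StreamReader implements StreamReader<String> {
    @Override
    public String read(InputStream in) {
        try {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```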




[GitHub] [pulsar] sijie commented on a change in pull request #5123: Modify the schema decode method can decode ByteBuf

2019-09-09 Thread GitBox
sijie commented on a change in pull request #5123: Modify the schema decode 
method can decode ByteBuf
URL: https://github.com/apache/pulsar/pull/5123#discussion_r322406286
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
 ##
 @@ -99,6 +108,37 @@ public T decode(byte[] bytes, byte[] schemaVersion) {
 }
 }
 
+@Override
+public T decode(ByteBuf byteBuf) {
+int size = getCanReadSize(byteBuf);
+return reader.read(tmpBuffer.get(), 0, size);
+}
+
+@Override
+public T decode(ByteBuf byteBuf, byte[] schemaVersion) {
+int size = getCanReadSize(byteBuf);
+
+try {
+return readerCache.get(BytesSchemaVersion.of(schemaVersion)).read(tmpBuffer.get(), 0, size );
+} catch (ExecutionException e) {
+LOG.error("Can't get generic schema for topic {} schema version {}",
+schemaInfoProvider.getTopicName(), Hex.encodeHexString(schemaVersion), e);
+throw new RuntimeException("Can't get generic schema for topic " + schemaInfoProvider.getTopicName());
+}
+}
+
+private int getCanReadSize(ByteBuf byteBuf) {
 
 Review comment:
   enlargeReadBufferSize




[GitHub] [pulsar] sijie commented on a change in pull request #5123: Modify the schema decode method can decode ByteBuf

2019-09-09 Thread GitBox
sijie commented on a change in pull request #5123: Modify the schema decode 
method can decode ByteBuf
URL: https://github.com/apache/pulsar/pull/5123#discussion_r322407074
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
 ##
 @@ -99,6 +108,37 @@ public T decode(byte[] bytes, byte[] schemaVersion) {
 }
 }
 
+@Override
+public T decode(ByteBuf byteBuf) {
+int size = getCanReadSize(byteBuf);
+return reader.read(tmpBuffer.get(), 0, size);
 
 Review comment:
   I think we should avoid copying the bytes from the `ByteBuf` into the 
tmpBuffer. I believe Avro provides a mechanism to read from an input stream, so 
you can convert the ByteBuf into an input stream.
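
A rough sketch of the stream-view idea, using `java.nio.ByteBuffer` as a stand-in for the Netty `ByteBuf` so the example stays JDK-only; `BufferInputStream` and `StreamDecodeDemo` are hypothetical names. The stream reads straight from the buffer, so no intermediate byte[] is filled first. For a real `ByteBuf`, Netty's `io.netty.buffer.ByteBufInputStream` plays the same role, as far as I know.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

// Hypothetical demo entry point; not part of the pull request.
final class StreamDecodeDemo {
    // Reads one big-endian int through the stream view.
    static int readIntViaStream(ByteBuffer payload) throws IOException {
        try (DataInputStream in = new DataInputStream(new BufferInputStream(payload))) {
            return in.readInt();
        }
    }

    public static void main(String[] args) throws IOException {
        ByteBuffer payload = ByteBuffer.allocate(4).putInt(0, 42);
        System.out.println(readIntViaStream(payload)); // 42
    }
}

// Hypothetical InputStream view over a ByteBuffer.
final class BufferInputStream extends InputStream {
    private final ByteBuffer buf;

    BufferInputStream(ByteBuffer buf) {
        // duplicate() shares the content but keeps an independent position,
        // so reading the stream does not consume the caller's buffer.
        this.buf = buf.duplicate();
    }

    @Override
    public int read() {
        return buf.hasRemaining() ? (buf.get() & 0xFF) : -1;
    }

    @Override
    public int read(byte[] dst, int off, int len) {
        if (len == 0) {
            return 0;
        }
        if (!buf.hasRemaining()) {
            return -1;
        }
        int n = Math.min(len, buf.remaining());
        buf.get(dst, off, n); // bulk transfer straight out of the buffer
        return n;
    }
}
```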

