szetszwo commented on code in PR #971:
URL: https://github.com/apache/ratis/pull/971#discussion_r1406993473
##########
ratis-grpc/src/main/java/org/apache/ratis/grpc/util/ZeroCopyMessageMarshaller.java:
##########
@@ -74,55 +95,116 @@ public InputStream stream(T value) {
@Override
public T parse(InputStream stream) {
+ final T message;
try {
- if (stream instanceof KnownLength
- && stream instanceof Detachable
- && stream instanceof HasByteBuffer
- && ((HasByteBuffer) stream).byteBufferSupported()) {
- int size = stream.available();
- // Stream is now detached here and should be closed later.
- InputStream detachedStream = ((Detachable) stream).detach();
- try {
- // This mark call is to keep buffer while traversing buffers using skip.
- detachedStream.mark(size);
- List<ByteString> byteStrings = new LinkedList<>();
- while (detachedStream.available() != 0) {
- ByteBuffer buffer = ((HasByteBuffer) detachedStream).getByteBuffer();
- byteStrings.add(UnsafeByteOperations.unsafeWrap(buffer));
- detachedStream.skip(buffer.remaining());
- }
- detachedStream.reset();
- CodedInputStream codedInputStream = ByteString.copyFrom(byteStrings).newCodedInput();
- codedInputStream.enableAliasing(true);
- codedInputStream.setSizeLimit(Integer.MAX_VALUE);
- // fast path (no memory copy)
- T message;
- try {
- message = parseFrom(codedInputStream);
- } catch (InvalidProtocolBufferException ipbe) {
- throw Status.INTERNAL
- .withDescription("Invalid protobuf byte sequence")
- .withCause(ipbe)
- .asRuntimeException();
- }
- unclosedStreams.put(message, detachedStream);
- detachedStream = null;
- return message;
- } finally {
- if (detachedStream != null) {
- detachedStream.close();
- }
- }
- }
+ // fast path (no memory copy)
+ message = parseZeroCopy(stream);
} catch (IOException e) {
- throw new RuntimeException(e);
+ throw Status.INTERNAL
+ .withDescription("Failed to parseZeroCopy")
+ .withCause(e)
+ .asRuntimeException();
}
+ if (message != null) {
+ zeroCopyCount.accept(message);
+ return message;
+ }
+
// slow path
- return marshaller.parse(stream);
+ final T copied = marshaller.parse(stream);
+ nonZeroCopyCount.accept(copied);
+ return copied;
+ }
+
+ /** Release the underlying buffers in the given message. */
+ public void release(T message) {
Review Comment:
@duongkame, the proto message was parsed from the detached stream with zero
copy, so the message and the stream refer to the same memory, no? How could
the proto message be garbage-collected before the detached stream is released?
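
To illustrate the aliasing concern, here is a minimal sketch that uses only `java.nio.ByteBuffer`. It does not use gRPC or protobuf, and `ZeroCopyAliasingSketch`, `Message`, `parseWithCopy` and `releaseAndReuse` are hypothetical names invented for this example; `parseZeroCopy` here only borrows the name from the diff and is not the PR's implementation. The assumption is that a zero-copy parsed message behaves like a read-only view over the stream's buffer, so it sees whatever happens to that buffer after release.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class ZeroCopyAliasingSketch {
  /** Hypothetical stand-in for a parsed message: it only holds a buffer view. */
  static final class Message {
    private final ByteBuffer view;

    Message(ByteBuffer view) {
      this.view = view;
    }

    String content() {
      // duplicate() shares the bytes but keeps an independent position/limit.
      return StandardCharsets.US_ASCII.decode(view.duplicate()).toString();
    }
  }

  /** "Parse" without copying: the message aliases the buffer's memory. */
  static Message parseZeroCopy(ByteBuffer buffer) {
    return new Message(buffer.asReadOnlyBuffer());
  }

  /** "Parse" with a copy: the message owns its own bytes. */
  static Message parseWithCopy(ByteBuffer buffer) {
    ByteBuffer copy = ByteBuffer.allocate(buffer.remaining());
    copy.put(buffer.duplicate());
    copy.flip();
    return new Message(copy);
  }

  /** Simulates the buffer being released and then reused for other data. */
  static void releaseAndReuse(ByteBuffer buffer) {
    for (int i = buffer.position(); i < buffer.limit(); i++) {
      buffer.put(i, (byte) 'x');
    }
  }

  public static void main(String[] args) {
    ByteBuffer buffer = ByteBuffer.wrap("hello".getBytes(StandardCharsets.US_ASCII));
    Message zeroCopy = parseZeroCopy(buffer);
    Message copied = parseWithCopy(buffer);

    releaseAndReuse(buffer);

    System.out.println(zeroCopy.content()); // prints "xxxxx"
    System.out.println(copied.content());   // prints "hello"
  }
}
```

In this sketch the zero-copy message is still reachable after the buffer is reused, and its content changes underneath it. That is the reason the release of the detached stream has to be ordered after the last use of the message, rather than tied to the message being collected.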