BitoAgent commented on code in PR #13973:
URL: https://github.com/apache/dubbo/pull/13973#discussion_r1538266993

##########
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java:
##########
@@ -37,6 +37,8 @@ public class Http2ServerChannelObserver extends AbstractServerHttpChannelObserve
     private boolean autoRequestN = true;
+    private boolean closed = false;
+

Review Comment:
   **Suggestion**: Ensure thread safety for the "closed" flag access and updates, considering possible concurrent modifications.
   <br> **Code Suggestion**:
   ```
   +import java.util.concurrent.atomic.AtomicBoolean;
   -private boolean closed = false;
   +private AtomicBoolean closed = new AtomicBoolean(false);
   ```

##########
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2TransportListener.java:
##########
@@ -16,4 +16,7 @@
  */
 package org.apache.dubbo.remoting.http12.h2;
-public interface Http2TransportListener extends CancelableTransportListener<Http2Header, Http2InputMessage> {}
+public interface Http2TransportListener extends CancelableTransportListener<Http2Header, Http2InputMessage> {
+
+    void onStreamClosed();
+}

Review Comment:
   **Suggestion**: Document the expected behavior and usage of the onStreamClosed method within the interface to guide implementers.
   <br> **Code Suggestion**:
   ```
   +/**
   + * Invoked when a stream is closed. Implementations should perform necessary cleanup.
   + */
    void onStreamClosed();
   ```

##########
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java:
##########
@@ -74,12 +76,33 @@
     @Override
     public void cancel(Throwable throwable) {
+        if (throwable instanceof CancelStreamException) {
+            if (((CancelStreamException) throwable).isCancelByRemote()) {
+                closed = true;
+            }
+        }
+        this.cancellationContext.cancel(throwable);
         long errorCode = 0;
         if (throwable instanceof ErrorCodeHolder) {
             errorCode = ((ErrorCodeHolder) throwable).getErrorCode();
         }
         getHttpChannel().writeResetFrame(errorCode);
-        this.cancellationContext.cancel(throwable);
+    }
+
+    @Override
+    public void onNext(Object data) {
+        if (closed) {
+            return;
+        }
+        super.onNext(data);
+    }
+
+    @Override
+    public void onError(Throwable throwable) {
+        if (closed) {
+            return;
+        }
+        super.onError(throwable);

Review Comment:
   **Optimization Issue**: Similar to onNext, the onError method returns early if the channel is closed, potentially missing important error handling steps. This could lead to unhandled exceptions or errors that occur after the channel is marked as closed but before it is actually closed.
   <br> **Fix**: Ensure that errors occurring after the channel is marked as closed but before it is actually closed are properly logged or handled to prevent unhandled exceptions.
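As an illustration of the fix described above, here is a minimal sketch that records errors arriving after the closed flag is set, in place of dropping them silently. It assumes a simplified stand-in observer, not Dubbo's actual `Http2ServerChannelObserver`; the names `LoggingGuardObserver`, `markClosed`, `forwarded`, and `recordedAfterClose` are hypothetical, and the in-memory list stands in for whatever logger a real implementation would use.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a server channel observer; not Dubbo's actual class.
class LoggingGuardObserver {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    // Errors passed downstream while the stream was still open.
    private final List<Throwable> forwarded = new CopyOnWriteArrayList<>();
    // Errors that arrived after close; recorded here in place of a real logger call.
    private final List<Throwable> recordedAfterClose = new CopyOnWriteArrayList<>();

    void markClosed() {
        closed.set(true);
    }

    void onError(Throwable throwable) {
        if (closed.get()) {
            // Keep a trace of the error instead of returning silently.
            recordedAfterClose.add(throwable);
            return;
        }
        // Stands in for super.onError(throwable) in the real observer.
        forwarded.add(throwable);
    }

    List<Throwable> forwarded() {
        return forwarded;
    }

    List<Throwable> recordedAfterClose() {
        return recordedAfterClose;
    }
}

class LoggingGuardDemo {
    public static void main(String[] args) {
        LoggingGuardObserver observer = new LoggingGuardObserver();
        observer.onError(new IllegalStateException("before close"));
        observer.markClosed();
        observer.onError(new IllegalStateException("after close"));
        System.out.println("forwarded=" + observer.forwarded().size()
                + " recordedAfterClose=" + observer.recordedAfterClose().size());
    }
}
```

Whether a late error deserves a log entry at all depends on why the stream was closed; after a remote cancel, such errors may simply be expected noise.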
   <br> **Code Suggestion**:
   ```
   +        if (closed) {
   +            logError(throwable); // Implement this method to log or handle error
   +            return;
   +        }
   ```

##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java:
##########
@@ -187,6 +187,12 @@ protected final Http2ServerChannelObserver getServerChannelObserver() {
         return serverChannelObserver;
     }
+    @Override
+    public void onStreamClosed() {
+        // doing on event loop thread
+        getStreamingDecoder().close();

Review Comment:
   **Suggestion**: Validate the state of getStreamingDecoder() before invoking close() to avoid potential NullPointerException.
   <br> **Code Suggestion**:
   ```
   +if (getStreamingDecoder() != null) {
   +    getStreamingDecoder().close();
   +}
   ```

##########
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/CancelStreamException.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.http12.h2;
+
+import org.apache.dubbo.remoting.http12.ErrorCodeHolder;
+
+public class CancelStreamException extends RuntimeException implements ErrorCodeHolder {
+
+    private final boolean cancelByRemote;
+
+    private final long errorCode;
+
+    private CancelStreamException(boolean cancelByRemote, long errorCode) {
+        this.cancelByRemote = cancelByRemote;
+        this.errorCode = errorCode;
+    }
+
+    public boolean isCancelByRemote() {
+        return cancelByRemote;
+    }
+
+    public static CancelStreamException fromRemote(long errorCode) {
+        return new CancelStreamException(true, errorCode);
+    }
+
+    public static CancelStreamException fromLocal(long errorCode) {
+        return new CancelStreamException(false, errorCode);
+    }
+
+    @Override
+    public long getErrorCode() {
+        return errorCode;
+    }
+}

Review Comment:
   **Suggestion**: Consider making CancelStreamException a checked exception to enforce explicit handling of stream cancellation scenarios, promoting safer code practices.
   <br> **Code Suggestion**:
   ```
   -public class CancelStreamException extends RuntimeException implements ErrorCodeHolder {
   +public class CancelStreamException extends Exception implements ErrorCodeHolder {
   ```

##########
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2ProtocolSelectorHandler.java:
##########
@@ -68,8 +69,9 @@
             h2StreamChannel = new Http2WriteQueueChannel(h2StreamChannel, writeQueue);
         }
         ChannelPipeline pipeline = ctx.pipeline();
-        pipeline.addLast(
-                new NettyHttp2FrameHandler(h2StreamChannel, factory.newInstance(h2StreamChannel, url, frameworkModel)));
+        Http2TransportListener http2TransportListener = factory.newInstance(h2StreamChannel, url, frameworkModel);
+        ctx.channel().closeFuture().addListener(future -> http2TransportListener.onStreamClosed());

Review Comment:
   **Suggestion**: Consider verifying the success of the channel closure operation within the listener to handle potential failures.
   <br> **Code Suggestion**:
   ```
   +ctx.channel().closeFuture().addListener(future -> {
   +    if (future.isSuccess()) {
   +        http2TransportListener.onStreamClosed();
   +    } else {
   +        // Handle closure failure
   +    }
   +});
   ```

##########
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java:
##########
@@ -74,12 +76,33 @@
     @Override
     public void cancel(Throwable throwable) {
+        if (throwable instanceof CancelStreamException) {
+            if (((CancelStreamException) throwable).isCancelByRemote()) {
+                closed = true;
+            }
+        }

Review Comment:
   **Suggestion**: Refactor the condition inside the cancel method to reduce nesting and improve readability.
   <br> **Code Suggestion**:
   ```
   -if (throwable instanceof CancelStreamException) {
   -    if (((CancelStreamException) throwable).isCancelByRemote()) {
   -        closed = true;
   -    }
   +if (throwable instanceof CancelStreamException && ((CancelStreamException) throwable).isCancelByRemote()) {
   +    closed.set(true);
    }
   ```

##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java:
##########
@@ -187,6 +187,12 @@
         return serverChannelObserver;
     }
+    @Override
+    public void onStreamClosed() {
+        // doing on event loop thread
+        getStreamingDecoder().close();

Review Comment:
   **Performance Issue**: Closing the streaming decoder on stream closure without checking if all data has been processed might lead to data loss. This is particularly important for ensuring data integrity in streaming applications.
   <br> **Fix**: Before closing the streaming decoder, check if all expected data has been processed. Implement a mechanism to ensure data integrity, such as acknowledgments from the client that all data has been received and processed.
   <br> **Code Suggestion**:
   ```
   +    @Override
   +    public void onStreamClosed() {
   +        // Ensure all expected data has been processed before closing the streaming decoder
   +        if (!getStreamingDecoder().isDataFullyProcessed()) {
   +            // Implement mechanism to ensure data integrity, e.g., acknowledgments from the client
   +            return;
   +        }
   +        getStreamingDecoder().close();
   +    }
   ```

##########
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2ProtocolSelectorHandler.java:
##########
@@ -68,8 +69,9 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpMetadata metadata) {
             h2StreamChannel = new Http2WriteQueueChannel(h2StreamChannel, writeQueue);
         }
         ChannelPipeline pipeline = ctx.pipeline();
-        pipeline.addLast(
-                new NettyHttp2FrameHandler(h2StreamChannel, factory.newInstance(h2StreamChannel, url, frameworkModel)));
+        Http2TransportListener http2TransportListener = factory.newInstance(h2StreamChannel, url, frameworkModel);
+        ctx.channel().closeFuture().addListener(future -> http2TransportListener.onStreamClosed());
+        pipeline.addLast(new NettyHttp2FrameHandler(h2StreamChannel, http2TransportListener));

Review Comment:
   **Suggestion**: Add error handling for the closeFuture listener to gracefully manage potential exceptions during stream closure.
   <br> **Code Suggestion**:
   ```
   +ctx.channel().closeFuture().addListener(future -> {
   +    try {
   +        http2TransportListener.onStreamClosed();
   +    } catch (Exception e) {
   +        // Log or handle the exception
   +    }
   +});
   ```

##########
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java:
##########
@@ -74,12 +76,33 @@
     @Override
     public void cancel(Throwable throwable) {
+        if (throwable instanceof CancelStreamException) {
+            if (((CancelStreamException) throwable).isCancelByRemote()) {
+                closed = true;
+            }
+        }

Review Comment:
   **Security Issue**: The logic for setting 'closed = true' based on the type of throwable (CancelStreamException) and its source (remote) without further validation may lead to unauthorized control flow changes or denial of service.
   <br> **Fix**: Implement additional validation checks to ensure that the throwable instance leading to the 'closed = true' state change is from a trusted and authorized source. Consider adding authentication or integrity checks on the source of the CancelStreamException.
   <br> **Code Suggestion**:
   ```
   +        if (throwable instanceof CancelStreamException && ((CancelStreamException) throwable).isCancelByRemote()) {
   +            // Implement additional validation to ensure the source is authorized
   +            if (isSourceAuthorized(throwable)) {
   +                closed = true;
   +            }
   +        }
   ```

##########
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/CancelStreamException.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.http12.h2;
+
+import org.apache.dubbo.remoting.http12.ErrorCodeHolder;
+
+public class CancelStreamException extends RuntimeException implements ErrorCodeHolder {
+
+    private final boolean cancelByRemote;
+
+    private final long errorCode;
+
+    private CancelStreamException(boolean cancelByRemote, long errorCode) {
+        this.cancelByRemote = cancelByRemote;
+        this.errorCode = errorCode;
+    }
+
+    public boolean isCancelByRemote() {
+        return cancelByRemote;
+    }
+
+    public static CancelStreamException fromRemote(long errorCode) {
+        return new CancelStreamException(true, errorCode);
+    }
+
+    public static CancelStreamException fromLocal(long errorCode) {
+        return new CancelStreamException(false, errorCode);
+    }
+
+    @Override
+    public long getErrorCode() {
+        return errorCode;
+    }
+}

Review Comment:
   **Security Issue**: The class CancelStreamException exposes a public static factory method for creating instances that signify a cancellation by remote entities without any form of authentication or verification of the remote entity's identity, potentially allowing an attacker to forge such exceptions to disrupt the service.
   <br> **Fix**: Introduce a mechanism to authenticate or verify the legitimacy of the remote entity before allowing the creation of a CancelStreamException instance that signifies a remote cancellation. This could involve checking a signature, a shared secret, or a secure context.
   <br> **Code Suggestion**:
   ```
   private static final Object sharedSecret = "SECRET";
   public static CancelStreamException fromRemote(long errorCode, Object secret) {
       if (!sharedSecret.equals(secret)) {
           throw new SecurityException("Invalid secret for remote cancellation.");
       }
       return new CancelStreamException(true, errorCode);
   }
   ```

##########
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java:
##########
@@ -74,12 +76,33 @@
     @Override
     public void cancel(Throwable throwable) {
+        if (throwable instanceof CancelStreamException) {
+            if (((CancelStreamException) throwable).isCancelByRemote()) {
+                closed = true;

Review Comment:
   **Performance Issue**: The condition checking and setting of the 'closed' flag based on the type of exception can lead to race conditions in a concurrent environment, potentially causing some streams to not be closed properly.
   <br> **Fix**: Ensure thread safety by synchronizing access to the 'closed' flag or by using atomic variables. Additionally, consider using more specific exception handling to manage stream closures more effectively.
   <br> **Code Suggestion**:
   ```
   +    private final AtomicBoolean closed = new AtomicBoolean(false);
   +    if (throwable instanceof CancelStreamException && ((CancelStreamException) throwable).isCancelByRemote()) {
   +        closed.set(true);
   +    }
   ```

##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java:
##########
@@ -187,6 +187,12 @@
         return serverChannelObserver;
     }
+    @Override
+    public void onStreamClosed() {
+        // doing on event loop thread
+        getStreamingDecoder().close();
+    }

Review Comment:
   **Scalability Issue**: Implementing onStreamClosed to directly close the streaming decoder without considering the state or existence of ongoing operations can lead to abrupt termination of valid processing. This approach might not scale well in scenarios where graceful shutdown or cleanup is necessary for maintaining system stability under load.
   <br> **Fix**: Ensure that onStreamClosed gracefully handles ongoing operations, possibly by signaling them to complete or terminate in an orderly fashion, before closing the decoder.
   <br> **Code Suggestion**:
   ```
   +    @Override
   +    public void onStreamClosed() {
   +        // Signal ongoing operations to complete or terminate in an orderly fashion
   +        if (ongoingOperationsExist()) {
   +            signalOngoingOperationsToCompleteOrTerminate();
   +        }
   +        getStreamingDecoder().close();
   +    }
   ```

##########
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java:
##########
@@ -74,12 +76,33 @@
     @Override
     public void cancel(Throwable throwable) {
+        if (throwable instanceof CancelStreamException) {
+            if (((CancelStreamException) throwable).isCancelByRemote()) {
+                closed = true;
+            }
+        }

Review Comment:
   **Optimization Issue**: The conditional block checks if the throwable is an instance of CancelStreamException and if it is cancelled by remote, then sets the 'closed' flag to true. However, there is no mechanism to ensure that resources associated with the channel are released or cleaned up, potentially leading to resource leaks.
   <br> **Fix**: Ensure that all resources associated with the channel are properly released or cleaned up when the channel is closed to prevent resource leaks.
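One way to keep such cleanup safe when it can be reached from more than one path (for example a cancel and a later stream-closed callback) is to guard it with `AtomicBoolean.compareAndSet`, so the release logic runs at most once. The sketch below is only an illustration under that assumption; `StreamResources` and `releaseCount` are hypothetical names, and the counter is a placeholder for real cleanup work.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical holder of per-stream resources; not part of Dubbo.
class StreamResources {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    // Counts how often the cleanup body ran; placeholder for real release work.
    private final AtomicInteger releaseCount = new AtomicInteger();

    // Returns true only for the single call that actually performed the cleanup.
    boolean close() {
        if (!closed.compareAndSet(false, true)) {
            return false; // another path already closed this stream
        }
        releaseCount.incrementAndGet();
        return true;
    }

    boolean isClosed() {
        return closed.get();
    }

    int releaseCount() {
        return releaseCount.get();
    }
}

class StreamResourcesDemo {
    public static void main(String[] args) {
        StreamResources resources = new StreamResources();
        boolean first = resources.close();
        boolean second = resources.close();
        System.out.println("first=" + first + " second=" + second
                + " releases=" + resources.releaseCount());
    }
}
```

The same flag can double as the `closed` check in `onNext` and `onError`, which avoids keeping two separate pieces of state in sync.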
   <br> **Code Suggestion**:
   ```
   +        if (closed) {
   +            releaseAssociatedResources(); // Implement this method to clean up resources
   +        }
   ```

##########
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2ProtocolSelectorHandler.java:
##########
@@ -68,8 +69,9 @@
             h2StreamChannel = new Http2WriteQueueChannel(h2StreamChannel, writeQueue);
         }
         ChannelPipeline pipeline = ctx.pipeline();
-        pipeline.addLast(
-                new NettyHttp2FrameHandler(h2StreamChannel, factory.newInstance(h2StreamChannel, url, frameworkModel)));
+        Http2TransportListener http2TransportListener = factory.newInstance(h2StreamChannel, url, frameworkModel);
+        ctx.channel().closeFuture().addListener(future -> http2TransportListener.onStreamClosed());

Review Comment:
   **Security Issue**: Automatically closing the stream on receiving any close future event without validating the cause or source of the close request may inadvertently allow an attacker to trigger unintended closures, potentially leading to denial of service.
   <br> **Fix**: Validate the cause of the close future event to ensure it's legitimate and expected. Implement logic to differentiate between normal and abnormal closures, possibly inspecting the close status or reason if available.
   <br> **Code Suggestion**:
   ```
   if (validateCloseEvent(closeFutureEvent)) {
       stream.close();
   }
   ```

##########
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2ProtocolSelectorHandler.java:
##########
@@ -68,8 +69,9 @@
             h2StreamChannel = new Http2WriteQueueChannel(h2StreamChannel, writeQueue);
         }
         ChannelPipeline pipeline = ctx.pipeline();
-        pipeline.addLast(
-                new NettyHttp2FrameHandler(h2StreamChannel, factory.newInstance(h2StreamChannel, url, frameworkModel)));
+        Http2TransportListener http2TransportListener = factory.newInstance(h2StreamChannel, url, frameworkModel);
+        ctx.channel().closeFuture().addListener(future -> http2TransportListener.onStreamClosed());

Review Comment:
   **Performance Issue**: Using a listener to close streams might introduce a delay in freeing up resources, potentially leading to memory leaks if the streams are not closed in a timely manner. This is critical in a high-throughput system where resources are constantly being allocated and deallocated.
   <br> **Fix**: Consider implementing a more proactive resource management strategy, such as explicitly closing streams in a finally block or using try-with-resources (for Java 7 and above) to ensure that resources are freed as soon as they are no longer needed.
   <br> **Code Suggestion**:
   ```
   try (Stream stream = createStream()) {
       // use the stream
   } catch (Exception e) {
       // handle exception
   }
   ```

##########
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2ProtocolSelectorHandler.java:
##########
@@ -68,8 +69,9 @@
             h2StreamChannel = new Http2WriteQueueChannel(h2StreamChannel, writeQueue);
         }
         ChannelPipeline pipeline = ctx.pipeline();
-        pipeline.addLast(
-                new NettyHttp2FrameHandler(h2StreamChannel, factory.newInstance(h2StreamChannel, url, frameworkModel)));
+        Http2TransportListener http2TransportListener = factory.newInstance(h2StreamChannel, url, frameworkModel);
+        ctx.channel().closeFuture().addListener(future -> http2TransportListener.onStreamClosed());
+        pipeline.addLast(new NettyHttp2FrameHandler(h2StreamChannel, http2TransportListener));

Review Comment:
   **Scalability Issue**: The addition of a closeFuture listener to handle stream closure can potentially create a scalability issue. If there are many instances of Http2TransportListener being created and not properly removed, it could lead to a memory leak as each listener holds a reference to the Http2TransportListener instance. This is particularly concerning in a high-throughput environment where connections are frequently opened and closed.
   <br> **Fix**: Implement a mechanism to ensure that these listeners are removed or deregistered when no longer needed, or when the associated Channel is closed. This could involve keeping track of listeners and explicitly removing them, or using weak references.
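To make the "remove listeners once they are no longer needed" idea concrete, here is a minimal sketch of a one-shot notifier that drops its listener references as soon as it fires. It is written against plain `Runnable` callbacks, not Netty's `ChannelFuture`, and `OneShotCloseNotifier`, `fireClosed`, and `pendingListeners` are hypothetical names chosen for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical one-shot close notifier; a stand-in, not Netty's close future.
class OneShotCloseNotifier {
    private List<Runnable> listeners = new ArrayList<>();
    private boolean fired;

    void addListener(Runnable listener) {
        synchronized (this) {
            if (!fired) {
                listeners.add(listener);
                return;
            }
        }
        // Already closed: run immediately and retain nothing.
        listener.run();
    }

    void fireClosed() {
        List<Runnable> toRun;
        synchronized (this) {
            if (fired) {
                return; // fire at most once
            }
            fired = true;
            toRun = listeners;
            listeners = new ArrayList<>(); // drop references so listeners can be collected
        }
        for (Runnable listener : toRun) {
            listener.run();
        }
    }

    synchronized int pendingListeners() {
        return listeners.size();
    }
}

class OneShotCloseNotifierDemo {
    public static void main(String[] args) {
        OneShotCloseNotifier notifier = new OneShotCloseNotifier();
        AtomicInteger calls = new AtomicInteger();
        notifier.addListener(calls::incrementAndGet);
        notifier.fireClosed();
        System.out.println("calls=" + calls.get() + " pending=" + notifier.pendingListeners());
    }
}
```

Whether the PR needs anything like this depends on how long the close future and its channel outlive the stream, which the diff alone does not show.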
   <br> **Code Suggestion**:
   ```
   ctx.channel().closeFuture().addListener(future -> {
       http2TransportListener.onStreamClosed();
       listeners.remove(http2TransportListener);
   });
   ```

##########
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java:
##########
@@ -74,12 +76,33 @@
     @Override
     public void cancel(Throwable throwable) {
+        if (throwable instanceof CancelStreamException) {
+            if (((CancelStreamException) throwable).isCancelByRemote()) {
+                closed = true;
+            }
+        }
+        this.cancellationContext.cancel(throwable);
         long errorCode = 0;
         if (throwable instanceof ErrorCodeHolder) {
             errorCode = ((ErrorCodeHolder) throwable).getErrorCode();
         }
         getHttpChannel().writeResetFrame(errorCode);
-        this.cancellationContext.cancel(throwable);
+    }
+
+    @Override
+    public void onNext(Object data) {
+        if (closed) {
+            return;
+        }
+        super.onNext(data);

Review Comment:
   **Optimization Issue**: The onNext method checks if the channel is closed and returns early if it is. This could lead to missed data processing if the close flag is set prematurely. Additionally, there is no handling for the case where data arrives after the channel is marked as closed but before it is actually closed.
   <br> **Fix**: Implement a more robust mechanism for handling data that arrives after the channel is marked as closed but before it is actually closed. Consider queueing such data for processing or handling it in a way that ensures no data is lost.
   <br> **Code Suggestion**:
   ```
   +    private final ConcurrentLinkedQueue<Object> pendingData = new ConcurrentLinkedQueue<>();
   +    if (closed) {
   +        pendingData.offer(data);
   +        return;
   +    }
   +    while (!pendingData.isEmpty()) {
   +        Object nextData = pendingData.poll();
   +        super.onNext(nextData);
   +    }
   ```

##########
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyHttp2ProtocolSelectorHandler.java:
##########
@@ -68,8 +69,9 @@
             h2StreamChannel = new Http2WriteQueueChannel(h2StreamChannel, writeQueue);
         }
         ChannelPipeline pipeline = ctx.pipeline();
-        pipeline.addLast(
-                new NettyHttp2FrameHandler(h2StreamChannel, factory.newInstance(h2StreamChannel, url, frameworkModel)));
+        Http2TransportListener http2TransportListener = factory.newInstance(h2StreamChannel, url, frameworkModel);
+        ctx.channel().closeFuture().addListener(future -> http2TransportListener.onStreamClosed());
+        pipeline.addLast(new NettyHttp2FrameHandler(h2StreamChannel, http2TransportListener));

Review Comment:
   **Optimization Issue**: Registering a listener for every channel close event inside channelRead0 might lead to a memory leak if not properly handled. This is because every time a message is read, a new listener is added to the closeFuture of the channel, potentially creating a large number of listeners for the same event, especially under high load.
   <br> **Fix**: Consider maintaining a single global listener for the close event per channel, or ensure that listeners are properly removed after being triggered to avoid potential memory leaks.
   <br> **Code Suggestion**:
   ```
   private static final ChannelFutureListener CLOSE_LISTENER = future -> {
       // cleanup resources
   };
   // During initialization
   channel.closeFuture().addListener(CLOSE_LISTENER);
   ```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: notifications-unsubscr...@dubbo.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org