Github user vijikarthi commented on the issue:

    https://github.com/apache/flink/pull/2425
  
    > The cookie is added to every single message/buffer that is transferred.
    > That is too much - securing the integrity of the stream is the
    > responsibility of the encryption layer. The cookie should be added only
    > to request messages that establish connections.
    
    I will change the code to handle the cookie right after the SSL handshake
    using a new handler, and stop passing the cookie with every message. The
    handler will be added to the pipelines of both `NettyClient` and
    `NettyServer`. The client will send the cookie when the channel becomes
    active, and the server will validate it and keep track of the clients that
    are authorized.
    
    Here is the pseudo-code for the client and server handlers. Please take a
    look and let me know if you are okay with this approach, and I will modify
    the code.
    
    ---
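        import io.netty.buffer.ByteBuf;
        import io.netty.buffer.Unpooled;
        import io.netty.channel.Channel;
        import io.netty.channel.ChannelHandlerContext;
        import io.netty.channel.ChannelInboundHandlerAdapter;
        import io.netty.handler.codec.MessageToMessageDecoder;
        import org.slf4j.Logger;
        import org.slf4j.LoggerFactory;

        import java.nio.charset.Charset;
        import java.nio.charset.StandardCharsets;
        import java.util.ArrayList;
        import java.util.Arrays;
        import java.util.List;

        public static class ClientCookieHandler extends ChannelInboundHandlerAdapter {

            private static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;

            private final String secureCookie;

            public ClientCookieHandler(String secureCookie) {
                this.secureCookie = secureCookie;
            }

            @Override
            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                super.channelActive(ctx);

                if (secureCookie != null && !secureCookie.isEmpty()) {
                    // Encode once and frame the cookie as [int length][cookie bytes].
                    final byte[] cookieBytes = secureCookie.getBytes(DEFAULT_CHARSET);
                    final ByteBuf buffer = Unpooled.buffer(4 + cookieBytes.length);
                    buffer.writeInt(cookieBytes.length);
                    buffer.writeBytes(cookieBytes);
                    ctx.writeAndFlush(buffer);
                }
            }
        }

        public static class ServerCookieDecoder extends MessageToMessageDecoder<ByteBuf> {

            private static final Logger LOG = LoggerFactory.getLogger(ServerCookieDecoder.class);

            private static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;

            // Expected cookie, encoded once; empty if no cookie is configured.
            private final byte[] cookieBytes;

            // Channels that have already presented a valid cookie.
            private final List<Channel> channelList = new ArrayList<>();

            public ServerCookieDecoder(String secureCookie) {
                this.cookieBytes = secureCookie == null
                        ? new byte[0]
                        : secureCookie.getBytes(DEFAULT_CHARSET);
            }

            @Override
            protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {

                // No cookie configured, or the channel is already authorized: pass the
                // message on unchanged. retain() is required because
                // MessageToMessageDecoder releases msg once decode() returns.
                if (cookieBytes.length == 0 || channelList.contains(ctx.channel())) {
                    out.add(msg.retain());
                    return;
                }

                // The first message on a channel must carry the length prefix.
                if (msg.readableBytes() < 4) {
                    throw new IllegalStateException("Secure cookie is missing or incomplete.");
                }

                // Read the cookie based on the cookie length passed.
                final int cookieLength = msg.readInt();
                if (cookieLength != cookieBytes.length || msg.readableBytes() < cookieLength) {
                    String message = "Cookie length does not match the source cookie. "
                            + "Invalid secure cookie passed.";
                    throw new IllegalStateException(message);
                }

                final byte[] buffer = new byte[cookieLength];
                msg.readBytes(buffer, 0, cookieLength);

                if (!Arrays.equals(cookieBytes, buffer)) {
                    LOG.error("Secure cookie from the client does not match the server's cookie");
                    throw new IllegalStateException("Invalid secure cookie passed.");
                }

                LOG.info("Secure cookie validation passed");

                channelList.add(ctx.channel());

                // Forward any payload that arrived in the same buffer after the cookie.
                if (msg.isReadable()) {
                    out.add(msg.retain());
                }
            }
        }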
    --- 
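
The wire format used by the two handlers above (a 4-byte big-endian length followed by the UTF-8 cookie bytes) can be checked in isolation, without Netty. The following is only a minimal sketch under stated assumptions: `CookieFrame`, `encode`, and `matches` are hypothetical names that are not part of Flink or of the pull request, and `MessageDigest.isEqual` is swapped in for `Arrays.equals` on the assumption that a constant-time comparison is preferable for a shared secret.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

/** Hypothetical helper (not part of Flink): cookie frame = [int length][UTF-8 bytes]. */
final class CookieFrame {

    private CookieFrame() {}

    /** Client side: builds the frame sent once when the channel becomes active. */
    static byte[] encode(String cookie) {
        byte[] cookieBytes = cookie.getBytes(StandardCharsets.UTF_8);
        // ByteBuffer is big-endian by default, like ByteBuf.writeInt.
        return ByteBuffer.allocate(4 + cookieBytes.length)
                .putInt(cookieBytes.length)
                .put(cookieBytes)
                .array();
    }

    /** Server side: true only if the frame carries exactly the expected cookie. */
    static boolean matches(byte[] frame, String expectedCookie) {
        byte[] expected = expectedCookie.getBytes(StandardCharsets.UTF_8);
        ByteBuffer in = ByteBuffer.wrap(frame);
        if (in.remaining() < 4) {
            return false; // length prefix is missing
        }
        int length = in.getInt();
        if (length != expected.length || in.remaining() < length) {
            return false; // wrong length or truncated cookie
        }
        byte[] received = new byte[length];
        in.get(received);
        // Assumption: constant-time comparison instead of Arrays.equals.
        return MessageDigest.isEqual(expected, received);
    }
}
```

For example, `CookieFrame.matches(CookieFrame.encode("s3cret"), "s3cret")` accepts the frame, while a frame built from any other cookie, or one shorter than four bytes, is rejected.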


