[ 
https://issues.apache.org/jira/browse/KAFKA-200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13170544#comment-13170544
 ] 

John Fung commented on KAFKA-200:
---------------------------------

===============
Background Informations
===============

1. Socket.setReceiveBufferSize() has restriction specified in Java API doc : 
"For client sockets, setReceiveBufferSize() must be called before connecting 
the socket to its remote peer."
- 
http://docs.oracle.com/javase/6/docs/api/java/net/Socket.html#setReceiveBufferSize%28int%29

2. Socket.setSendBufferSize() does not have corresponding restriction as 
setReceiveBufferSize()
- 
http://docs.oracle.com/javase/6/docs/api/java/net/Socket.html#setSendBufferSize%28int%29

3. SocketChannel.socket() returns Socket object which is needed to provide both 
setReceiveBufferSize() and setSendBufferSize() methods (However, SocketChannel 
is available only after the connection is accepted)

4. ServerSocketChannel.socket() returns ServerSocket object which provides only 
setReceiveBufferSize() but *no* setSendBufferSize() method (ServerSocketChannel 
is available *before* the connection is accepted)

5. Hence, ServerSocketChannel object is available to provide 
setReceiveBufferSize before calling ServerSocketChannel.accept() which would 
satisfy the restriction specified in item 1

The above 5 items are background informations which lead to the following 3 
different approaches to set send/receive buffer sizes in Kafka (tested in 
Linux).

It appears that Approach 3 is the most appropriate in this scenario.

============================
Approach 1 - KAFKA-200.patch (existing patch)

1. call setReceiveBufferSize after accept()
2. call setSendBufferSize after accept()
============================

def accept(key: SelectionKey, processor: Processor) {
    val socketChannel = key.channel().asInstanceOf[ServerSocketChannel].accept()
    socketChannel.configureBlocking(false)
    socketChannel.socket().setTcpNoDelay(true)
    socketChannel.socket().setSendBufferSize(sendBufferSize)
    socketChannel.socket().setReceiveBufferSize(receiveBufferSize)
     
    processor.accept(socketChannel)
}

=====
Results:
=====
1. In Linux machine, seeing both send / receive buffer socket sizes set to the 
expected values.
2. It doesn't comply with the restriction specified by Java API doc

============================
Approach 2:

1. call setReceiveBufferSize before accept()
2. call setSendBufferSize before accept()
============================

def accept(key: SelectionKey, processor: Processor) {
      val socketChannel = key.channel().asInstanceOf[SocketChannel]
      socketChannel.configureBlocking(false)
      socketChannel.socket().setTcpNoDelay(true)
      socketChannel.socket().setSendBufferSize(sendBufferSize)
      socketChannel.socket().setReceiveBufferSize(receiveBufferSize)
      socketChannel.asInstanceOf[ServerSocketChannel].accept()
     
      processor.accept(socketChannel)
}

=====
Results:
=====
1. Compilation OK.
2. Runtime error for casting:

[2011-12-15 21:31:52,986] ERROR Error in acceptor (kafka.network.Acceptor)
java.lang.ClassCastException
[2011-12-15 21:31:52,986] ERROR Error in acceptor (kafka.network.Acceptor)
java.lang.ClassCastException
. . .


============================
Approach 3:

1. call setReceiveBufferSize before accept()
    (complying with restriction specified in 
http://docs.oracle.com/javase/6/docs/api/java/net/Socket.html#setReceiveBufferSize%28int%29)

2. call setSendBufferSize after accept()
    (No corresponding restriction to call before accept() )
============================

def accept(key: SelectionKey, processor: Processor) {
    val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
    serverSocketChannel.configureBlocking(false)
    serverSocketChannel.socket().setReceiveBufferSize(receiveBufferSize)
   
    val socketChannel = serverSocketChannel.accept()
    socketChannel.socket().setTcpNoDelay(true)
    socketChannel.socket().setSendBufferSize(sendBufferSize)

    processor.accept(socketChannel)
}

=====
Results:
=====
1. Comply with the restriction specified in Java API doc
2. Both sendBufferSize and receiveBufferSize are set to the expected values:
    [2011-12-15 21:46:46,525] DEBUG sendBufferSize: [1048576] 
receiveBufferSize: [1048576] (kafka.network.Acceptor)
                
> Support configurable send / receive socket buffer size in server
> ----------------------------------------------------------------
>
>                 Key: KAFKA-200
>                 URL: https://issues.apache.org/jira/browse/KAFKA-200
>             Project: Kafka
>          Issue Type: Improvement
>          Components: core
>    Affects Versions: 0.7
>            Reporter: John Fung
>             Fix For: 0.8
>
>         Attachments: KAFKA-200.patch
>
>
> * Make the send / receive socket buffer size configurable in server.
> * KafkaConfig.scala already has the following existing variables to support 
> send / receive buffer:
>     socketSendBuffer
>     socketReceiveBuffer
> * The patch attached to this ticket will read the following existing settings 
> in <kafka>/config/server.properties and set the corresponding socket buffers
>     . . .
>     # The send buffer (SO_SNDBUF) used by the socket server
>     socket.send.buffer=1048576
>     # The receive buffer (SO_RCVBUF) used by the socket server
>     socket.receive.buffer=1048576

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

Reply via email to