-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33620/#review89677
-----------------------------------------------------------


Thanks for the patch. A few comments below.

Also, we need to handle zero-copy transfer in PartitionDataSend properly. 
Currently, the following code won't work over SSL.

      val bytesSent = partitionData.messages.writeTo(channel, messagesSentSize,
        messageSize - messagesSentSize)
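
    To illustrate the concern: assuming writeTo ends up in 
FileChannel.transferTo, the file bytes go straight to the socket and never 
pass through the SSLEngine. A rough sketch of a fallback is below; it copies 
through an intermediate buffer so that the destination channel can encrypt 
what it is given. BufferedFileTransfer and transferViaBuffer are hypothetical 
names, not part of the patch.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;

// Hypothetical helper, not part of the patch under review.
final class BufferedFileTransfer {

    /**
     * Copies up to count bytes from file, starting at position, into dest
     * through an intermediate buffer. Returns the number of bytes that dest
     * accepted, so the caller can retry from position + returned value.
     */
    static long transferViaBuffer(FileChannel file, long position, long count,
                                  WritableByteChannel dest, ByteBuffer buffer) throws IOException {
        long written = 0;
        while (written < count) {
            buffer.clear();
            buffer.limit((int) Math.min(buffer.capacity(), count - written));
            // Positional read: does not move the file channel's own position.
            int read = file.read(buffer, position + written);
            if (read <= 0)
                break; // end of file
            buffer.flip();
            written += dest.write(buffer);
            if (buffer.hasRemaining())
                break; // dest cannot take more right now; unwritten bytes are re-read on retry
        }
        return written;
    }
}
```

    The cost is an extra copy on the SSL path; the plaintext path could keep 
using the zero-copy call.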


clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
(lines 64 - 66)
<https://reviews.apache.org/r/33620/#comment142775>

    Would it be better to add pending to the TransportLayer interface so 
that we don't have to special-case SSL? Also, would it be better to name 
it something like hasPendingWrites()?
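
    A minimal sketch of the idea, with hypothetical names 
(PendingAwareTransport, PlainTransportSketch, BufferedTransportSketch, 
SendCompletion) standing in for the real classes: every transport answers 
hasPendingWrites(), so the send path needs no SSL check.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for the TransportLayer interface.
interface PendingAwareTransport {
    boolean hasPendingWrites();
}

// A plaintext transport writes straight to the socket, so nothing is ever staged.
final class PlainTransportSketch implements PendingAwareTransport {
    @Override
    public boolean hasPendingWrites() {
        return false;
    }
}

// A transport that stages encrypted bytes before they reach the socket.
final class BufferedTransportSketch implements PendingAwareTransport {
    private final ByteBuffer netWriteBuffer = ByteBuffer.allocate(1024);

    BufferedTransportSketch() {
        netWriteBuffer.limit(0); // nothing staged initially
    }

    // Pretend these bytes came out of SSLEngine.wrap().
    void stage(byte[] encrypted) {
        netWriteBuffer.clear();
        netWriteBuffer.put(encrypted);
        netWriteBuffer.flip();
    }

    // Pretend n bytes were accepted by the socket.
    void drain(int n) {
        netWriteBuffer.position(netWriteBuffer.position() + n);
    }

    @Override
    public boolean hasPendingWrites() {
        return netWriteBuffer.hasRemaining();
    }
}

final class SendCompletion {
    // A send is done only when its own bytes are consumed AND the transport has flushed.
    static boolean completed(int remainingInSend, PendingAwareTransport transport) {
        return remainingInSend <= 0 && !transport.hasPendingWrites();
    }
}
```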



clients/src/main/java/org/apache/kafka/common/network/Channel.java (line 116)
<https://reviews.apache.org/r/33620/#comment142349>

    Let's add a comment on what the expected return value is. It seems that 
when the full response is received, we will return a NetworkReceive. Otherwise, 
we will return null.
    
    We probably want to add a comment on each public method whose meaning is 
not obvious.
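
    For the return-value contract above, the comment could look roughly like 
the Javadoc in this sketch. FramedReceiveSketch is a hypothetical, simplified 
stand-in for a size-prefixed receive (4-byte big-endian length followed by the 
payload); it is not the code in the patch.

```java
import java.nio.ByteBuffer;

// Hypothetical, simplified stand-in for a size-prefixed receive.
final class FramedReceiveSketch {
    private final ByteBuffer size = ByteBuffer.allocate(4);
    private ByteBuffer payload;

    /**
     * Consumes whatever bytes are available in {@code in}.
     *
     * @return the complete payload, ready for reading, once the 4-byte size
     *         prefix and the full body have arrived; {@code null} if more
     *         bytes are still needed
     */
    ByteBuffer read(ByteBuffer in) {
        while (size.hasRemaining() && in.hasRemaining())
            size.put(in.get());
        if (size.hasRemaining())
            return null; // size prefix still incomplete

        if (payload == null)
            payload = ByteBuffer.allocate(size.getInt(0)); // no size validation in this sketch

        while (payload.hasRemaining() && in.hasRemaining())
            payload.put(in.get());
        if (payload.hasRemaining())
            return null; // body still incomplete

        ByteBuffer done = payload;
        done.flip();
        payload = null; // reset state for the next frame
        size.clear();
        return done;
    }
}
```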



clients/src/main/java/org/apache/kafka/common/network/Channel.java (lines 148 - 
150)
<https://reviews.apache.org/r/33620/#comment142350>

    Our current coding convention is not to wrap single-line statements in {}. 
Could you make a pass on this in other places?



clients/src/main/java/org/apache/kafka/common/network/DefaultAuthenticator.java 
(lines 48 - 49)
<https://reviews.apache.org/r/33620/#comment142344>

    Do we need to use principal builder here? Should we just return an 
anonymous principal?



clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 
(lines 90 - 92)
<https://reviews.apache.org/r/33620/#comment142348>

    Our current coding convention is not to wrap single-line statements in {}.



clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java 
(line 44)
<https://reviews.apache.org/r/33620/#comment142651>

    Could we annotate all methods in the TransportLayer interface with 
@Override?



clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java 
(line 58)
<https://reviews.apache.org/r/33620/#comment142345>

    Could we document this field? In particular, how is it related to the 
interest ops in the selection key? When do we expect them to be the same, and 
when do we expect them to differ?



clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java 
(line 67)
<https://reviews.apache.org/r/33620/#comment142347>

    Shouldn't we only start the ssl handshake after the raw socket connection 
is established? SSLTransportLayer is instantiated before the connection is 
established.



clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java 
(lines 162 - 165)
<https://reviews.apache.org/r/33620/#comment142616>

    It seems that we expect the socket channel to always be writable if there 
is something to write. Should we add an assertion on this to make sure that we 
are in the right state?



clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java 
(lines 169 - 172)
<https://reviews.apache.org/r/33620/#comment142638>

    Could you document the typical state transition sequence during handshake()?
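
    As a starting point for that documentation, here is a sketch of the 
action each SSLEngineResult.HandshakeStatus value normally calls for. The 
HandshakeSteps class is hypothetical, and the actual order of states in a 
handshake depends on the protocol version and on which side we are.

```java
import javax.net.ssl.SSLEngineResult.HandshakeStatus;

// Hypothetical helper that spells out what each handshake status asks for.
final class HandshakeSteps {

    static String nextStep(HandshakeStatus status) {
        switch (status) {
            case NEED_WRAP:
                return "wrap handshake data into the network write buffer and flush it to the socket";
            case NEED_UNWRAP:
                return "read from the socket and unwrap the peer's handshake data";
            case NEED_TASK:
                return "run the engine's delegated tasks, then re-check the handshake status";
            case FINISHED:
                return "mark the handshake as complete";
            case NOT_HANDSHAKING:
                return "no handshake in progress";
            default:
                // Newer JDKs define additional values (e.g. for DTLS).
                return "unhandled status: " + status;
        }
    }
}
```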



clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java 
(line 178)
<https://reviews.apache.org/r/33620/#comment142629>

    We check that the socket is writable before some calls to flush(), but 
not before others. It would be clearer to do this consistently.



clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java 
(line 179)
<https://reviews.apache.org/r/33620/#comment142615>

    Is this needed? It seems that the assumption is that if there is something 
to write, SelectionKey.OP_WRITE should already be set and there is no need to 
set it again.



clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java 
(line 203)
<https://reviews.apache.org/r/33620/#comment142365>

    Hmm, if write is false, we shouldn't write to the socket through flush, 
right?



clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java 
(lines 217 - 218)
<https://reviews.apache.org/r/33620/#comment142367>

    These need to use appReadBuffer.



clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java 
(lines 272 - 275)
<https://reviews.apache.org/r/33620/#comment142642>

    According to the comment, shouldn't we check for NOT_HANDSHAKING too?



clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java 
(line 306)
<https://reviews.apache.org/r/33620/#comment142622>

    Does it matter whether we complete the writes to the socket before running 
the task?



clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java 
(line 327)
<https://reviews.apache.org/r/33620/#comment142368>

    It's probably better to throw EOFException.
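
    Something along these lines, as a sketch only (EofAwareRead and 
readOrThrow are hypothetical names): a read() result of -1 means the peer 
closed the connection, and EOFException says so directly.

```java
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

// Hypothetical helper, not part of the patch under review.
final class EofAwareRead {

    /** Reads into dst and turns end-of-stream (-1) into an EOFException. */
    static int readOrThrow(ReadableByteChannel channel, ByteBuffer dst) throws IOException {
        int read = channel.read(dst);
        if (read < 0)
            throw new EOFException("EOF during read");
        return read;
    }
}
```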



clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java 
(lines 331 - 343)
<https://reviews.apache.org/r/33620/#comment142634>

    What if we don't read anything into netReadBuffer? Do we get stuck in the 
while loop?



clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java 
(line 464)
<https://reviews.apache.org/r/33620/#comment142652>

    Can write() be called if handshakeComplete is false?



clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java 
(line 470)
<https://reviews.apache.org/r/33620/#comment142653>

    Could we rename wrap to something like wrapResult?



clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java 
(lines 473 - 477)
<https://reviews.apache.org/r/33620/#comment142654>

    Can renegotiation happen in the middle of a write()? What happens to the 
bytes that we haven't finished writing?



clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java 
(line 562)
<https://reviews.apache.org/r/33620/#comment142680>

    Typo SelecitonKey



clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java 
(line 576)
<https://reviews.apache.org/r/33620/#comment142681>

    Typo SelecitonKey



clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java 
(line 593)
<https://reviews.apache.org/r/33620/#comment142363>

    Why does this need to be protected? SSLTransportLayer is not being 
subclassed.



clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java 
(line 602)
<https://reviews.apache.org/r/33620/#comment142677>

    Not sure if we should do the flip here. The caller may not be able to read 
all bytes in appReadBuffer in one call. When the caller calls 
readFromAppBuffer() again, we shouldn't do the flip again.
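
    One possible way to make repeated calls safe, sketched under the 
assumption that appReadBuffer is kept in write mode between calls: pair each 
flip() with a compact(), so leftover bytes move to the front and the buffer 
goes back to write mode. AppBufferDrain is a hypothetical name, and this is an 
alternative I am suggesting, not what the patch does.

```java
import java.nio.ByteBuffer;

// Hypothetical helper illustrating a flip()/compact() pairing.
final class AppBufferDrain {

    /**
     * Copies as many bytes as fit from appReadBuffer into dst.
     * Assumes appReadBuffer is in write mode on entry (position = end of data)
     * and leaves it in write mode on exit, with unread bytes preserved.
     */
    static int readFromAppBuffer(ByteBuffer appReadBuffer, ByteBuffer dst) {
        appReadBuffer.flip(); // switch to read mode
        int n = Math.min(appReadBuffer.remaining(), dst.remaining());
        for (int i = 0; i < n; i++)
            dst.put(appReadBuffer.get());
        appReadBuffer.compact(); // keep leftovers, switch back to write mode
        return n;
    }
}
```

    Without the compact(), a second flip() would set the limit to the current 
read position and drop the bytes that were not consumed by the first call.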



clients/src/main/java/org/apache/kafka/common/network/Selector.java (lines 143 
- 146)
<https://reviews.apache.org/r/33620/#comment142343>

    The original code seems better by getting the socket once and reusing it 
for all subsequent calls.



clients/src/main/java/org/apache/kafka/common/network/Selector.java (lines 271 
- 274)
<https://reviews.apache.org/r/33620/#comment142346>

    We probably don't want to call prepare if the connection has been 
established. How do we enforce that?



clients/src/main/java/org/apache/kafka/common/network/Selector.java (line 302)
<https://reviews.apache.org/r/33620/#comment142353>

    We can use channel directly instead of calling channel(key).



clients/src/main/java/org/apache/kafka/common/network/Selector.java (line 311)
<https://reviews.apache.org/r/33620/#comment142352>

    We can use channel directly instead of calling channel(key).



clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java (line 
34)
<https://reviews.apache.org/r/33620/#comment142351>

    typo authenticaiton



clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java 
(lines 169 - 212)
<https://reviews.apache.org/r/33620/#comment142772>

    Is the commented out code still needed?



core/src/main/scala/kafka/network/SocketServer.scala (lines 119 - 120)
<https://reviews.apache.org/r/33620/#comment142771>

    We may have a memory visibility issue here (which seems to exist w/o the 
patch). Since there is no memory barrier, the request threads may not see the 
latest value in the processors array. One way to fix this is to populate all 
processors during the instantiation of SocketServer.
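
    The idea, sketched in Java with hypothetical names (ProcessorSketch, 
SocketServerSketch): fill the array completely inside the constructor and 
store it in a final field. Under the Java memory model, threads that obtain 
the server reference after construction then see the fully populated array, 
provided the reference does not escape during construction.

```java
// Hypothetical stand-in for a network processor.
final class ProcessorSketch {
    final int id;

    ProcessorSketch(int id) {
        this.id = id;
    }
}

// Hypothetical stand-in for the socket server.
final class SocketServerSketch {
    // final + fully populated before the constructor returns = safe publication
    private final ProcessorSketch[] processors;

    SocketServerSketch(int numProcessors) {
        ProcessorSketch[] created = new ProcessorSketch[numProcessors];
        for (int i = 0; i < numProcessors; i++)
            created[i] = new ProcessorSketch(i);
        this.processors = created;
    }

    ProcessorSketch processor(int index) {
        return processors[index];
    }

    int numProcessors() {
        return processors.length;
    }
}
```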



core/src/main/scala/kafka/network/SocketServer.scala (lines 234 - 239)
<https://reviews.apache.org/r/33620/#comment142769>

    This is a metric across all processors. So we need to create it outside of 
Acceptor.



core/src/main/scala/kafka/network/SocketServer.scala (line 358)
<https://reviews.apache.org/r/33620/#comment142770>

    This parameter doesn't seem to be used. We can remove it.



core/src/main/scala/kafka/network/SocketServer.scala (line 501)
<https://reviews.apache.org/r/33620/#comment142683>

    Do we need KafkaConfig.channelConfigs? It seems that we can just pass in 
KafkaConfig.original() in configure().



core/src/main/scala/kafka/server/KafkaServer.scala 
<https://reviews.apache.org/r/33620/#comment142773>

    We need to instantiate Metrics here and pass it along to SocketServer. This 
is because there will be other components of the code (e.g., KafkaApis) that 
will be using the shared Metrics instance in the future.



core/src/test/scala/unit/kafka/network/SocketServerTest.scala (line 208)
<https://reviews.apache.org/r/33620/#comment142774>

    Do we need to add the logic to read the response and check that it's 
expected?


- Jun Rao


On June 23, 2015, 8:18 p.m., Sriharsha Chintalapani wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/33620/
> -----------------------------------------------------------
> 
> (Updated June 23, 2015, 8:18 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1690
>     https://issues.apache.org/jira/browse/KAFKA-1690
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1690. new java producer needs ssl support as a client.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. SSLFactory tests.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. Added 
> PrincipalBuilder.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. Addressing 
> reviews.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. Addressing 
> reviews.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. Addressing 
> reviews.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. Fixed minor 
> issues with the patch.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. Fixed minor 
> issues with the patch.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client.
> 
> 
> Merge remote-tracking branch 'refs/remotes/origin/trunk' into KAFKA-1690-V1
> 
> 
> KAFKA-1690. Broker side ssl changes.
> 
> 
> KAFKA-1684. SSL for socketServer.
> 
> 
> KAFKA-1690. Added SSLProducerSendTest and fixes to get right port for SSL.
> 
> 
> Merge branch 'trunk' into KAFKA-1690-V1
> 
> 
> KAFKA-1690. Post merge fixes.
> 
> 
> KAFKA-1690. Added SSLProducerSendTest.
> 
> 
> KAFKA-1690. Minor fixes based on patch review comments.
> 
> 
> Diffs
> -----
> 
>   build.gradle 30d1cf2f1ff9ed3f86a060da8099bb0774b4cf91 
>   checkstyle/import-control.xml f2e6cec267e67ce8e261341e373718e14a8e8e03 
>   clients/src/main/java/org/apache/kafka/clients/ClientUtils.java 
> 0d68bf1e1e90fe9d5d4397ddf817b9a9af8d9f7a 
>   clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 
> 2c421f42ed3fc5d61cf9c87a7eaa7bb23e26f63b 
>   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
> 48fe7961e2215372d8033ece4af739ea06c6457b 
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
> daff34db5bf2144e9dc274b23dc56b88f4efafdc 
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
> 951c34c92710fc4b38d656e99d2a41255c60aeb7 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> 5a37580ec69af08b97cf5b43b241790ba8c129dd 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
> aa264202f2724907924985a5ecbe74afc4c6c04b 
>   clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java 
> bae528d31516679bed88ee61b408f209f185a8cc 
>   clients/src/main/java/org/apache/kafka/common/config/SSLConfigs.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/network/Authenticator.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
> df0e6d5105ca97b7e1cb4d334ffb7b443506bd0b 
>   clients/src/main/java/org/apache/kafka/common/network/Channel.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java 
> PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/network/DefaultAuthenticator.java
>  PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 
> 3ca0098b8ec8cfdf81158465b2d40afc47eb6f80 
>   
> clients/src/main/java/org/apache/kafka/common/network/PlainTextChannelBuilder.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/network/PlainTextTransportLayer.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/network/SSLChannelBuilder.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/network/SSLFactory.java 
> PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/network/Selectable.java 
> 618a0fa53848ae6befea7eba39c2f3285b734494 
>   clients/src/main/java/org/apache/kafka/common/network/Selector.java 
> 4aee214b24fd990be003adc36d675f015bf22fe6 
>   clients/src/main/java/org/apache/kafka/common/network/Send.java 
> 8f6daadf6b67c3414911cda77765512131e56fd3 
>   clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java 
> PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java 
> dab1a94dd29563688b6ecf4eeb0e180b06049d3f 
>   
> clients/src/main/java/org/apache/kafka/common/security/auth/DefaultPrincipalBuilder.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/security/auth/PrincipalBuilder.java
>  PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/utils/Utils.java 
> f73eedb030987f018d8446bb1dcd98d19fa97331 
>   clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java 
> 13ce519f03d13db041e1f2dbcd6b59414d2775b8 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
>  f3f8334f848be4cc043d5a573975609a3681fe7e 
>   clients/src/test/java/org/apache/kafka/common/network/EchoServer.java 
> PRE-CREATION 
>   clients/src/test/java/org/apache/kafka/common/network/SSLFactoryTest.java 
> PRE-CREATION 
>   clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java 
> PRE-CREATION 
>   clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java 
> 158f9829ff64a969008f699e40c51e918287859e 
>   clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java 
> 2ebe3c21f611dc133a2dbb8c7dfb0845f8c21498 
>   clients/src/test/java/org/apache/kafka/test/TestSSLUtils.java PRE-CREATION 
>   core/src/main/scala/kafka/network/SocketServer.scala 
> 91319fa010b140cca632e5fa8050509bd2295fc9 
>   core/src/main/scala/kafka/server/KafkaConfig.scala 
> c1f0ccad4900d74e41936fae4c6c2235fe9314fe 
>   core/src/main/scala/kafka/server/KafkaServer.scala 
> 52dc728bb1ab4b05e94dc528da1006040e2f28c9 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 
> 9ce4bd5ee130ce3cb252b2883a3fd3c9acd742a5 
>   core/src/test/scala/integration/kafka/api/SSLProducerSendTest.scala 
> PRE-CREATION 
>   core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala 
> df5c6ba20f01e497ce896af790cbab40369f1776 
>   core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala 
> e4bf2df48dd59a251b646b7f96d63ec4b924fc0b 
>   core/src/test/scala/unit/kafka/network/SocketServerTest.scala 
> 7dc2fad542ea553ee888543dd215eb41ea57d509 
>   core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 
> 98a5b042a710d3c1064b0379db1d152efc9eabee 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 
> 17e9fe4c159a29033fe9a287db6ced2fdc3fa9d1 
> 
> Diff: https://reviews.apache.org/r/33620/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Sriharsha Chintalapani
> 
>
