-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33620/#review93862
-----------------------------------------------------------


Thanks for the patch. A few more comments below.


build.gradle (lines 247 - 249)
<https://reviews.apache.org/r/33620/#comment148282>

    As Ismael mentioned, we got rid of Scala 2.9, so this is not needed.



clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java 
(lines 151 - 152)
<https://reviews.apache.org/r/33620/#comment148279>

    We probably need to wrap this in a try/catch for IOException, as we do 
above?



clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java 
(lines 234 - 239)
<https://reviews.apache.org/r/33620/#comment148265>

    If handshakeStatus is NEED_UNWRAP and write is true, we will fall through 
to the next case. However, there may still be unflushed data, and flush() won't 
be called when write is true. Perhaps the check for write is unnecessary since 
(a) flush() always handles the case when write is false; and (b) we may have 
done a flush in line 220 and the writable status could have changed after that, 
which makes the value in write stale.



clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java 
(lines 262 - 269)
<https://reviews.apache.org/r/33620/#comment148268>

    Could we transition from NEED_WRAP to NOT_HANDSHAKING directly? Or can 
NOT_HANDSHAKING only be reached from the FINISHED state?



clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java 
(lines 306 - 320)
<https://reviews.apache.org/r/33620/#comment148281>

    It seems that the logic here can be simpler. In handshake(), we call flush 
at the beginning. So, when handshakeFinished() is called, there should never be 
any remaining bytes in netWriteBuffer. In handshakeFinished(), we can then 
simply set handshakeComplete to true and turn off OP_WRITE. Also, not sure if 
we need to check handshakeResult.getHandshakeStatus().



clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java 
(lines 357 - 360)
<https://reviews.apache.org/r/33620/#comment148267>

    Is this correct? After netReadBuffer.compact in line 375, limit is set to 
capacity and position is set to the number of unread bytes. The only case when 
they can be equal is that we get a full capacity worth of bytes and don't read 
any byte during unwrap. In this case, we shouldn't empty the buffer.
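
    A small, self-contained check of the ByteBuffer.compact() behavior in 
question, using only java.nio. The class and method names are made up for 
illustration and are not from the patch.

```java
import java.nio.ByteBuffer;

public class CompactDemo {
    /**
     * Fills a buffer with `written` bytes, consumes `consumed` of them,
     * compacts, and returns {position, limit} afterwards.
     */
    public static int[] stateAfterCompact(int capacity, int written, int consumed) {
        ByteBuffer buf = ByteBuffer.allocate(capacity);
        buf.put(new byte[written]);   // stand-in for a socket read into the buffer
        buf.flip();                   // read mode: position = 0, limit = written
        buf.get(new byte[consumed]);  // stand-in for unwrap consuming some bytes
        buf.compact();                // unread bytes move to the front
        return new int[] {buf.position(), buf.limit()};
    }

    public static void main(String[] args) {
        int[] partial = stateAfterCompact(16, 10, 4);
        System.out.println(partial[0] + " " + partial[1]); // prints "6 16"
        int[] full = stateAfterCompact(16, 16, 0);
        System.out.println(full[0] + " " + full[1]);       // prints "16 16"
    }
}
```

    position equals limit only in the second call, where the buffer is full 
and nothing was consumed.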



clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java 
(lines 368 - 381)
<https://reviews.apache.org/r/33620/#comment148266>

    If handshake status is BUFFER_OVERFLOW, we will return to the caller and 
then to the selector. However, we may have read all incoming bytes into 
netReadBuffer. So, the key may never be selected again to complete the 
handshake. It seems that this case can never happen during handshake since we 
don't expect to use the appReadBuffer. Perhaps we can just assert that state is 
illegal when handling NEED_UNWRAP in handshake().



clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java 
(line 409)
<https://reviews.apache.org/r/33620/#comment148274>

    Agreed with Dong: Maybe change to "if (netread <= 0) return netread"?



clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java 
(line 417)
<https://reviews.apache.org/r/33620/#comment148276>

    It's still not very clear to me how renegotiation can be supported in the 
middle of sends/receives. Suppose that the server initiates a handshake. This 
may involve the server sending some handshake bytes to the client. After this 
point, the server expects to read handshake bytes from the client. However, the 
client may still be sending some regular bytes over the socket.



clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java 
(lines 430 - 433)
<https://reviews.apache.org/r/33620/#comment148275>

    Is this needed? If we need to expand appReadBuffer, netReadBuffer's 
position won't be 0 and we can just loop back.



clients/src/main/java/org/apache/kafka/common/network/Selector.java (line 75)
<https://reviews.apache.org/r/33620/#comment148269>

    Could we add a comment on why we need to maintain this map?



clients/src/main/java/org/apache/kafka/common/network/Selector.java (lines 247 
- 248)
<https://reviews.apache.org/r/33620/#comment148272>

    It seems that we will need to further check whether the channels in 
stagedReceives are muted or not. Timeout should only be 0 if there is at least 
one unmuted channel in stagedReceives.
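
    A rough sketch of that check, assuming stagedReceives is keyed by a 
channel id and muted channels are tracked in a set. Both shapes, and all names 
below, are assumptions for illustration rather than the patch's actual types.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class PollTimeoutSketch {
    /**
     * Returns 0 only if some unmuted channel has at least one staged receive;
     * otherwise returns the caller's requested timeout unchanged.
     */
    public static long effectiveTimeout(Map<String, Deque<String>> stagedReceives,
                                        Set<String> mutedChannels,
                                        long requestedTimeout) {
        for (Map.Entry<String, Deque<String>> entry : stagedReceives.entrySet()) {
            boolean hasStaged = !entry.getValue().isEmpty();
            boolean muted = mutedChannels.contains(entry.getKey());
            if (hasStaged && !muted)
                return 0L; // something can be delivered right away, so don't block
        }
        return requestedTimeout;
    }

    public static void main(String[] args) {
        Map<String, Deque<String>> staged = new HashMap<String, Deque<String>>();
        Deque<String> queue = new ArrayDeque<String>();
        queue.add("receive-1");
        staged.put("channel-a", queue);

        Set<String> muted = new HashSet<String>();
        muted.add("channel-a");
        System.out.println(effectiveTimeout(staged, muted, 100L)); // only staged channel is muted

        muted.clear();
        System.out.println(effectiveTimeout(staged, muted, 100L)); // staged channel is unmuted
    }
}
```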



clients/src/main/java/org/apache/kafka/common/network/Selector.java (lines 281 
- 287)
<https://reviews.apache.org/r/33620/#comment148273>

    To avoid having to buffer an unbounded number of receives in 
stagedReceives, perhaps we can choose not to read from a channel if staged 
receives already exist for that channel. This will help protect the server 
from running out of memory due to a bursty producer client.
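
    One possible shape for such a gate, as a sketch only. The class, its 
String-keyed map, and the method names are hypothetical and not taken from 
Selector.java.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

public class StagedReadGate {
    // Hypothetical stand-in for the selector's per-channel staged receives.
    private final Map<String, Deque<String>> stagedReceives =
        new HashMap<String, Deque<String>>();

    /** Read from the socket only when nothing is already staged for the channel. */
    public boolean shouldRead(String channelId) {
        Deque<String> staged = stagedReceives.get(channelId);
        return staged == null || staged.isEmpty();
    }

    /** Records a completed receive for later delivery. */
    public void stage(String channelId, String receive) {
        Deque<String> staged = stagedReceives.get(channelId);
        if (staged == null) {
            staged = new ArrayDeque<String>();
            stagedReceives.put(channelId, staged);
        }
        staged.add(receive);
    }

    /** Hands out the oldest staged receive, or null if there is none. */
    public String pollStaged(String channelId) {
        Deque<String> staged = stagedReceives.get(channelId);
        return staged == null ? null : staged.poll();
    }

    public static void main(String[] args) {
        StagedReadGate gate = new StagedReadGate();
        gate.stage("producer-1", "receive-1");
        System.out.println(gate.shouldRead("producer-1")); // false while a receive is staged
        gate.pollStaged("producer-1");
        System.out.println(gate.shouldRead("producer-1")); // true once it has been handed out
    }
}
```

    With a gate like this, a channel that is read only when shouldRead() is 
true never accumulates more receives than one read pass produces.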



clients/src/main/java/org/apache/kafka/common/network/Selector.java (lines 320 
- 323)
<https://reviews.apache.org/r/33620/#comment148270>

    There seems to be a problem with this. When there are ready keys, but those 
ready keys don't include keys with stagedReceives, we still want to return one 
receive for those keys that are not ready, but with stagedReceives. So, it 
seems that the logic will be (1) process all ready keys as before, but just add 
the receives to stagedReceives; (2) for every key in stagedReceives, add the 
first receive to completedReceives if the key is not muted.
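
    The two steps could look roughly like the following. This is a simplified 
sketch with String channel ids and String receives standing in for the real 
types; none of the names are taken from the patch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class StagedReceivesSketch {
    private final Map<String, Deque<String>> stagedReceives =
        new LinkedHashMap<String, Deque<String>>();
    private final Set<String> muted = new HashSet<String>();

    public void mute(String channelId) { muted.add(channelId); }
    public void unmute(String channelId) { muted.remove(channelId); }

    /** Step (1): every receive read from a ready key is only staged. */
    public void stage(String channelId, String receive) {
        Deque<String> staged = stagedReceives.get(channelId);
        if (staged == null) {
            staged = new ArrayDeque<String>();
            stagedReceives.put(channelId, staged);
        }
        staged.add(receive);
    }

    /** Step (2): move the first staged receive of each unmuted channel to the result. */
    public List<String> drainOnePerUnmutedChannel() {
        List<String> completedReceives = new ArrayList<String>();
        Iterator<Map.Entry<String, Deque<String>>> it = stagedReceives.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Deque<String>> entry = it.next();
            if (muted.contains(entry.getKey()))
                continue; // leave receives of muted channels staged
            completedReceives.add(entry.getValue().poll());
            if (entry.getValue().isEmpty())
                it.remove(); // drop empty queues so every remaining entry has a receive
        }
        return completedReceives;
    }

    public static void main(String[] args) {
        StagedReceivesSketch selector = new StagedReceivesSketch();
        selector.stage("a", "a1");
        selector.stage("a", "a2");
        selector.stage("b", "b1");
        selector.mute("b");
        System.out.println(selector.drainOnePerUnmutedChannel()); // prints "[a1]"
        selector.unmute("b");
        System.out.println(selector.drainOnePerUnmutedChannel()); // prints "[a2, b1]"
    }
}
```

    Step (2) runs over stagedReceives rather than over the ready keys, so a 
channel with staged receives is served even when its key was not selected in 
this poll.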



clients/src/main/java/org/apache/kafka/common/network/Selector.java (line 502)
<https://reviews.apache.org/r/33620/#comment148271>

    Not sure why we need to check hasSend. It's possible for a channel to have 
both sends and receives at the same time since the NetworkClient supports 
pipelining.



core/src/test/scala/integration/kafka/api/SSLConsumerTest.scala (line 202)
<https://reviews.apache.org/r/33620/#comment148355>

    I am not sure that we need this test. The callback logic is in the consumer 
and is independent of the socket.



core/src/test/scala/integration/kafka/api/SSLProducerSendTest.scala (lines 113 
- 122)
<https://reviews.apache.org/r/33620/#comment148348>

    It doesn't seem that we need to test this here since this is not SSL 
specific.



core/src/test/scala/integration/kafka/api/SSLProducerSendTest.scala (line 179)
<https://reviews.apache.org/r/33620/#comment148349>

    This test seems unnecessary since the partitioning logic is SSL independent.



core/src/test/scala/integration/kafka/api/SSLProducerSendTest.scala (line 233)
<https://reviews.apache.org/r/33620/#comment148350>

    This test seems unnecessary. Auto topic creation logic is SSL independent.



core/src/test/scala/integration/kafka/api/SSLProducerSendTest.scala (line 257)
<https://reviews.apache.org/r/33620/#comment148351>

    Not sure if this test is useful either since the exercised logic is in the 
producer, not in the socket layer.



core/src/test/scala/integration/kafka/api/SSLProducerSendTest.scala (line 301)
<https://reviews.apache.org/r/33620/#comment148354>

    Not sure if this test is useful either since the exercised logic is in the 
producer, not in the socket layer.



core/src/test/scala/unit/kafka/network/SocketServerTest.scala (line 194)
<https://reviews.apache.org/r/33620/#comment148356>

    Is this test needed given the tests we have on EchoServer?
    
    Also, do we have a test where the broker listens on multiple ports? This 
can be added in a followup patch though.


- Jun Rao


On July 25, 2015, 7:11 p.m., Sriharsha Chintalapani wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/33620/
> -----------------------------------------------------------
> 
> (Updated July 25, 2015, 7:11 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1690
>     https://issues.apache.org/jira/browse/KAFKA-1690
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1690. new java producer needs ssl support as a client.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. SSLFactory tests.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. Added 
> PrincipalBuilder.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. Addressing 
> reviews.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. Addressing 
> reviews.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. Addressing 
> reviews.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. Fixed minor 
> issues with the patch.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. Fixed minor 
> issues with the patch.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client.
> 
> 
> Merge remote-tracking branch 'refs/remotes/origin/trunk' into KAFKA-1690-V1
> 
> 
> KAFKA-1690. Broker side ssl changes.
> 
> 
> KAFKA-1684. SSL for socketServer.
> 
> 
> KAFKA-1690. Added SSLProducerSendTest and fixes to get right port for SSL.
> 
> 
> Merge branch 'trunk' into KAFKA-1690-V1
> 
> 
> KAFKA-1690. Post merge fixes.
> 
> 
> KAFKA-1690. Added SSLProducerSendTest.
> 
> 
> KAFKA-1690. Minor fixes based on patch review comments.
> 
> 
> Merge commit
> 
> 
> KAFKA-1690. Added SSL Consumer Test.
> 
> 
> KAFKA-1690. SSL Support.
> 
> 
> KAFKA-1690. Addressing reviews.
> 
> 
> Merge branch 'trunk' into KAFKA-1690-V1
> 
> 
> Merge branch 'trunk' into KAFKA-1690-V1
> 
> 
> KAFKA-1690. Addressing reviews. Removed interestOps from SSLTransportLayer.
> 
> 
> KAFKA-1690. Addressing reviews.
> 
> 
> KAFKA-1690. added staged receives to selector.
> 
> 
> KAFKA-1690. Addressing reviews.
> 
> 
> Merge branch 'trunk' into KAFKA-1690-V1
> 
> 
> Diffs
> -----
> 
>   build.gradle 0abec26fb2d7be62c8a673f9ec838e926e64b2d1 
>   checkstyle/import-control.xml 19e0659ef9385433d9f94dee43cd70a52b18c9e5 
>   clients/src/main/java/org/apache/kafka/clients/ClientUtils.java 
> 0d68bf1e1e90fe9d5d4397ddf817b9a9af8d9f7a 
>   clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 
> 2c421f42ed3fc5d61cf9c87a7eaa7bb23e26f63b 
>   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
> 48fe7961e2215372d8033ece4af739ea06c6457b 
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
> 70377ae2fa46deb381139d28590ce6d4115e1adc 
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
> bea3d737c51be77d5b5293cdd944d33b905422ba 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> 03b8dd23df63a8d8a117f02eabcce4a2d48c44f7 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
> aa264202f2724907924985a5ecbe74afc4c6c04b 
>   clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java 
> bae528d31516679bed88ee61b408f209f185a8cc 
>   clients/src/main/java/org/apache/kafka/common/config/SSLConfigs.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/network/Authenticator.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
> df0e6d5105ca97b7e1cb4d334ffb7b443506bd0b 
>   clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java 
> PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/network/DefaultAuthenticator.java
>  PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 
> 3ca0098b8ec8cfdf81158465b2d40afc47eb6f80 
>   
> clients/src/main/java/org/apache/kafka/common/network/PlainTextChannelBuilder.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/network/PlainTextTransportLayer.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/network/SSLChannelBuilder.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/network/SSLFactory.java 
> PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/network/Selectable.java 
> 618a0fa53848ae6befea7eba39c2f3285b734494 
>   clients/src/main/java/org/apache/kafka/common/network/Selector.java 
> aaf60c98c2c0f4513a8d65ee0db67953a529d598 
>   clients/src/main/java/org/apache/kafka/common/network/Send.java 
> 8f6daadf6b67c3414911cda77765512131e56fd3 
>   clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java 
> PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java 
> dab1a94dd29563688b6ecf4eeb0e180b06049d3f 
>   
> clients/src/main/java/org/apache/kafka/common/security/auth/DefaultPrincipalBuilder.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/security/auth/PrincipalBuilder.java
>  PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/utils/Utils.java 
> af9993cf9b3991f1e61e1201c94e19bc1bf76a68 
>   clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java 
> 13ce519f03d13db041e1f2dbcd6b59414d2775b8 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
>  f3f8334f848be4cc043d5a573975609a3681fe7e 
>   clients/src/test/java/org/apache/kafka/common/network/EchoServer.java 
> PRE-CREATION 
>   clients/src/test/java/org/apache/kafka/common/network/SSLFactoryTest.java 
> PRE-CREATION 
>   clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java 
> PRE-CREATION 
>   clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java 
> 158f9829ff64a969008f699e40c51e918287859e 
>   clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java 
> e7951d835472e5defe49be435f2c93685ba544d5 
>   clients/src/test/java/org/apache/kafka/test/MockSelector.java 
> 51eb9d142f566c94a87add68b8c4f78b56d6ec3e 
>   clients/src/test/java/org/apache/kafka/test/TestSSLUtils.java PRE-CREATION 
>   core/src/main/scala/kafka/api/FetchResponse.scala 
> 0b6b33ab6f7a732ff1322b6f48abd4c43e0d7215 
>   core/src/main/scala/kafka/network/SocketServer.scala 
> dbe784b63817fd94e1593136926db17fac6fa3d7 
>   core/src/main/scala/kafka/server/KafkaConfig.scala 
> dbe170f87331f43e2dc30165080d2cb7dfe5fdbf 
>   core/src/main/scala/kafka/server/KafkaServer.scala 
> 18917bc4464b9403b16d85d20c3fd4c24893d1d3 
>   core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala 
> d8eee52fc750e23c06c1f06f03b96980d9865a32 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 
> 9ce4bd5ee130ce3cb252b2883a3fd3c9acd742a5 
>   core/src/test/scala/integration/kafka/api/SSLConsumerTest.scala 
> PRE-CREATION 
>   core/src/test/scala/integration/kafka/api/SSLProducerSendTest.scala 
> PRE-CREATION 
>   core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala 
> 8b14bcfe7af601fe4b0fb0a7c0c544e87403062a 
>   core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala 
> e4bf2df48dd59a251b646b7f96d63ec4b924fc0b 
>   core/src/test/scala/unit/kafka/network/SocketServerTest.scala 
> 7dc2fad542ea553ee888543dd215eb41ea57d509 
>   core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 
> 04a02e08a54139ee1a298c5354731bae009efef3 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 
> eb169d8b33c27d598cc24e5a2e5f78b789fa38d3 
> 
> Diff: https://reviews.apache.org/r/33620/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Sriharsha Chintalapani
> 
>
