kafka git commit: KAFKA-6258; SSLTransportLayer should keep reading from socket until either the buffer is full or the socket has no more data

2017-12-18 Thread jgus
Repository: kafka
Updated Branches:
  refs/heads/1.0 a64a69327 -> 45a7bb02f


KAFKA-6258; SSLTransportLayer should keep reading from socket until either the 
buffer is full or the socket has no more data

When a consumer uses plaintext and there is remaining data in the consumer's 
buffer, consumer.poll() reads all data available from the socket buffer into 
the consumer buffer. However, if the consumer uses SSL and there is remaining 
data, consumer.poll() may read only 16 KB (the size of 
SslTransportLayer.appReadBuffer) from the socket buffer. This reduces the 
efficiency of consumer.poll(), because the user must call poll() more times to 
get the same amount of data.
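
The difference between the two read strategies can be illustrated with a small, self-contained sketch. This is not the Kafka implementation: the class ReadLoopSketch, its methods, and the use of a plain ByteBuffer as a stand-in for the socket are all assumptions made for illustration. Only the 16 KB figure and the loop condition (stop when dst is full or the socket has no more data) come from the commit.

```java
import java.nio.ByteBuffer;

public class ReadLoopSketch {
    // Hypothetical stand-in for the 16 KB SslTransportLayer.appReadBuffer.
    static final int APP_BUFFER_SIZE = 16 * 1024;

    /** Simplified old behavior: move at most one app buffer's worth per call. */
    static int readOnce(ByteBuffer socket, ByteBuffer dst) {
        int n = Math.min(APP_BUFFER_SIZE, Math.min(socket.remaining(), dst.remaining()));
        ByteBuffer slice = socket.duplicate();
        slice.limit(slice.position() + n);       // expose exactly n bytes
        dst.put(slice);                          // copy them into dst
        socket.position(socket.position() + n);  // mark them as consumed
        return n;
    }

    /** Simplified new behavior: repeat until dst is full or the socket is drained. */
    static int readUntilFullOrEmpty(ByteBuffer socket, ByteBuffer dst) {
        int total = 0;
        while (dst.remaining() > 0) {
            int n = readOnce(socket, dst);
            if (n == 0) {
                break;                           // no more data in the "socket"
            }
            total += n;
        }
        return total;
    }

    public static void main(String[] args) {
        ByteBuffer socket = ByteBuffer.allocate(1024 * 1024);   // 1 MB waiting to be read
        ByteBuffer dst = ByteBuffer.allocate(2 * 1024 * 1024);  // destination with spare room
        System.out.println(readOnce(socket.duplicate(), dst.duplicate()));             // 16384
        System.out.println(readUntilFullOrEmpty(socket.duplicate(), dst.duplicate())); // 1048576
    }
}
```

With 1 MB pending, a single pass moves 16 KB, while the loop moves the full 1 MB in one call.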

Furthermore, we observe that for users who naively sleep for a constant time 
after each consumer.poll(), some partitions lag behind after they switch from 
plaintext to SSL. Here is an explanation of why this can happen.

Say there is 1 partition at 1 MB/sec and 9 partitions at 32 KB/sec. The leaders 
of these partitions are all different, and the consumer is consuming all 10 
partitions. Let's also assume that the socket read buffer size is large enough 
and the consumer sleeps 1 sec between consumer.poll() calls. 1 sec is long 
enough for the consumer to receive the FetchResponse back from the broker.

When the consumer uses plaintext, each consumer.poll() reads all data from the 
socket buffer, which means about 1 MB of data is read for the large partition 
on each call.

When the consumer uses SSL, each consumer.poll() is likely to find that some 
data is already available in memory. In this case the consumer reads only 16 KB 
of data from each of the other sockets, in particular the socket for the broker 
with the large partition. The throughput of the large partition is then limited 
to 16 KB/sec.
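
The arithmetic of this scenario can be checked with a rough model. The sketch below assumes exactly one poll per second and a fixed cap on bytes read per poll, and it ignores socket buffer limits and fetch flow control. The class LagSketch and the method backlogBytes are hypothetical names used only for illustration.

```java
public class LagSketch {
    static final long KB = 1024;

    /** Unread bytes after `seconds` polls, assuming one poll per second. */
    static long backlogBytes(long produceRatePerSec, long maxReadPerPoll, int seconds) {
        long backlog = 0;
        for (int i = 0; i < seconds; i++) {
            backlog += produceRatePerSec;                  // data arriving during the 1 sec sleep
            backlog -= Math.min(backlog, maxReadPerPoll);  // data drained by one poll()
        }
        return backlog;
    }

    public static void main(String[] args) {
        long largePartitionRate = 1024 * KB;  // the 1 MB/sec partition
        // Plaintext: poll() drains everything, so nothing accumulates.
        System.out.println(backlogBytes(largePartitionRate, Long.MAX_VALUE, 60) / KB); // 0
        // SSL before this change: at most 16 KB per poll for that socket.
        System.out.println(backlogBytes(largePartitionRate, 16 * KB, 60) / KB);        // 60480
    }
}
```

Under these assumptions the large partition falls behind by 1008 KB every second, or about 59 MB after one minute.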

Arguably, a user should not sleep 1 sec if their consumer is lagging behind. 
But on the Kafka dev side it is nice to keep the previous behavior and optimize 
consumer.poll() to read as much data from the socket as possible.

Author: Dong Lin 

Reviewers: Rajini Sivaram, Jason Gustafson


Closes #4248 from lindong28/KAFKA-6258

(cherry picked from commit 066bfc314c912aa90283a1c1c53a958237d1adff)
Signed-off-by: Jason Gustafson 


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/45a7bb02
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/45a7bb02
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/45a7bb02

Branch: refs/heads/1.0
Commit: 45a7bb02f2b22d8c1a6234fac09d3e6165edb87b
Parents: a64a693
Author: Dong Lin 
Authored: Mon Dec 18 11:15:09 2017 -0800
Committer: Jason Gustafson 
Committed: Mon Dec 18 11:21:45 2017 -0800

--
 .../kafka/common/network/SslTransportLayer.java | 25 ++
 .../kafka/common/network/NioEchoServer.java |  9 +++-
 .../kafka/common/network/SslSelectorTest.java   | 49 +++-
 .../common/network/SslTransportLayerTest.java   | 48 +++
 4 files changed, 119 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/45a7bb02/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
--
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
index a2545a4..399e93f 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
@@ -92,7 +92,7 @@ public class SslTransportLayer implements TransportLayer {
 this.netReadBuffer = ByteBuffer.allocate(netReadBufferSize());
 this.netWriteBuffer = ByteBuffer.allocate(netWriteBufferSize());
 this.appReadBuffer = ByteBuffer.allocate(applicationBufferSize());
-
+
 //clear & set netRead & netWrite buffers
 netWriteBuffer.position(0);
 netWriteBuffer.limit(0);
@@ -483,7 +483,8 @@ public class SslTransportLayer implements TransportLayer {
 
 
 /**
-* Reads a sequence of bytes from this channel into the given buffer.
+* Reads a sequence of bytes from this channel into the given buffer. Reads as much as possible
+* until either the dst buffer is full or there is no more data in the socket.
 *
 * @param dst The buffer into which bytes are to be transferred
 * @return The number of bytes read, possible zero or -1 if the channel has reached end-of-stream
@@ -501,8 +502,10 @@ public class SslTransportLayer implements TransportLayer {
 read = readFromAppBuffer(dst);
 }
 
-int netread = 0;
-if (dst.remaining() > 0) {
+boolean isClosed = false;
+// Each loop reads at most once from the socket.
+while (dst.remaining() > 0) {
+int netread = 0;
 netReadBuffer = Utils

kafka git commit: KAFKA-6258; SSLTransportLayer should keep reading from socket until either the buffer is full or the socket has no more data

2017-12-18 Thread jgus
Repository: kafka
Updated Branches:
  refs/heads/trunk e86f70ed2 -> 066bfc314


KAFKA-6258; SSLTransportLayer should keep reading from socket until either the 
buffer is full or the socket has no more data

When a consumer uses plaintext and there is remaining data in the consumer's 
buffer, consumer.poll() reads all data available from the socket buffer into 
the consumer buffer. However, if the consumer uses SSL and there is remaining 
data, consumer.poll() may read only 16 KB (the size of 
SslTransportLayer.appReadBuffer) from the socket buffer. This reduces the 
efficiency of consumer.poll(), because the user must call poll() more times to 
get the same amount of data.

Furthermore, we observe that for users who naively sleep for a constant time 
after each consumer.poll(), some partitions lag behind after they switch from 
plaintext to SSL. Here is an explanation of why this can happen.

Say there is 1 partition at 1 MB/sec and 9 partitions at 32 KB/sec. The leaders 
of these partitions are all different, and the consumer is consuming all 10 
partitions. Let's also assume that the socket read buffer size is large enough 
and the consumer sleeps 1 sec between consumer.poll() calls. 1 sec is long 
enough for the consumer to receive the FetchResponse back from the broker.

When the consumer uses plaintext, each consumer.poll() reads all data from the 
socket buffer, which means about 1 MB of data is read for the large partition 
on each call.

When the consumer uses SSL, each consumer.poll() is likely to find that some 
data is already available in memory. In this case the consumer reads only 16 KB 
of data from each of the other sockets, in particular the socket for the broker 
with the large partition. The throughput of the large partition is then limited 
to 16 KB/sec.

Arguably, a user should not sleep 1 sec if their consumer is lagging behind. 
But on the Kafka dev side it is nice to keep the previous behavior and optimize 
consumer.poll() to read as much data from the socket as possible.

Author: Dong Lin 

Reviewers: Rajini Sivaram, Jason Gustafson


Closes #4248 from lindong28/KAFKA-6258


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/066bfc31
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/066bfc31
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/066bfc31

Branch: refs/heads/trunk
Commit: 066bfc314c912aa90283a1c1c53a958237d1adff
Parents: e86f70e
Author: Dong Lin 
Authored: Mon Dec 18 11:15:09 2017 -0800
Committer: Jason Gustafson 
Committed: Mon Dec 18 11:15:09 2017 -0800

--
 .../kafka/common/network/SslTransportLayer.java | 25 ++
 .../kafka/common/network/NioEchoServer.java |  9 +++-
 .../kafka/common/network/SslSelectorTest.java   | 51 +++-
 .../common/network/SslTransportLayerTest.java   | 48 ++
 4 files changed, 121 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/066bfc31/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
--
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
index 69ca037..49f1d66 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
@@ -89,7 +89,7 @@ public class SslTransportLayer implements TransportLayer {
 this.netReadBuffer = ByteBuffer.allocate(netReadBufferSize());
 this.netWriteBuffer = ByteBuffer.allocate(netWriteBufferSize());
 this.appReadBuffer = ByteBuffer.allocate(applicationBufferSize());
-
+
 //clear & set netRead & netWrite buffers
 netWriteBuffer.position(0);
 netWriteBuffer.limit(0);
@@ -482,7 +482,8 @@ public class SslTransportLayer implements TransportLayer {
 
 
 /**
-* Reads a sequence of bytes from this channel into the given buffer.
+* Reads a sequence of bytes from this channel into the given buffer. Reads as much as possible
+* until either the dst buffer is full or there is no more data in the socket.
 *
 * @param dst The buffer into which bytes are to be transferred
 * @return The number of bytes read, possible zero or -1 if the channel has reached end-of-stream
@@ -500,8 +501,10 @@ public class SslTransportLayer implements TransportLayer {
 read = readFromAppBuffer(dst);
 }
 
-int netread = 0;
-if (dst.remaining() > 0) {
+boolean isClosed = false;
+// Each loop reads at most once from the socket.
+while (dst.remaining() > 0) {
+int netread = 0;
 netReadBuffer = Utils.ensureCapacity(netReadBuffer, netReadBufferSize());
 if (netReadBuffer.remaining() >