[ 
https://issues.apache.org/jira/browse/KAFKA-4493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16395002#comment-16395002
 ] 

ASF GitHub Bot commented on KAFKA-4493:
---------------------------------------

taku-k closed pull request #2408: KAFKA-4493: Validate a plaintext client 
connection to a SSL broker
URL: https://github.com/apache/kafka/pull/2408
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/InvalidTransportLayerException.java
 
b/clients/src/main/java/org/apache/kafka/common/network/InvalidTransportLayerException.java
new file mode 100644
index 00000000000..b1b69af09d6
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/network/InvalidTransportLayerException.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.network;
+
+import org.apache.kafka.common.KafkaException;
+
+public class InvalidTransportLayerException extends KafkaException {
+
+    public InvalidTransportLayerException(String message) {
+        super(message);
+    }
+
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 
b/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
index 409775cd978..c7a35d1f552 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
@@ -12,11 +12,14 @@
  */
 package org.apache.kafka.common.network;
 
+import org.apache.kafka.common.utils.Utils;
+
 import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.ScatteringByteChannel;
+import java.util.regex.Pattern;
 
 /**
  * A size delimited Receive that consists of a 4 byte network-ordered size N 
followed by N bytes of content
@@ -31,6 +34,13 @@
     private final int maxSize;
     private ByteBuffer buffer;
 
+    private boolean ensureThrough = false;
+    private ByteBuffer tempOverBuf;
+    /**
+     * Supporting TLSv1, TLSv1.1 and TLSv1.2
+     */
+    private final static Pattern SSL_HANDSHAKE_ALERT = 
Pattern.compile("15030[123]00");
+
 
     public NetworkReceive(String source, ByteBuffer buffer) {
         this.source = source;
@@ -82,6 +92,9 @@ public long readFromReadableChannel(ReadableByteChannel 
channel) throws IOExcept
             if (bytesRead < 0)
                 throw new EOFException();
             read += bytesRead;
+
+            ensureNotHandshakeFailurePacket(channel, size);
+
             if (!size.hasRemaining()) {
                 size.rewind();
                 int receiveSize = size.getInt();
@@ -93,6 +106,15 @@ public long readFromReadableChannel(ReadableByteChannel 
channel) throws IOExcept
                 this.buffer = ByteBuffer.allocate(receiveSize);
             }
         }
+
+        // Packet head is the same as a SSL handshake bytes but actually the 
packet is correct.
+        // Over read data must be inserted into the buffer.
+        if (tempOverBuf != null) {
+            buffer.put(tempOverBuf);
+            read += 1024;
+            tempOverBuf = null;
+        }
+
         if (buffer != null) {
             int bytesRead = channel.read(buffer);
             if (bytesRead < 0)
@@ -107,4 +129,23 @@ public ByteBuffer payload() {
         return this.buffer;
     }
 
+    // We determine if a peer connection uses SSL/TLS by seeing from the first 
few bytes.
+    private void ensureNotHandshakeFailurePacket(ReadableByteChannel channel, 
ByteBuffer size) throws IOException {
+        if (ensureThrough)
+            return;
+        ensureThrough = true;
+        String head = Utils.hexToString(size.array());
+        if (SSL_HANDSHAKE_ALERT.matcher(head).find()) {
+            // Actually, SSL record size is 2 bytes, but head byte is already 
read.
+            ByteBuffer recordSizeBuf = ByteBuffer.allocate(1);
+            channel.read(recordSizeBuf);
+            recordSizeBuf.rewind();
+            // If the data exceeding the SSL record size is read, this packet 
is correct.
+            tempOverBuf = ByteBuffer.allocate(1024);
+            int bytesRead = channel.read(tempOverBuf);
+            if (bytesRead == recordSizeBuf.get()) {
+                throw new InvalidTransportLayerException("Destination 
connection may be SSL/TLS");
+            }
+        }
+    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/Selector.java 
b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index df3526618cc..d0703474f29 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -369,6 +369,9 @@ private void pollSelectionKeys(Iterable<SelectionKey> 
selectionKeys,
                 if (!key.isValid())
                     close(channel, true);
 
+            } catch (InvalidTransportLayerException e) {
+                close(channel, true);
+                throw e;
             } catch (Exception e) {
                 String desc = channel.socketDescription();
                 if (e instanceof IOException)
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java 
b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index ac8d0786d7b..9b656a94dfb 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -57,6 +57,8 @@
 
     private static final Logger log = LoggerFactory.getLogger(Utils.class);
 
+    private static final char[] HEX_TO_CHAR = "0123456789ABCDEF".toCharArray();
+
     /**
      * Get a sorted list representation of a collection.
      * @param collection The collection to sort
@@ -803,4 +805,17 @@ public static long computeChecksum(ByteBuffer buffer, int 
start, int size) {
         return Crc32.crc32(buffer.array(), buffer.arrayOffset() + start, size);
     }
 
+    /**
+     * Convert byte array into a string
+     * @param data Byte array to convert into a string
+     * @return A string
+     */
+    public static String hexToString(byte[] data) {
+        StringBuilder r = new StringBuilder(data.length * 2);
+        for (byte b : data) {
+            r.append(HEX_TO_CHAR[(b >> 4) & 0xF]);
+            r.append(HEX_TO_CHAR[b & 0xF]);
+        }
+        return r.toString();
+    }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
 
b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
index 01d8a25b21d..d57acceb86c 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
@@ -15,6 +15,7 @@
 import static org.junit.Assert.fail;
 
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Map;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -405,6 +406,21 @@ public void testClosePlaintext() throws Exception {
         testClose(SecurityProtocol.PLAINTEXT, new PlaintextChannelBuilder());
     }
 
+    @Test(expected = InvalidTransportLayerException.class)
+    public void testInvalidTransportLayer() throws Exception {
+        String node = "0";
+        server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, 
sslServerConfigs);
+        createPlaintextSelector();
+        InetSocketAddress addr = new InetSocketAddress("localhost", 
server.port());
+        selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
+        Send send = new NetworkSend(node, ByteBuffer.wrap("test".getBytes()));
+        selector.send(send);
+        int secondsLeft = 10;
+        while (secondsLeft-- > 0) {
+            selector.poll(1000L);
+        }
+    }
+
     private void testClose(SecurityProtocol securityProtocol, ChannelBuilder 
clientChannelBuilder) throws Exception {
         String node = "0";
         server = NetworkTestUtils.createEchoServer(securityProtocol, 
sslServerConfigs);
@@ -460,7 +476,14 @@ protected SslTransportLayer buildTransportLayer(SslFactory 
sslFactory, String id
         this.channelBuilder.configure(sslClientConfigs);
         this.selector = new Selector(5000, new Metrics(), new MockTime(), 
"MetricGroup", channelBuilder);
     }
-    
+
+    private void createPlaintextSelector() {
+        this.channelBuilder = new PlaintextChannelBuilder();
+        Map<String, Object> emptyConfig = new HashMap<>();
+        this.channelBuilder.configure(emptyConfig);
+        this.selector = new Selector(5000, new Metrics(), new MockTime(), 
"MetricGroup", channelBuilder);
+    }
+
     /**
      * SSLTransportLayer with overrides for packet and application buffer size 
to test buffer resize
      * code path. The overridden buffer size starts with a small value and 
increases in size when the buffer


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Connections to Kafka brokers should be validated
> ------------------------------------------------
>
>                 Key: KAFKA-4493
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4493
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Ismael Juma
>            Priority: Major
>
> There have been a few reports of Kafka clients throwing an OOM because they 
> read 4 bytes from the stream and then use that to allocate a ByteBuffer 
> without validating that they are using the right security protocol or even 
> communicating with a Kafka broker.
> It would be good to perform some validation in order to show a useful error 
> message to the user instead of the OOM.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to