[ https://issues.apache.org/jira/browse/KAFKA-4493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16395002#comment-16395002 ]
ASF GitHub Bot commented on KAFKA-4493: --------------------------------------- taku-k closed pull request #2408: KAFKA-4493: Validate a plaintext client connection to a SSL broker URL: https://github.com/apache/kafka/pull/2408 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/common/network/InvalidTransportLayerException.java b/clients/src/main/java/org/apache/kafka/common/network/InvalidTransportLayerException.java new file mode 100644 index 00000000000..b1b69af09d6 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/network/InvalidTransportLayerException.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ +package org.apache.kafka.common.network; + +import org.apache.kafka.common.KafkaException; + +public class InvalidTransportLayerException extends KafkaException { + + public InvalidTransportLayerException(String message) { + super(message); + } + +} diff --git a/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java b/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java index 409775cd978..c7a35d1f552 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java +++ b/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java @@ -12,11 +12,14 @@ */ package org.apache.kafka.common.network; +import org.apache.kafka.common.utils.Utils; + import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.ReadableByteChannel; import java.nio.channels.ScatteringByteChannel; +import java.util.regex.Pattern; /** * A size delimited Receive that consists of a 4 byte network-ordered size N followed by N bytes of content @@ -31,6 +34,13 @@ private final int maxSize; private ByteBuffer buffer; + private boolean ensureThrough = false; + private ByteBuffer tempOverBuf; + /** + * Supporting TLSv1, TLSv1.1 and TLSv1.2 + */ + private final static Pattern SSL_HANDSHAKE_ALERT = Pattern.compile("15030[123]00"); + public NetworkReceive(String source, ByteBuffer buffer) { this.source = source; @@ -82,6 +92,9 @@ public long readFromReadableChannel(ReadableByteChannel channel) throws IOExcept if (bytesRead < 0) throw new EOFException(); read += bytesRead; + + ensureNotHandshakeFailurePacket(channel, size); + if (!size.hasRemaining()) { size.rewind(); int receiveSize = size.getInt(); @@ -93,6 +106,15 @@ public long readFromReadableChannel(ReadableByteChannel channel) throws IOExcept this.buffer = ByteBuffer.allocate(receiveSize); } } + + // Packet head is the same as a SSL handshake bytes but actually the packet is correct. 
+ // Over read data must be inserted into the buffer. + if (tempOverBuf != null) { + buffer.put(tempOverBuf); + read += 1024; + tempOverBuf = null; + } + if (buffer != null) { int bytesRead = channel.read(buffer); if (bytesRead < 0) @@ -107,4 +129,23 @@ public ByteBuffer payload() { return this.buffer; } + // We determine if a peer connection uses SSL/TLS by seeing from the first few bytes. + private void ensureNotHandshakeFailurePacket(ReadableByteChannel channel, ByteBuffer size) throws IOException { + if (ensureThrough) + return; + ensureThrough = true; + String head = Utils.hexToString(size.array()); + if (SSL_HANDSHAKE_ALERT.matcher(head).find()) { + // Actually, SSL record size is 2 bytes, but head byte is already read. + ByteBuffer recordSizeBuf = ByteBuffer.allocate(1); + channel.read(recordSizeBuf); + recordSizeBuf.rewind(); + // If the data exceeding the SSL record size is read, this packet is correct. + tempOverBuf = ByteBuffer.allocate(1024); + int bytesRead = channel.read(tempOverBuf); + if (bytesRead == recordSizeBuf.get()) { + throw new InvalidTransportLayerException("Destination connection may be SSL/TLS"); + } + } + } } diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index df3526618cc..d0703474f29 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -369,6 +369,9 @@ private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, if (!key.isValid()) close(channel, true); + } catch (InvalidTransportLayerException e) { + close(channel, true); + throw e; } catch (Exception e) { String desc = channel.socketDescription(); if (e instanceof IOException) diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index ac8d0786d7b..9b656a94dfb 100755 --- 
a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -57,6 +57,8 @@ private static final Logger log = LoggerFactory.getLogger(Utils.class); + private static final char[] HEX_TO_CHAR = "0123456789ABCDEF".toCharArray(); + /** * Get a sorted list representation of a collection. * @param collection The collection to sort @@ -803,4 +805,17 @@ public static long computeChecksum(ByteBuffer buffer, int start, int size) { return Crc32.crc32(buffer.array(), buffer.arrayOffset() + start, size); } + /** + * Convert byte array into a string + * @param data Byte array to convert into a string + * @return A string + */ + public static String hexToString(byte[] data) { + StringBuilder r = new StringBuilder(data.length * 2); + for (byte b : data) { + r.append(HEX_TO_CHAR[(b >> 4) & 0xF]); + r.append(HEX_TO_CHAR[b & 0xF]); + } + return r.toString(); + } } diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java index 01d8a25b21d..d57acceb86c 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java @@ -15,6 +15,7 @@ import static org.junit.Assert.fail; import java.util.Arrays; +import java.util.HashMap; import java.util.Map; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -405,6 +406,21 @@ public void testClosePlaintext() throws Exception { testClose(SecurityProtocol.PLAINTEXT, new PlaintextChannelBuilder()); } + @Test(expected = InvalidTransportLayerException.class) + public void testInvalidTransportLayer() throws Exception { + String node = "0"; + server = NetworkTestUtils.createEchoServer(SecurityProtocol.SSL, sslServerConfigs); + createPlaintextSelector(); + InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); + 
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); + Send send = new NetworkSend(node, ByteBuffer.wrap("test".getBytes())); + selector.send(send); + int secondsLeft = 10; + while (secondsLeft-- > 0) { + selector.poll(1000L); + } + } + private void testClose(SecurityProtocol securityProtocol, ChannelBuilder clientChannelBuilder) throws Exception { String node = "0"; server = NetworkTestUtils.createEchoServer(securityProtocol, sslServerConfigs); @@ -460,7 +476,14 @@ protected SslTransportLayer buildTransportLayer(SslFactory sslFactory, String id this.channelBuilder.configure(sslClientConfigs); this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", channelBuilder); } - + + private void createPlaintextSelector() { + this.channelBuilder = new PlaintextChannelBuilder(); + Map<String, Object> emptyConfig = new HashMap<>(); + this.channelBuilder.configure(emptyConfig); + this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", channelBuilder); + } + /** * SSLTransportLayer with overrides for packet and application buffer size to test buffer resize * code path. The overridden buffer size starts with a small value and increases in size when the buffer ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Connections to Kafka brokers should be validated
> ------------------------------------------------
>
> Key: KAFKA-4493
> URL: https://issues.apache.org/jira/browse/KAFKA-4493
> Project: Kafka
> Issue Type: Improvement
> Reporter: Ismael Juma
> Priority: Major
>
> There have been a few reports of Kafka clients throwing an OOM because they read 4 bytes from the stream and then use that to allocate a ByteBuffer without validating that they are using the right security protocol or even communicating with a Kafka broker.
> It would be good to perform some validation in order to show a useful error message to the user instead of the OOM.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)