This is an automated email from the ASF dual-hosted git repository. rgoers pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/flume.git
commit 93fbd74336e573b5add3b81760d57a459f8bc440 Author: zhou zhuohan <[email protected]> AuthorDate: Sun Sep 18 04:14:26 2022 +0800 FLUME-3315 fix kafka ssl https verification (#382) Authored-by: ninjazhou <[email protected]> --- .../apache/flume/channel/kafka/KafkaChannel.java | 47 +++++++++++- flume-ng-doc/sphinx/FlumeUserGuide.rst | 33 +++++---- .../org/apache/flume/sink/kafka/KafkaSink.java | 12 +++- .../org/apache/flume/sink/kafka/TestKafkaSink.java | 41 +++++++++++ .../org/apache/flume/sink/kafka/util/TestUtil.java | 30 +++++++- .../src/test/resources/keystorefile.jks | Bin 0 -> 1294 bytes .../src/test/resources/truststorefile.jks | Bin 0 -> 887 bytes .../org/apache/flume/source/kafka/KafkaSource.java | 80 +++++++++++++++------ .../flume/source/kafka/KafkaSourceConstants.java | 2 + .../source/kafka/KafkaSourceEmbeddedKafka.java | 31 +++++++- .../apache/flume/source/kafka/TestKafkaSource.java | 65 ++++++++++++++++- .../src/test/resources/keystorefile.jks | Bin 0 -> 1294 bytes .../src/test/resources/truststorefile.jks | Bin 0 -> 887 bytes .../apache/flume/shared/kafka/KafkaSSLUtil.java | 4 +- 14 files changed, 295 insertions(+), 50 deletions(-) diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java index 275a614e3..65cad9937 100644 --- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java +++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java @@ -52,6 +52,7 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.errors.WakeupException; import 
org.apache.kafka.common.security.JaasUtils; import org.apache.kafka.common.utils.Time; @@ -78,7 +79,33 @@ import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.*; +import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.BROKER_LIST_FLUME_KEY; +import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_ACKS; +import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_AUTO_OFFSET_RESET; +import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_GROUP_ID; +import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_KEY_DESERIALIZER; +import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_KEY_SERIALIZER; +import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_MIGRATE_ZOOKEEPER_OFFSETS; +import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_PARSE_AS_FLUME_EVENT; +import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_POLL_TIMEOUT; +import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_TOPIC; +import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_VALUE_DESERIAIZER; +import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.DEFAULT_VALUE_SERIAIZER; +import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.GROUP_ID_FLUME; +import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.KAFKA_CONSUMER_PREFIX; +import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.KAFKA_PRODUCER_PREFIX; +import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.KEY_HEADER; +import static 
org.apache.flume.channel.kafka.KafkaChannelConfiguration.MIGRATE_ZOOKEEPER_OFFSETS; +import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.PARSE_AS_FLUME_EVENT; +import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.PARTITION_HEADER_NAME; +import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.POLL_TIMEOUT; +import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.READ_SMALLEST_OFFSET; +import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.STATIC_PARTITION_CONF; +import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.TOPIC_CONFIG; +import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.ZOOKEEPER_CONNECT_FLUME_KEY; +import static org.apache.flume.shared.kafka.KafkaSSLUtil.SSL_DISABLE_FQDN_CHECK; +import static org.apache.flume.shared.kafka.KafkaSSLUtil.isSSLEnabled; public class KafkaChannel extends BasicChannelSemantics { @@ -269,7 +296,14 @@ public class KafkaChannel extends BasicChannelSemantics { // Defaults overridden based on config producerProps.putAll(ctx.getSubProperties(KAFKA_PRODUCER_PREFIX)); producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers); - + // The default value of `ssl.endpoint.identification.algorithm` + // is changed to `https`, since kafka client 2.0+ + // And because flume does not accept an empty string as property value, + // so we need to use an alternative custom property + // `ssl.disableTLSHostnameVerification` to check if enable fqdn check. 
+ if (isSSLEnabled(producerProps) && "true".equalsIgnoreCase(producerProps.getProperty(SSL_DISABLE_FQDN_CHECK))) { + producerProps.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ""); + } KafkaSSLUtil.addGlobalSSLParameters(producerProps); } @@ -288,7 +322,14 @@ public class KafkaChannel extends BasicChannelSemantics { consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers); consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); - + // The default value of `ssl.endpoint.identification.algorithm` + // is changed to `https`, since kafka client 2.0+ + // And because flume does not accept an empty string as property value, + // so we need to use an alternative custom property + // `ssl.disableTLSHostnameVerification` to check if enable fqdn check. + if (isSSLEnabled(consumerProps) && "true".equalsIgnoreCase(consumerProps.getProperty(SSL_DISABLE_FQDN_CHECK))) { + consumerProps.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ""); + } KafkaSSLUtil.addGlobalSSLParameters(consumerProps); } diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index 2f8938c95..9355fad16 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -1665,14 +1665,15 @@ Specifying the truststore is optional here, the global truststore can be used in For more details about the global SSL setup, see the `SSL/TLS support`_ section. Note: By default the property ``ssl.endpoint.identification.algorithm`` -is not defined, so hostname verification is not performed. -In order to enable hostname verification, set the following properties +defaults to ``https`` (Kafka client 2.0+), so hostname verification is performed.
+Since Flume does not accept an empty string as a property value, hostname verification cannot be disabled by +setting ``ssl.endpoint.identification.algorithm`` to an empty string; set the following property instead: .. code-block:: properties - a1.sources.source1.kafka.consumer.ssl.endpoint.identification.algorithm=HTTPS + a1.sources.source1.kafka.consumer.ssl.disableTLSHostnameVerification = true -Once enabled, clients will verify the server's fully qualified domain name (FQDN) +If not set to true, clients will verify the server's fully qualified domain name (FQDN) against one of the following two fields: #) Common Name (CN) https://tools.ietf.org/html/rfc6125#section-2.3 @@ -3279,18 +3280,19 @@ Example configuration with server side authentication and data encryption. a1.sinks.sink1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks a1.sinks.sink1.kafka.producer.ssl.truststore.password = <password to access the truststore> -Specyfing the truststore is optional here, the global truststore can be used instead. +Specifying the truststore is optional here, the global truststore can be used instead. For more details about the global SSL setup, see the `SSL/TLS support`_ section. Note: By default the property ``ssl.endpoint.identification.algorithm`` -is not defined, so hostname verification is not performed. -In order to enable hostname verification, set the following properties +defaults to ``https`` (Kafka client 2.0+), so hostname verification is performed. +Since Flume does not accept an empty string as a property value, hostname verification cannot be disabled by +setting ``ssl.endpoint.identification.algorithm`` to an empty string; set the following property instead: ..
code-block:: properties - a1.sinks.sink1.kafka.producer.ssl.endpoint.identification.algorithm = HTTPS + a1.sinks.sink1.kafka.producer.ssl.disableTLSHostnameVerification = true -Once enabled, clients will verify the server's fully qualified domain name (FQDN) +If not set to true, clients will verify the server's fully qualified domain name (FQDN) against one of the following two fields: #) Common Name (CN) https://tools.ietf.org/html/rfc6125#section-2.3 @@ -3685,19 +3687,20 @@ Example configuration with server side authentication and data encryption. a1.channels.channel1.kafka.consumer.ssl.truststore.location = /path/to/truststore.jks a1.channels.channel1.kafka.consumer.ssl.truststore.password = <password to access the truststore> -Specyfing the truststore is optional here, the global truststore can be used instead. +Specifying the truststore is optional here, the global truststore can be used instead. For more details about the global SSL setup, see the `SSL/TLS support`_ section. Note: By default the property ``ssl.endpoint.identification.algorithm`` -is not defined, so hostname verification is not performed. -In order to enable hostname verification, set the following properties +defaults to ``https`` (Kafka client 2.0+), so hostname verification is performed. +Since Flume does not accept an empty string as a property value, hostname verification cannot be disabled by +setting ``ssl.endpoint.identification.algorithm`` to an empty string; set the following properties instead: ..
code-block:: properties - a1.channels.channel1.kafka.producer.ssl.endpoint.identification.algorithm = HTTPS - a1.channels.channel1.kafka.consumer.ssl.endpoint.identification.algorithm = HTTPS + a1.channels.channel1.kafka.producer.ssl.disableTLSHostnameVerification = true + a1.channels.channel1.kafka.consumer.ssl.disableTLSHostnameVerification = true -Once enabled, clients will verify the server's fully qualified domain name (FQDN) +If not set to true, clients will verify the server's fully qualified domain name (FQDN) against one of the following two fields: #) Common Name (CN) https://tools.ietf.org/html/rfc6125#section-2.3 diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java index 3b1a86697..e4c9ff7e2 100644 --- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java @@ -43,6 +43,7 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.config.SslConfigs; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,6 +57,8 @@ import java.util.Map; import java.util.Properties; import java.util.concurrent.Future; +import static org.apache.flume.shared.kafka.KafkaSSLUtil.SSL_DISABLE_FQDN_CHECK; +import static org.apache.flume.shared.kafka.KafkaSSLUtil.isSSLEnabled; import static org.apache.flume.sink.kafka.KafkaSinkConstants.BOOTSTRAP_SERVERS_CONFIG; import static org.apache.flume.sink.kafka.KafkaSinkConstants.BATCH_SIZE; import static org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_BATCH_SIZE; @@ -421,7 +424,14 @@ public class KafkaSink extends AbstractSink implements 
Configurable, BatchSizeSu kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, DEFAULT_VALUE_SERIAIZER); kafkaProps.putAll(context.getSubProperties(KAFKA_PRODUCER_PREFIX)); kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers); - + // The default value of `ssl.endpoint.identification.algorithm` + // is changed to `https`, since kafka client 2.0+ + // And because flume does not accept an empty string as property value, + // so we need to use an alternative custom property + // `ssl.disableTLSHostnameVerification` to check if enable fqdn check. + if (isSSLEnabled(kafkaProps) && "true".equalsIgnoreCase(kafkaProps.getProperty(SSL_DISABLE_FQDN_CHECK))) { + kafkaProps.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ""); + } KafkaSSLUtil.addGlobalSSLParameters(kafkaProps); } diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java index 97dc0bdab..5a69b16fb 100644 --- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java @@ -61,6 +61,7 @@ import java.util.Map; import java.util.Properties; import java.util.Set; +import static org.apache.flume.shared.kafka.KafkaSSLUtil.SSL_DISABLE_FQDN_CHECK; import static org.apache.flume.sink.kafka.KafkaSinkConstants.AVRO_EVENT; import static org.apache.flume.sink.kafka.KafkaSinkConstants.BATCH_SIZE; import static org.apache.flume.sink.kafka.KafkaSinkConstants.BOOTSTRAP_SERVERS_CONFIG; @@ -72,6 +73,9 @@ import static org.apache.flume.sink.kafka.KafkaSinkConstants.KAFKA_PRODUCER_PREF import static org.apache.flume.sink.kafka.KafkaSinkConstants.OLD_BATCH_SIZE; import static org.apache.flume.sink.kafka.KafkaSinkConstants.REQUIRED_ACKS_FLUME_KEY; import static 
org.apache.flume.sink.kafka.KafkaSinkConstants.TOPIC_CONFIG; +import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG; +import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG; +import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -715,4 +719,41 @@ public class TestKafkaSink { return newTopic; } + @Test + public void testSslTopic() { + Sink kafkaSink = new KafkaSink(); + Context context = prepareDefaultContext(); + context.put(BOOTSTRAP_SERVERS_CONFIG, testUtil.getKafkaServerSslUrl()); + context.put(KAFKA_PRODUCER_PREFIX + SECURITY_PROTOCOL_CONFIG, "SSL"); + context.put(KAFKA_PRODUCER_PREFIX + SSL_TRUSTSTORE_LOCATION_CONFIG, "src/test/resources/truststorefile.jks"); + context.put(KAFKA_PRODUCER_PREFIX + SSL_TRUSTSTORE_PASSWORD_CONFIG, "password"); + context.put(KAFKA_PRODUCER_PREFIX + SSL_DISABLE_FQDN_CHECK, "true"); + Configurables.configure(kafkaSink, context); + + Channel memoryChannel = new MemoryChannel(); + context = prepareDefaultContext(); + Configurables.configure(memoryChannel, context); + kafkaSink.setChannel(memoryChannel); + kafkaSink.start(); + + String msg = "default-topic-test"; + Transaction tx = memoryChannel.getTransaction(); + tx.begin(); + Event event = EventBuilder.withBody(msg.getBytes()); + memoryChannel.put(event); + tx.commit(); + tx.close(); + + try { + Sink.Status status = kafkaSink.process(); + if (status == Sink.Status.BACKOFF) { + fail("Error Occurred"); + } + } catch (EventDeliveryException ex) { + // ignore + } + + checkMessageArrived(msg, DEFAULT_TOPIC); + } + } \ No newline at end of file diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java index 
1a87dc5ab..cfd57fc1c 100644 --- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/TestUtil.java @@ -39,6 +39,11 @@ import java.util.List; import java.util.Properties; import java.util.concurrent.TimeUnit; +import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG; +import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG; +import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG; +import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG; + /** * A utility class for starting/stopping Kafka Server. */ @@ -50,8 +55,10 @@ public class TestUtil { private KafkaLocal kafkaServer; private boolean externalServers = true; private String kafkaServerUrl; + private String kafkaServerSslUrl; private String zkServerUrl; private int kafkaLocalPort; + private int kafkaLocalSslPort; private Properties clientProps; private int zkLocalPort; private KafkaConsumer<String, String> consumer; @@ -80,7 +87,9 @@ public class TestUtil { String hostname = InetAddress.getLocalHost().getHostName(); zkLocalPort = getNextPort(); kafkaLocalPort = getNextPort(); + kafkaLocalSslPort = getNextPort(); kafkaServerUrl = hostname + ":" + kafkaLocalPort; + kafkaServerSslUrl = hostname + ":" + kafkaLocalSslPort; zkServerUrl = hostname + ":" + zkLocalPort; } clientProps = createClientProperties(); @@ -112,12 +121,23 @@ public class TestUtil { "/kafka-server.properties")); // override the Zookeeper url. 
kafkaProperties.setProperty("zookeeper.connect", getZkUrl()); - // override the Kafka server port - kafkaProperties.setProperty("port", Integer.toString(kafkaLocalPort)); + // to enable ssl feature, + // we need to use listeners instead of using port property + // kafkaProperties.setProperty("port", Integer.toString(kafkaLocalPort)); + kafkaProperties.put("listeners", + String.format("PLAINTEXT://%s,SSL://%s", + getKafkaServerUrl(), + getKafkaServerSslUrl() + ) + ); + // ssl configuration + kafkaProperties.put(SSL_TRUSTSTORE_LOCATION_CONFIG, "src/test/resources/truststorefile.jks"); + kafkaProperties.put(SSL_TRUSTSTORE_PASSWORD_CONFIG, "password"); + kafkaProperties.put(SSL_KEYSTORE_LOCATION_CONFIG, "src/test/resources/keystorefile.jks"); + kafkaProperties.put(SSL_KEYSTORE_PASSWORD_CONFIG, "password"); kafkaServer = new KafkaLocal(kafkaProperties); kafkaServer.start(); logger.info("Kafka Server is successfully started on port " + kafkaLocalPort); - return true; } catch (Exception e) { @@ -241,4 +261,8 @@ public class TestUtil { public String getKafkaServerUrl() { return kafkaServerUrl; } + + public String getKafkaServerSslUrl() { + return kafkaServerSslUrl; + } } diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/keystorefile.jks b/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/keystorefile.jks new file mode 100644 index 000000000..20ac6a816 Binary files /dev/null and b/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/keystorefile.jks differ diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/truststorefile.jks b/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/truststorefile.jks new file mode 100644 index 000000000..a98c4907e Binary files /dev/null and b/flume-ng-sinks/flume-ng-kafka-sink/src/test/resources/truststorefile.jks differ diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java 
b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java index 0c65302b2..7a64ed742 100644 --- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java +++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java @@ -16,22 +16,8 @@ */ package org.apache.flume.source.kafka; -import java.io.ByteArrayInputStream; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.UUID; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.regex.Pattern; -import java.util.stream.Collectors; - import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; import kafka.cluster.Broker; import kafka.cluster.BrokerEndPoint; import kafka.zk.KafkaZkClient; @@ -61,20 +47,61 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.network.ListenerName; -import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.security.JaasUtils; +import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.utils.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import com.google.common.base.Optional; - -import static org.apache.flume.source.kafka.KafkaSourceConstants.*; - import scala.Option; import scala.collection.JavaConverters; +import java.io.ByteArrayInputStream; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import 
java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import static org.apache.flume.shared.kafka.KafkaSSLUtil.SSL_DISABLE_FQDN_CHECK; +import static org.apache.flume.shared.kafka.KafkaSSLUtil.isSSLEnabled; +import static org.apache.flume.source.kafka.KafkaSourceConstants.AVRO_EVENT; +import static org.apache.flume.source.kafka.KafkaSourceConstants.BATCH_DURATION_MS; +import static org.apache.flume.source.kafka.KafkaSourceConstants.BATCH_SIZE; +import static org.apache.flume.source.kafka.KafkaSourceConstants.BOOTSTRAP_SERVERS; +import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_AUTO_COMMIT; +import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_AVRO_EVENT; +import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_BATCH_DURATION; +import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_BATCH_SIZE; +import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_GROUP_ID; +import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_KEY_DESERIALIZER; +import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_MIGRATE_ZOOKEEPER_OFFSETS; +import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_SET_TOPIC_HEADER; +import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_TOPIC_HEADER; +import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_VALUE_DESERIALIZER; +import static org.apache.flume.source.kafka.KafkaSourceConstants.KAFKA_CONSUMER_PREFIX; +import static org.apache.flume.source.kafka.KafkaSourceConstants.KEY_HEADER; +import static org.apache.flume.source.kafka.KafkaSourceConstants.MIGRATE_ZOOKEEPER_OFFSETS; +import static org.apache.flume.source.kafka.KafkaSourceConstants.OFFSET_HEADER; +import static 
org.apache.flume.source.kafka.KafkaSourceConstants.OLD_GROUP_ID; +import static org.apache.flume.source.kafka.KafkaSourceConstants.PARTITION_HEADER; +import static org.apache.flume.source.kafka.KafkaSourceConstants.SET_TOPIC_HEADER; +import static org.apache.flume.source.kafka.KafkaSourceConstants.TIMESTAMP_HEADER; +import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPIC; +import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPICS; +import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPICS_REGEX; +import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPIC_HEADER; +import static org.apache.flume.source.kafka.KafkaSourceConstants.ZOOKEEPER_CONNECT_FLUME_KEY; + /** * A Source for Kafka which reads messages from kafka topics. * @@ -386,7 +413,7 @@ public class KafkaSource extends AbstractPollableSource // For backwards compatibility look up the bootstrap from zookeeper log.warn("{} is deprecated. Please use the parameter {}", ZOOKEEPER_CONNECT_FLUME_KEY, BOOTSTRAP_SERVERS); - // Lookup configured security protocol, just in case its not default + // Lookup configured security protocol, just in case it's not default String securityProtocolStr = context.getSubProperties(KAFKA_CONSUMER_PREFIX) .get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG); @@ -453,7 +480,14 @@ public class KafkaSource extends AbstractPollableSource kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); } kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, DEFAULT_AUTO_COMMIT); - + // The default value of `ssl.endpoint.identification.algorithm` + // is changed to `https`, since kafka client 2.0+ + // And because flume does not accept an empty string as property value, + // so we need to use an alternative custom property + // `ssl.disableTLSHostnameVerification` to check if enable fqdn check. 
+ if (isSSLEnabled(kafkaProps) && "true".equalsIgnoreCase(kafkaProps.getProperty(SSL_DISABLE_FQDN_CHECK))) { + kafkaProps.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ""); + } KafkaSSLUtil.addGlobalSSLParameters(kafkaProps); } diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java index 0e15e7380..8ac437add 100644 --- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java +++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSourceConstants.java @@ -59,4 +59,6 @@ public class KafkaSourceConstants { public static final boolean DEFAULT_SET_TOPIC_HEADER = true; public static final String TOPIC_HEADER = "topicHeader"; + + } diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java index 0799664d6..397af64cf 100644 --- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java +++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/KafkaSourceEmbeddedKafka.java @@ -39,6 +39,11 @@ import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG; +import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG; +import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG; +import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG; + public class KafkaSourceEmbeddedKafka { public static String HOST = 
InetAddress.getLoopbackAddress().getCanonicalHostName(); @@ -57,6 +62,7 @@ public class KafkaSourceEmbeddedKafka { private int zkPort = findFreePort(); // none-standard private int serverPort = findFreePort(); + private int serverSslPort = findFreePort(); KafkaProducer<String, byte[]> producer; File dir; @@ -73,10 +79,25 @@ public class KafkaSourceEmbeddedKafka { props.put("zookeeper.connect",zookeeper.getConnectString()); props.put("broker.id","1"); props.put("host.name", "localhost"); - props.put("port", String.valueOf(serverPort)); + // to enable ssl feature, + // we need to use listeners instead of using port property + // props.put("port", String.valueOf(serverPort)); + props.put("listeners", + String.format("PLAINTEXT://%s:%d,SSL://%s:%d", + HOST, + serverPort, + HOST, + serverSslPort + ) + ); props.put("log.dir", dir.getAbsolutePath()); props.put("offsets.topic.replication.factor", "1"); props.put("auto.create.topics.enable", "false"); + // ssl configuration + props.put(SSL_TRUSTSTORE_LOCATION_CONFIG, "src/test/resources/truststorefile.jks"); + props.put(SSL_TRUSTSTORE_PASSWORD_CONFIG, "password"); + props.put(SSL_KEYSTORE_LOCATION_CONFIG, "src/test/resources/keystorefile.jks"); + props.put(SSL_KEYSTORE_PASSWORD_CONFIG, "password"); if (properties != null) { props.putAll(properties); } @@ -100,6 +121,14 @@ public class KafkaSourceEmbeddedKafka { return HOST + ":" + serverPort; } + public String getBootstrapSslServers() { + return String.format("%s:%s", HOST, serverSslPort); + } + + public String getBootstrapSslIpPortServers() { + return String.format("%s:%s", "127.0.0.1", serverSslPort); + } + private void initProducer() { Properties props = new Properties(); props.put("bootstrap.servers", HOST + ":" + serverPort); diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java index 39e0e1951..ee913c9e6 
100644 --- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java +++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java @@ -19,7 +19,6 @@ package org.apache.flume.source.kafka; import com.google.common.base.Charsets; import com.google.common.collect.Lists; - import kafka.zk.KafkaZkClient; import org.apache.avro.io.BinaryEncoder; import org.apache.avro.io.EncoderFactory; @@ -37,7 +36,6 @@ import org.apache.flume.lifecycle.LifecycleState; import org.apache.flume.source.avro.AvroFlumeEvent; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.common.errors.TopicExistsException; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.KafkaProducer; @@ -45,6 +43,7 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.TopicExistsException; import org.apache.kafka.common.security.JaasUtils; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.utils.Time; @@ -74,11 +73,13 @@ import java.util.Map; import java.util.Properties; import java.util.regex.Pattern; +import static org.apache.flume.shared.kafka.KafkaSSLUtil.SSL_DISABLE_FQDN_CHECK; import static org.apache.flume.source.kafka.KafkaSourceConstants.AVRO_EVENT; import static org.apache.flume.source.kafka.KafkaSourceConstants.BATCH_DURATION_MS; import static org.apache.flume.source.kafka.KafkaSourceConstants.BATCH_SIZE; import static org.apache.flume.source.kafka.KafkaSourceConstants.BOOTSTRAP_SERVERS; import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_AUTO_COMMIT; +import 
static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_TOPIC_HEADER; import static org.apache.flume.source.kafka.KafkaSourceConstants.KAFKA_CONSUMER_PREFIX; import static org.apache.flume.source.kafka.KafkaSourceConstants.OLD_GROUP_ID; import static org.apache.flume.source.kafka.KafkaSourceConstants.PARTITION_HEADER; @@ -86,9 +87,13 @@ import static org.apache.flume.source.kafka.KafkaSourceConstants.TIMESTAMP_HEADE import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPIC; import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPICS; import static org.apache.flume.source.kafka.KafkaSourceConstants.TOPICS_REGEX; -import static org.apache.flume.source.kafka.KafkaSourceConstants.DEFAULT_TOPIC_HEADER; import static org.apache.flume.source.kafka.KafkaSourceConstants.ZOOKEEPER_CONNECT_FLUME_KEY; +import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG; +import static org.apache.kafka.common.config.SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG; +import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG; +import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; @@ -1016,4 +1021,58 @@ public class TestKafkaSource { props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers); return props; } + + @SuppressWarnings("unchecked") + @Test + public void testSslSource() throws EventDeliveryException, + SecurityException, + IllegalArgumentException, + InterruptedException { + context.put(TOPICS, topic0); + context.put(BATCH_SIZE, "1"); + context.put(BOOTSTRAP_SERVERS, kafkaServer.getBootstrapSslServers()); + context.put(KAFKA_CONSUMER_PREFIX + SECURITY_PROTOCOL_CONFIG, "SSL"); + context.put(KAFKA_CONSUMER_PREFIX + 
SSL_TRUSTSTORE_LOCATION_CONFIG, "src/test/resources/truststorefile.jks"); + context.put(KAFKA_CONSUMER_PREFIX + SSL_TRUSTSTORE_PASSWORD_CONFIG, "password"); + context.put(KAFKA_CONSUMER_PREFIX + SSL_DISABLE_FQDN_CHECK, "true"); + kafkaSource.configure(context); + startKafkaSource(); + + Thread.sleep(500L); + kafkaServer.produce(topic0, "", "hello, world"); + Thread.sleep(500L); + + Assert.assertEquals("", kafkaSource.getConsumerProps().get(SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG)); + Assert.assertEquals(Status.READY, kafkaSource.process()); + Assert.assertEquals(1, events.size()); + Assert.assertEquals("hello, world", + new String(events.get(0).getBody(), Charsets.UTF_8) + ); + } + + + @SuppressWarnings("unchecked") + @Test + public void testSslWithFqdnValidationFailedSource() throws EventDeliveryException, + SecurityException, + IllegalArgumentException, + InterruptedException { + context.put(TOPICS, topic0); + context.put(BATCH_SIZE, "1"); + context.put(BOOTSTRAP_SERVERS, kafkaServer.getBootstrapSslIpPortServers()); + context.put(KAFKA_CONSUMER_PREFIX + SECURITY_PROTOCOL_CONFIG, "SSL"); + context.put(KAFKA_CONSUMER_PREFIX + SSL_TRUSTSTORE_LOCATION_CONFIG, "src/test/resources/truststorefile.jks"); + context.put(KAFKA_CONSUMER_PREFIX + SSL_TRUSTSTORE_PASSWORD_CONFIG, "password"); + kafkaSource.configure(context); + startKafkaSource(); + + Thread.sleep(500L); + kafkaServer.produce(topic0, "", "hello, world"); + Thread.sleep(500L); + + assertNull(kafkaSource.getConsumerProps().get(SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG)); + Assert.assertEquals(Status.BACKOFF, kafkaSource.process()); + } + + } diff --git a/flume-ng-sources/flume-kafka-source/src/test/resources/keystorefile.jks b/flume-ng-sources/flume-kafka-source/src/test/resources/keystorefile.jks new file mode 100644 index 000000000..20ac6a816 Binary files /dev/null and b/flume-ng-sources/flume-kafka-source/src/test/resources/keystorefile.jks differ diff --git 
a/flume-ng-sources/flume-kafka-source/src/test/resources/truststorefile.jks b/flume-ng-sources/flume-kafka-source/src/test/resources/truststorefile.jks new file mode 100644 index 000000000..a98c4907e Binary files /dev/null and b/flume-ng-sources/flume-kafka-source/src/test/resources/truststorefile.jks differ diff --git a/flume-shared/flume-shared-kafka/src/main/java/org/apache/flume/shared/kafka/KafkaSSLUtil.java b/flume-shared/flume-shared-kafka/src/main/java/org/apache/flume/shared/kafka/KafkaSSLUtil.java index 78e6f639d..4f0300859 100644 --- a/flume-shared/flume-shared-kafka/src/main/java/org/apache/flume/shared/kafka/KafkaSSLUtil.java +++ b/flume-shared/flume-shared-kafka/src/main/java/org/apache/flume/shared/kafka/KafkaSSLUtil.java @@ -27,6 +27,8 @@ import java.util.Properties; public class KafkaSSLUtil { + public static final String SSL_DISABLE_FQDN_CHECK = "ssl.disableTLSHostnameVerification"; + private KafkaSSLUtil() { } @@ -61,7 +63,7 @@ public class KafkaSSLUtil { } } - private static boolean isSSLEnabled(Properties kafkaProps) { + public static boolean isSSLEnabled(Properties kafkaProps) { String securityProtocol = kafkaProps.getProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG);
