Ottomata has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/394144 )
Change subject: Puppetize SSL for Kafka broker ...................................................................... Puppetize SSL for Kafka broker Bug: T166167 Bug: T121561 Change-Id: Id8a7240e44b492801fd1834028a9eff52334d8f9 --- M modules/profile/manifests/kafka/broker.pp 1 file changed, 129 insertions(+), 62 deletions(-) git pull ssh://gerrit.wikimedia.org:29418/operations/puppet refs/changes/44/394144/1 diff --git a/modules/profile/manifests/kafka/broker.pp b/modules/profile/manifests/kafka/broker.pp index 55e1196..5f7c1d9 100644 --- a/modules/profile/manifests/kafka/broker.pp +++ b/modules/profile/manifests/kafka/broker.pp @@ -3,7 +3,30 @@ # cluster. $kafka_cluster_name must have an entry in the hiera 'kafka_clusters' # variable, and $::fqdn must be listed as a broker there. # +# == SSL Configuration +# +# To configure SSL for Kafka brokers, you need the following files distributable by Puppet: +# +# - A keystore.jks file - Contains the key and certificate for this broker +# - A truststore.jks file - Contains the CA certificate that signed the broker's certificate +# +# It is expected that the CA certificate in the truststore will also be used to sign +# all Kafka client certificates. +# +# Once these are somewhere in puppet (probably the private repository), you should set +# $ssl_enabled to true, $ssl_keystore_source and $ssl_truststore_source to their +# puppet:// source paths, and also set $ssl_password to the password used to encrypt +# the key, keystore, and truststores. +# +# The default values of $ssl_keystore_source and $ssl_truststore_source expect that you +# used cergen with the puppet_ca to generate and commit files in the private puppet +# repository in the secret module at modules/secret/files/cergen. If you did, then you +# need only set $ssl_enabled and $ssl_password. +# +# See https://wikitech.wikimedia.org/wiki/Cergen for more details. +# # == Parameters +# # [*kafka_cluster_name*] # Kafka cluster name. 
This should be the non DC/project suffixed cluster name, # e.g. main, aggregate, simple, etc. The kafka_cluster_name puppet parser @@ -15,75 +38,96 @@ # # [*plaintext*] # Boolean whether to use a plaintext listener on port 9092. -# Hieara: profile::kafka::broker::plaintext +# Hiera: profile::kafka::broker::plaintext. Default true. # -# [*tls_secrets_path*] -# TODO: this does not yet work. -# Relative base path to tls secrets files in operations puppet private repository. -# If set, TLS/SSL will be configured for Kafka. Each broker needs to have -# a ca-manager created directory for the node's $::fqdn in this path. -# E.g. if this is set to 'kafka/common', and the current node is kafka1101.eqiad.wmnet, -# then modules/secret/secrets/kafka/common/kafka1101.eqiad.wmnet/ should exist -# and should contain keyfiles and keystores created by ca-manager. -# Hiera: 'profile::kafka::broker::tls_secrets_path' +# [*ssl_enabled*] +# If true, an SSL listener will be configured. Default: false # -# [*tls_key_password*] -# Password for keystores and keys in tls_secrets_path. You should +# [*ssl_keystore_source*] +# The puppet source path to the Java keystore containing this broker's +# certificate and key. +# Default: puppet:///modules/secret/cergen/kafka_broker_${::hostname}/kafka_broker_${::hostname}.keystore.jks +# This default assumes that you have a keystore.jks file created with cergen in the puppet +# private repository. +# +# [*ssl_truststore_source*] +# If given, Kafka SSL will be configured with a truststore, which means that +# client certificates must be signed by the CA cert inside the truststore. +# Default: puppet:///modules/secret/cergen/kafka_broker_${::hostname}/puppet_ca.truststore.jks +# This default assumes that you have a truststore.jks file created with cergen in the puppet +# private repository. +# +# [*ssl_password*] +# Password for keystores and keys. You should # set this in hiera in the operations puppet private repository. 
-# Hiera: profile::kafka::broker::tls_key_password +# Hiera: profile::kafka::broker::ssl_password This expects +# that all keystore, truststores, and keys use the same password. +# Default undef # # [*log_dirs*] # Array of Kafka log data directories. The confluent::kafka::broker class # manages these directories but not anything above them. Unless the prefix # is /srv/kafka, then this profile tries to be nice. Otherwise, # you must ensure that any parent directories exist outside of this class. -# Hiera: profile::kafka::broker::log_dirs +# Hiera: profile::kafka::broker::log_dirs Default: ['/srv/kafka'] # # [*auto_leader_rebalance_enable*] # Hiera: profile::kafka::broker::auto_leader_rebalance_enable +# Default: true # # [*log_retention_hours*] -# Hiera: profile::kafka::broker::log_retention_hours +# Hiera: profile::kafka::broker::log_retention_hours Default: 168 (1 week) +# +# [*num_recovery_threads_per_data_dir*] +# Hiera: profile::kafka::broker::num_recovery_threads_per_data_dir Default undef +# +# [*num_io_threads*] +# Hiera: profile::kafka::broker::num_io_threads Default 1 # # [*num_replica_fetchers*] -# Hiera: profile::kafka::broker::num_replica_fetchers +# Hiera: profile::kafka::broker::num_replica_fetchers Default undef # # [*nofiles_ulimit*] # Hiera: profile::kafka::broker::nofiles_ulimit +# Default: 8192 # # [*message_max_bytes*] # The largest record batch size allowed by Kafka. # If this is increased and there are consumers older # than 0.10.2, the consumers' fetch size must also be increased # so that the they can fetch record batches this large. +# Default: 1048576 # # [*auth_acls_enabled*] # Enables the kafka.security.auth.SimpleAclAuthorizer bundled with Kafka. # This will also increase the verbosity of authorization logs for a better -# user accounting. +# user accounting. Default: false # # [*monitoring_enabled*] -# Enable monitoring and alerts for this broker. +# Enable monitoring and alerts for this broker. 
Default: false # class profile::kafka::broker( $kafka_cluster_name = hiera('profile::kafka::broker::kafka_cluster_name'), $statsd = hiera('statsd'), - $plaintext = hiera('profile::kafka::broker::plaintext'), - # $tls_secrets_path = hiera('profile::kafka::broker::tls_secrets_path'), - # $tls_key_password = hiera('profile::kafka::broker::tls_key_password'), + $plaintext = hiera('profile::kafka::broker::plaintext', true), - $log_dirs = hiera('profile::kafka::broker::log_dirs'), - $auto_leader_rebalance_enable = hiera('profile::kafka::broker::auto_leader_rebalance_enable'), - $log_retention_hours = hiera('profile::kafka::broker::log_retention_hours'), - $num_recovery_threads_per_data_dir = hiera('profile::kafka::broker::num_recovery_threads_per_data_dir'), - $num_io_threads = hiera('profile::kafka::broker::num_io_threads'), - $num_replica_fetchers = hiera('profile::kafka::broker::num_replica_fetchers'), - $nofiles_ulimit = hiera('profile::kafka::broker::nofiles_ulimit'), + $ssl_enabled = hiera('profile::kafka::broker::ssl_enabled', false), + $ssl_keystore_source = hiera('profile::kafka::broker::ssl_keystore_source', "puppet:///modules/secret/cergen/kafka_broker_${::hostname}/kafka_broker_${::hostname}.keystore.jks"), + $ssl_truststore_source = hiera('profile::kafka::broker::ssl_keystore_source', "puppet:///modules/secret/cergen/kafka_broker_${::hostname}/puppet_ca.truststore.jks"), + $ssl_password = hiera('profile::kafka::broker::ssl_password', undef), + + $log_dirs = hiera('profile::kafka::broker::log_dirs', ['/srv/kafka']), + $auto_leader_rebalance_enable = hiera('profile::kafka::broker::auto_leader_rebalance_enable', true), + $log_retention_hours = hiera('profile::kafka::broker::log_retention_hours', 168), + $num_recovery_threads_per_data_dir = hiera('profile::kafka::broker::num_recovery_threads_per_data_dir', undef), + $num_io_threads = hiera('profile::kafka::broker::num_io_threads', 1), + $num_replica_fetchers = 
hiera('profile::kafka::broker::num_replica_fetchers', undef), + $nofiles_ulimit = hiera('profile::kafka::broker::nofiles_ulimit', 8192), # This is set via top level hiera variable so it can be synchronized between roles and clients. - $message_max_bytes = hiera('kafka_message_max_bytes'), - $auth_acls_enabled = hiera('profile::kafka::broker::auth_acls_enabled'), - $monitoring_enabled = hiera('profile::kafka::broker::monitoring_enabled'), + $message_max_bytes = hiera('kafka_message_max_bytes', 1048576), + $auth_acls_enabled = hiera('profile::kafka::broker::auth_acls_enabled', false), + $monitoring_enabled = hiera('profile::kafka::broker::monitoring_enabled', false), ) { # TODO: WIP $tls_secrets_path = undef @@ -112,47 +156,71 @@ $plaintext_port = 9092 $plaintext_listener = "PLAINTEXT://:${plaintext_port}" - $tls_port = 9093 - $tls_listener = "SSL://:${tls_port}" + $ssl_port = 9093 + $ssl_listener = "SSL://:${ssl_port}" # Conditionally set $listeners and $ssl_client_auth # based on values of $tls and $plaintext. - if $tls_secrets_path and $plaintext { - $listeners = [$plaintext_listener, $tls_listener] + if $ssl_enabled and $plaintext { + $listeners = [$plaintext_listener, $ssl_listener] $ssl_client_auth = 'requested' } elsif $plaintext { $listeners = [$plaintext_listener] $ssl_client_auth = 'none' } - elsif $tls_secrets_path { - $listeners = [$tls_listener] + elsif $ssl_enabled { + $listeners = [$ssl_listener] $ssl_client_auth = 'required' } else { - fatal('Must set at least one of $plaintext or $ssl to true.') + fatal('Must set at least one of $plaintext or $ssl_enabled to true.') } - # TODO: - # WIP https://gerrit.wikimedia.org/r/#/c/359960/ - # https://gerrit.wikimedia.org/r/#/c/355796/ - if $tls_secrets_path { - # Distribute the Java keystores for this broker's certificate. - # These need to have been generated with ca-manager and - # checked into the Puppet private repo in - # modules/secret/secrets/$tls_secrets_path/$fqdn/. 
- # TODO - # ::ca::certs { "${tls_secrets_path}/${::fqdn}": - # destination => '/etc/kafka/tls', - # owner => 'kafka', - # } + if $ssl_enabled { + # Distribute Java keystore and truststore for this broker. + $certificates_path = '/etc/kafka/certificates' + $ssl_keystore_location = "/etc/kafka/certificates/kafka_broker_${::hostname}.keystore.jks" + + file { $certificates_path: + ensure => 'directory', + owner => 'kafka', + group => 'kafka', + mode => '0555', + } + file { $ssl_keystore_location: + source => $ssl_keystore_source, + owner => 'kafka', + group => 'kafka', + mode => '0440', + # Install certificates after confluent-kafka package has been + # installed and /etc/kafka already exists... + require => Class['::confluent::kafka::common'], + # But before Kafka broker is started. + before => Class['::confluent::kafka::broker'], + } $security_inter_broker_protocol = 'SSL' - $ssl_keystore_location = "/etc/kafka/tls/${::fqdn}.jks" - $ssl_keystore_password = $tls_key_password - $ssl_key_password = $tls_key_password - $ssl_truststore_location = '/etc/kafka/tls/truststore.jks' - $ssl_truststore_password = $tls_key_password + $ssl_keystore_password = $ssl_password + $ssl_key_password = $ssl_password + + if $ssl_truststore_source { + $ssl_truststore_location = "${certificates_path}/truststore.jks" + $ssl_truststore_password = $ssl_password + + file { $ssl_truststore_location: + source => $ssl_truststore_source, + owner => 'kafka', + group => 'kafka', + mode => '0444', + require => Class['::confluent::kafka::common'], + before => Class['::confluent::kafka::broker'], + } + } + else { + $ssl_truststore_location = undef + $ssl_truststore_password = undef + } } else { $security_inter_broker_protocol = undef @@ -204,7 +272,6 @@ nofiles_ulimit => $nofiles_ulimit, default_replication_factor => min(3, $config['brokers']['size']), offsets_topic_replication_factor => min(3, $config['brokers']['size']), - delete_topic_enable => true, # TODO: This can be removed once it is a default # 
in ::confluent::kafka module inter_broker_protocol_version => '0.11.0', @@ -243,16 +310,16 @@ srange => '($PRODUCTION_NETWORKS $FRACK_NETWORKS)', } - $ferm_tls_ensure = $tls_secrets_path ? { + $ferm_ssl_ensure = $ssl_enabled ? { false => 'absent', undef => 'absent', default => 'present' } - # Firewall for Kafka broker on $tls_port - ferm::service { 'kafka-broker-tls': - ensure => $ferm_tls_ensure, + # Firewall for Kafka broker on $ssl_port + ferm::service { 'kafka-broker-ssl': + ensure => $ferm_ssl_ensure, proto => 'tcp', - port => $tls_port, + port => $ssl_port, notrack => true, srange => '($PRODUCTION_NETWORKS $FRACK_NETWORKS)', } -- To view, visit https://gerrit.wikimedia.org/r/394144 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: newchange Gerrit-Change-Id: Id8a7240e44b492801fd1834028a9eff52334d8f9 Gerrit-PatchSet: 1 Gerrit-Project: operations/puppet Gerrit-Branch: production Gerrit-Owner: Ottomata <[email protected]> _______________________________________________ MediaWiki-commits mailing list [email protected] https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits
