Ottomata has submitted this change and it was merged. ( 
https://gerrit.wikimedia.org/r/394144 )

Change subject: Puppetize SSL for Kafka broker
......................................................................


Puppetize SSL for Kafka broker

Bug: T166167
Bug: T121561
Change-Id: Id8a7240e44b492801fd1834028a9eff52334d8f9
---
M hieradata/role/common/kafka/jumbo/broker.yaml
M modules/confluent/manifests/kafka/broker.pp
M modules/profile/manifests/kafka/broker.pp
3 files changed, 129 insertions(+), 79 deletions(-)

Approvals:
  Ottomata: Looks good to me, approved
  jenkins-bot: Verified



diff --git a/hieradata/role/common/kafka/jumbo/broker.yaml 
b/hieradata/role/common/kafka/jumbo/broker.yaml
index 668a545..0a8111c 100644
--- a/hieradata/role/common/kafka/jumbo/broker.yaml
+++ b/hieradata/role/common/kafka/jumbo/broker.yaml
@@ -6,21 +6,18 @@
 # More info https://phabricator.wikimedia.org/T167304#3478277
 profile::kafka::broker::auth_acls_enabled: true
 
+# Enable SSL/TLS for Kafka brokers.  This requires
+# that keystore and truststore files, and
+# profile::kafka::broker::ssl_password, are committed in
+# the expected location in ops/puppet/private.
+profile::kafka::broker::ssl_enabled: true
+
 # Enable Monitoring (via Prometheus) and icinga alerts
 profile::kafka::broker::monitoring_enabled: true
 
 profile::kafka::broker::log_dirs: [/srv/kafka/data]
 profile::kafka::broker::plaintext: true
-# TODO: does not yet work.
-# # ca-manager will use a self signed CA cert to authenticate clients.
-# # Some clients (e.g. MirrorMaker) will need to talk to multiple Kafka
-# # clusters.  So, we need all production Kafka clusters to use the same root 
CA, which
-# # means that ca-manager must have all clusters that might need to 
authenticate
-# # a client to be configured in the same ca-manager manifest config file.
-# # All certs will be in a single ca-manager base directory.  This should
-# # match what is in the operations puppet private repository in the secrets 
module.
-# profile::kafka::broker::tls_secrets_path: kafka/common
-# profile::kafka::broker::tls_key_password: qwerty # TODO: put this in puppet 
private
+
 profile::kafka::broker::auto_leader_rebalance_enable: true
 profile::kafka::broker::nofiles_ulimit: 65536
 
diff --git a/modules/confluent/manifests/kafka/broker.pp 
b/modules/confluent/manifests/kafka/broker.pp
index 9d24c41..b543888 100644
--- a/modules/confluent/manifests/kafka/broker.pp
+++ b/modules/confluent/manifests/kafka/broker.pp
@@ -317,9 +317,17 @@
         mode   => '0755',
     }
 
+    # If any passwords, set proper readability.
+    if $ssl_key_password or $ssl_keystore_password or $ssl_truststore_password 
{
+        $server_properties_mode = '0440'
+    }
+    else {
+        $server_properties_mode = '0444'
+    }
     # Render out Kafka Broker config files.
     file { '/etc/kafka/server.properties':
         content => template($server_properties_template),
+        mode    => $server_properties_mode,
     }
 
     # log4j configuration for Kafka daemon
diff --git a/modules/profile/manifests/kafka/broker.pp 
b/modules/profile/manifests/kafka/broker.pp
index 55e1196..9c5a188 100644
--- a/modules/profile/manifests/kafka/broker.pp
+++ b/modules/profile/manifests/kafka/broker.pp
@@ -3,7 +3,32 @@
 # cluster.  $kafka_cluster_name must have an entry in the hiera 
'kafka_clusters'
 # variable, and $::fqdn must be listed as a broker there.
 #
+# == SSL Configuration
+#
+# To configure SSL for Kafka brokers, you need the following files 
distributable by our Puppet
+# secret() function.
+#
+# - A keystore.jks file   - Contains the key and certificate for this broker
+# - A truststore.jks file - Contains the CA certificate that signed the 
broker's certificate
+#
+# It is expected that the CA certificate in the truststore will also be used 
to sign
+# all Kafka client certificates.  These should be checked into the Puppet 
private repository's
+# secret module at
+#
+#   - 
secrets/certificates/kafka_broker_${::hostname}/kafka_broker_${::hostname}.keystore.jks
+#   - secrets/certificates/kafka_broker_${::hostname}/truststore.jks
+#
+# This layout is built to work with certificates generated using cergen like
+#    cergen --base-path /srv/private/modules/secret/secrets/certificates ...
+#
+# Once these are in the Puppet private repository's secret module, set
+# $ssl_enabled to true and $ssl_password to the password
+# used when generating the key, keystore, and truststore.
+#
+# See https://wikitech.wikimedia.org/wiki/Cergen for more details.
+#
 # == Parameters
+#
 # [*kafka_cluster_name*]
 #   Kafka cluster name.  This should be the non DC/project suffixed cluster 
name,
 #   e.g. main, aggregate, simple, etc.  The kafka_cluster_name puppet parser
@@ -15,75 +40,80 @@
 #
 # [*plaintext*]
 #   Boolean whether to use a plaintext listener on port 9092.
-#   Hieara: profile::kafka::broker::plaintext
+#   Hiera: profile::kafka::broker::plaintext.  Default true.
 #
-# [*tls_secrets_path*]
-#   TODO: this does not yet work.
-#   Relative base path to tls secrets files in operations puppet private 
repository.
-#   If set, TLS/SSL will be configured for Kafka.  Each broker needs to have
-#   a ca-manager created directory for the node's $::fqdn in this path.
-#   E.g. if this is set to 'kafka/common', and the current node is 
kafka1101.eqiad.wmnet,
-#   then modules/secret/secrets/kafka/common/kafka1101.eqiad.wmnet/ should 
exist
-#   and should contain keyfiles and keystores created by ca-manager.
-#   Hiera: 'profile::kafka::broker::tls_secrets_path'
+# [*ssl_enabled*]
+#   If true, an SSL listener will be configured.  Default: false
 #
-# [*tls_key_password*]
-#   Password for keystores and keys in tls_secrets_path.  You should
+# [*ssl_password*]
+#   Password for keystores and keys.  You should
 #   set this in hiera in the operations puppet private repository.
-#   Hiera: profile::kafka::broker::tls_key_password
+#   Hiera: profile::kafka::broker::ssl_password.  This expects
+#   that all keystores, truststores, and keys use the same password.
+#   Default: undef
 #
 # [*log_dirs*]
 #   Array of Kafka log data directories.  The confluent::kafka::broker class
 #   manages these directories but not anything above them.  Unless the prefix
 #   is /srv/kafka, then this profile tries to be nice.  Otherwise,
 #   you must ensure that any parent directories exist outside of this class.
-#   Hiera: profile::kafka::broker::log_dirs
+#   Hiera: profile::kafka::broker::log_dirs  Default: ['/srv/kafka']
 #
 # [*auto_leader_rebalance_enable*]
 #   Hiera: profile::kafka::broker::auto_leader_rebalance_enable
+#   Default: true
 #
 # [*log_retention_hours*]
-#   Hiera: profile::kafka::broker::log_retention_hours
+#   Hiera: profile::kafka::broker::log_retention_hours  Default: 168 (1 week)
+#
+# [*num_recovery_threads_per_data_dir*]
+#   Hiera: profile::kafka::broker::num_recovery_threads_per_data_dir  Default 
undef
+#
+# [*num_io_threads*]
+#   Hiera: profile::kafka::broker::num_io_threads  Default 1
 #
 # [*num_replica_fetchers*]
-#   Hiera: profile::kafka::broker::num_replica_fetchers
+#   Hiera: profile::kafka::broker::num_replica_fetchers  Default undef
 #
 # [*nofiles_ulimit*]
 #   Hiera: profile::kafka::broker::nofiles_ulimit
+#   Default: 8192
 #
 # [*message_max_bytes*]
 #   The largest record batch size allowed by Kafka.
 #   If this is increased and there are consumers older
 #   than 0.10.2, the consumers' fetch size must also be increased
 #   so that the they can fetch record batches this large.
+#   Default: 1048576
 #
 # [*auth_acls_enabled*]
 #   Enables the kafka.security.auth.SimpleAclAuthorizer bundled with Kafka.
 #   This will also increase the verbosity of authorization logs for a better
-#   user accounting.
+#   user accounting.  Default: false
 #
 # [*monitoring_enabled*]
-#   Enable monitoring and alerts for this broker.
+#   Enable monitoring and alerts for this broker.  Default: false
 #
 class profile::kafka::broker(
     $kafka_cluster_name                = 
hiera('profile::kafka::broker::kafka_cluster_name'),
     $statsd                            = hiera('statsd'),
 
-    $plaintext                         = 
hiera('profile::kafka::broker::plaintext'),
-    # $tls_secrets_path                = 
hiera('profile::kafka::broker::tls_secrets_path'),
-    # $tls_key_password                = 
hiera('profile::kafka::broker::tls_key_password'),
+    $plaintext                         = 
hiera('profile::kafka::broker::plaintext', true),
 
-    $log_dirs                          = 
hiera('profile::kafka::broker::log_dirs'),
-    $auto_leader_rebalance_enable      = 
hiera('profile::kafka::broker::auto_leader_rebalance_enable'),
-    $log_retention_hours               = 
hiera('profile::kafka::broker::log_retention_hours'),
-    $num_recovery_threads_per_data_dir = 
hiera('profile::kafka::broker::num_recovery_threads_per_data_dir'),
-    $num_io_threads                    = 
hiera('profile::kafka::broker::num_io_threads'),
-    $num_replica_fetchers              = 
hiera('profile::kafka::broker::num_replica_fetchers'),
-    $nofiles_ulimit                    = 
hiera('profile::kafka::broker::nofiles_ulimit'),
+    $ssl_enabled                       = 
hiera('profile::kafka::broker::ssl_enabled', false),
+    $ssl_password                      = 
hiera('profile::kafka::broker::ssl_password', undef),
+
+    $log_dirs                          = 
hiera('profile::kafka::broker::log_dirs', ['/srv/kafka']),
+    $auto_leader_rebalance_enable      = 
hiera('profile::kafka::broker::auto_leader_rebalance_enable', true),
+    $log_retention_hours               = 
hiera('profile::kafka::broker::log_retention_hours', 168),
+    $num_recovery_threads_per_data_dir = 
hiera('profile::kafka::broker::num_recovery_threads_per_data_dir', undef),
+    $num_io_threads                    = 
hiera('profile::kafka::broker::num_io_threads', 1),
+    $num_replica_fetchers              = 
hiera('profile::kafka::broker::num_replica_fetchers', undef),
+    $nofiles_ulimit                    = 
hiera('profile::kafka::broker::nofiles_ulimit', 8192),
     # This is set via top level hiera variable so it can be synchronized 
between roles and clients.
-    $message_max_bytes                 = hiera('kafka_message_max_bytes'),
-    $auth_acls_enabled                 = 
hiera('profile::kafka::broker::auth_acls_enabled'),
-    $monitoring_enabled                = 
hiera('profile::kafka::broker::monitoring_enabled'),
+    $message_max_bytes                 = hiera('kafka_message_max_bytes', 
1048576),
+    $auth_acls_enabled                 = 
hiera('profile::kafka::broker::auth_acls_enabled', false),
+    $monitoring_enabled                = 
hiera('profile::kafka::broker::monitoring_enabled', false),
 ) {
     # TODO: WIP
     $tls_secrets_path = undef
@@ -112,55 +142,71 @@
 
     $plaintext_port     = 9092
     $plaintext_listener = "PLAINTEXT://:${plaintext_port}"
-    $tls_port           = 9093
-    $tls_listener       = "SSL://:${tls_port}"
+    $ssl_port           = 9093
+    $ssl_listener       = "SSL://:${ssl_port}"
 
     # Conditionally set $listeners and $ssl_client_auth
     # based on values of $tls and $plaintext.
-    if $tls_secrets_path and $plaintext {
-        $listeners = [$plaintext_listener, $tls_listener]
+    if $ssl_enabled and $plaintext {
+        $listeners = [$plaintext_listener, $ssl_listener]
         $ssl_client_auth       = 'requested'
     }
     elsif $plaintext {
         $listeners             = [$plaintext_listener]
         $ssl_client_auth       = 'none'
     }
-    elsif $tls_secrets_path {
-        $listeners             = [$tls_listener]
+    elsif $ssl_enabled {
+        $listeners             = [$ssl_listener]
         $ssl_client_auth       = 'required'
     }
     else {
-        fatal('Must set at least one of $plaintext or $ssl to true.')
+        fatal('Must set at least one of $plaintext or $ssl_enabled to true.')
     }
 
-    # TODO:
-    #   WIP https://gerrit.wikimedia.org/r/#/c/359960/
-    #       https://gerrit.wikimedia.org/r/#/c/355796/
-    if $tls_secrets_path {
-        # Distribute the Java keystores for this broker's certificate.
-        # These need to have been generated with ca-manager and
-        # checked into the Puppet private repo in
-        # modules/secret/secrets/$tls_secrets_path/$fqdn/.
-        # TODO
-        # ::ca::certs { "${tls_secrets_path}/${::fqdn}":
-        #     destination => '/etc/kafka/tls',
-        #     owner       => 'kafka',
-        # }
-
+    if $ssl_enabled {
+        # Use SSL for inter broker communication.
         $security_inter_broker_protocol = 'SSL'
-        $ssl_keystore_location          = "/etc/kafka/tls/${::fqdn}.jks"
-        $ssl_keystore_password          = $tls_key_password
-        $ssl_key_password               = $tls_key_password
-        $ssl_truststore_location        = '/etc/kafka/tls/truststore.jks'
-        $ssl_truststore_password        = $tls_key_password
+
+        # Distribute Java keystore and truststore for this broker.
+        $ssl_location                   = '/etc/kafka/ssl'
+
+        $ssl_keystore_secrets_path      = 
"certificates/kafka_broker_${::hostname}/kafka_broker_${::hostname}.keystore.jks"
+        $ssl_keystore_location          = 
"${ssl_location}/kafka_broker_${::hostname}.keystore.jks"
+
+        $ssl_truststore_secrets_path    = 
"certificates/kafka_broker_${::hostname}/truststore.jks"
+        $ssl_truststore_location        = "${ssl_location}/truststore.jks"
+
+        file { $ssl_location:
+            ensure => 'directory',
+            owner  => 'kafka',
+            group  => 'kafka',
+            mode   => '0555',
+        }
+        file { $ssl_keystore_location:
+            content => secret($ssl_keystore_secrets_path),
+            owner   => 'kafka',
+            group   => 'kafka',
+            mode    => '0440',
+            # Install certificates after confluent-kafka package has been
+            # installed and /etc/kafka already exists...
+            require => Class['::confluent::kafka::common'],
+            # But before Kafka broker is started.
+            before  => Class['::confluent::kafka::broker'],
+        }
+
+        file { $ssl_truststore_location:
+            content => secret($ssl_truststore_secrets_path),
+            owner   => 'kafka',
+            group   => 'kafka',
+            mode    => '0444',
+            require => Class['::confluent::kafka::common'],
+            before  => Class['::confluent::kafka::broker'],
+        }
     }
     else {
         $security_inter_broker_protocol = undef
         $ssl_keystore_location          = undef
-        $ssl_keystore_password          = undef
-        $ssl_key_password               = undef
         $ssl_truststore_location        = undef
-        $ssl_truststore_password        = undef
     }
 
     # Be nice, and manage /srv/kafka if it is the prefix for kafka data 
directories.
@@ -204,7 +250,6 @@
         nofiles_ulimit                   => $nofiles_ulimit,
         default_replication_factor       => min(3, $config['brokers']['size']),
         offsets_topic_replication_factor => min(3,  
$config['brokers']['size']),
-        delete_topic_enable              => true,
         # TODO: This can be removed once it is a default
         # in ::confluent::kafka module
         inter_broker_protocol_version    => '0.11.0',
@@ -217,10 +262,10 @@
 
         security_inter_broker_protocol   => $security_inter_broker_protocol,
         ssl_keystore_location            => $ssl_keystore_location,
-        ssl_keystore_password            => $ssl_keystore_password,
-        ssl_key_password                 => $ssl_key_password,
+        ssl_keystore_password            => $ssl_password,
+        ssl_key_password                 => $ssl_password,
         ssl_truststore_location          => $ssl_truststore_location,
-        ssl_truststore_password          => $ssl_truststore_password,
+        ssl_truststore_password          => $ssl_password,
         ssl_client_auth                  => $ssl_client_auth,
 
         auto_leader_rebalance_enable     => $auto_leader_rebalance_enable,
@@ -243,16 +288,16 @@
         srange  => '($PRODUCTION_NETWORKS $FRACK_NETWORKS)',
     }
 
-    $ferm_tls_ensure = $tls_secrets_path ? {
+    $ferm_ssl_ensure = $ssl_enabled ? {
         false   => 'absent',
         undef   => 'absent',
         default => 'present'
     }
-    # Firewall for Kafka broker on $tls_port
-    ferm::service { 'kafka-broker-tls':
-        ensure  => $ferm_tls_ensure,
+    # Firewall for Kafka broker on $ssl_port
+    ferm::service { 'kafka-broker-ssl':
+        ensure  => $ferm_ssl_ensure,
         proto   => 'tcp',
-        port    => $tls_port,
+        port    => $ssl_port,
         notrack => true,
         srange  => '($PRODUCTION_NETWORKS $FRACK_NETWORKS)',
     }

-- 
To view, visit https://gerrit.wikimedia.org/r/394144
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: Id8a7240e44b492801fd1834028a9eff52334d8f9
Gerrit-PatchSet: 9
Gerrit-Project: operations/puppet
Gerrit-Branch: production
Gerrit-Owner: Ottomata <[email protected]>
Gerrit-Reviewer: Elukey <[email protected]>
Gerrit-Reviewer: Ottomata <[email protected]>
Gerrit-Reviewer: jenkins-bot <>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to