Ottomata has uploaded a new change for review. ( 
https://gerrit.wikimedia.org/r/394144 )

Change subject: Puppetize SSL for Kafka broker
......................................................................

Puppetize SSL for Kafka broker

Bug: T166167
Bug: T121561
Change-Id: Id8a7240e44b492801fd1834028a9eff52334d8f9
---
M modules/profile/manifests/kafka/broker.pp
1 file changed, 129 insertions(+), 62 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/operations/puppet 
refs/changes/44/394144/1

diff --git a/modules/profile/manifests/kafka/broker.pp 
b/modules/profile/manifests/kafka/broker.pp
index 55e1196..5f7c1d9 100644
--- a/modules/profile/manifests/kafka/broker.pp
+++ b/modules/profile/manifests/kafka/broker.pp
@@ -3,7 +3,30 @@
 # cluster.  $kafka_cluster_name must have an entry in the hiera 
'kafka_clusters'
 # variable, and $::fqdn must be listed as a broker there.
 #
+# == SSL Configuration
+#
+# To configure SSL for Kafka brokers, you need the following files 
distributable by Puppet:
+#
+# - A keystore.jks file   - Contains the key and certificate for this broker
+# - A truststore.jks file - Contains the CA certificate that signed the 
broker's certificate
+#
+# It is expected that the CA certificate in the truststore will also be used 
to sign
+# all Kafka client certificates.
+#
+# Once these are somewhere in puppet (probably the private repository), you 
should set
+# $ssl_enabled to true, $ssl_keystore_source and $ssl_truststore_source to 
their
+# puppet:// source paths, and also set $ssl_password to the password used to 
encrypt
+# the key, keystore, and truststores.
+#
+# The default values of $ssl_keystore_source and $ssl_truststore_source expect 
that you
+# used cergen with the puppet_ca to generate and commit files in the private 
puppet
+# repository in the secret module at modules/secret/files/cergen.  If you did, 
then you
+# need only set $ssl_enabled and $ssl_password.
+#
+# See https://wikitech.wikimedia.org/wiki/Cergen for more details.
+#
 # == Parameters
+#
 # [*kafka_cluster_name*]
 #   Kafka cluster name.  This should be the non DC/project suffixed cluster 
name,
 #   e.g. main, aggregate, simple, etc.  The kafka_cluster_name puppet parser
@@ -15,75 +38,96 @@
 #
 # [*plaintext*]
 #   Boolean whether to use a plaintext listener on port 9092.
-#   Hieara: profile::kafka::broker::plaintext
+#   Hiera: profile::kafka::broker::plaintext.  Default true.
 #
-# [*tls_secrets_path*]
-#   TODO: this does not yet work.
-#   Relative base path to tls secrets files in operations puppet private 
repository.
-#   If set, TLS/SSL will be configured for Kafka.  Each broker needs to have
-#   a ca-manager created directory for the node's $::fqdn in this path.
-#   E.g. if this is set to 'kafka/common', and the current node is 
kafka1101.eqiad.wmnet,
-#   then modules/secret/secrets/kafka/common/kafka1101.eqiad.wmnet/ should 
exist
-#   and should contain keyfiles and keystores created by ca-manager.
-#   Hiera: 'profile::kafka::broker::tls_secrets_path'
+# [*ssl_enabled]
+#   If true, an SSL listener will be configured.  Default: false
 #
-# [*tls_key_password*]
-#   Password for keystores and keys in tls_secrets_path.  You should
+# [*ssl_keystore_source]
+#   THe puppet source path to the Java keystore containing this broker's
+#   certificate and key.
+#   Default: 
puppet:///modules/secret/cergen/kafka_broker_${::hostname}/kafka_broker_${::hostname}.keystore.jks
+#   This default assumes that you have a keystore.jks file created with cergen 
in the puppet
+#   private repository.
+#
+# [*ssl_truststore_source*]
+#   If given, Kafka SSL will be configured with a truststore, which means that
+#   client certificates must be signed by the CA cert inside the truststore.
+#   Default: 
puppet:///modules/secret/cergen/kafka_broker_${::hostname}/puppet_ca.truststore.jks
+#   This default assumes that you have a truststore.jks file created with 
cergen in the puppet
+#   private repository.
+#
+# [*ssl_password*]
+#   Password for keystores and keys.  You should
 #   set this in hiera in the operations puppet private repository.
-#   Hiera: profile::kafka::broker::tls_key_password
+#   Hiera: profile::kafka::broker::ssl_password  This expects
+#   that all keystore, truststores, and keys use the same password.
+#   Default undef
 #
 # [*log_dirs*]
 #   Array of Kafka log data directories.  The confluent::kafka::broker class
 #   manages these directories but not anything above them.  Unless the prefix
 #   is /srv/kafka, then this profile tries to be nice.  Otherwise,
 #   you must ensure that any parent directories exist outside of this class.
-#   Hiera: profile::kafka::broker::log_dirs
+#   Hiera: profile::kafka::broker::log_dirs  Default: ['/srv/kafka']
 #
 # [*auto_leader_rebalance_enable*]
 #   Hiera: profile::kafka::broker::auto_leader_rebalance_enable
+#   Default: true
 #
 # [*log_retention_hours*]
-#   Hiera: profile::kafka::broker::log_retention_hours
+#   Hiera: profile::kafka::broker::log_retention_hours  Default: 168 (1 week)
+#
+# [*num_recovery_threads_per_data_dir*]
+#   Hiera: profile::kafka::broker::num_recovery_threads_per_data_dir  Default 
undef
+#
+# [*num_io_threads*]
+#   Hiera: profile::kafka::broker::num_replica_fetchers  Default 1
 #
 # [*num_replica_fetchers*]
-#   Hiera: profile::kafka::broker::num_replica_fetchers
+#   Hiera: profile::kafka::broker::num_replica_fetchers  Default undef
 #
 # [*nofiles_ulimit*]
 #   Hiera: profile::kafka::broker::nofiles_ulimit
+#   Default: 8192
 #
 # [*message_max_bytes*]
 #   The largest record batch size allowed by Kafka.
 #   If this is increased and there are consumers older
 #   than 0.10.2, the consumers' fetch size must also be increased
 #   so that the they can fetch record batches this large.
+#   Default: 1048576
 #
 # [*auth_acls_enabled*]
 #   Enables the kafka.security.auth.SimpleAclAuthorizer bundled with Kafka.
 #   This will also increase the verbosity of authorization logs for a better
-#   user accounting.
+#   user accounting.  Default: false
 #
 # [*monitoring_enabled*]
-#   Enable monitoring and alerts for this broker.
+#   Enable monitoring and alerts for this broker.  Default: false
 #
 class profile::kafka::broker(
     $kafka_cluster_name                = 
hiera('profile::kafka::broker::kafka_cluster_name'),
     $statsd                            = hiera('statsd'),
 
-    $plaintext                         = 
hiera('profile::kafka::broker::plaintext'),
-    # $tls_secrets_path                = 
hiera('profile::kafka::broker::tls_secrets_path'),
-    # $tls_key_password                = 
hiera('profile::kafka::broker::tls_key_password'),
+    $plaintext                         = 
hiera('profile::kafka::broker::plaintext', true),
 
-    $log_dirs                          = 
hiera('profile::kafka::broker::log_dirs'),
-    $auto_leader_rebalance_enable      = 
hiera('profile::kafka::broker::auto_leader_rebalance_enable'),
-    $log_retention_hours               = 
hiera('profile::kafka::broker::log_retention_hours'),
-    $num_recovery_threads_per_data_dir = 
hiera('profile::kafka::broker::num_recovery_threads_per_data_dir'),
-    $num_io_threads                    = 
hiera('profile::kafka::broker::num_io_threads'),
-    $num_replica_fetchers              = 
hiera('profile::kafka::broker::num_replica_fetchers'),
-    $nofiles_ulimit                    = 
hiera('profile::kafka::broker::nofiles_ulimit'),
+    $ssl_enabled                       = 
hiera('profile::kafka::broker::ssl_enabled', false),
+    $ssl_keystore_source               = 
hiera('profile::kafka::broker::ssl_keystore_source', 
"puppet:///modules/secret/cergen/kafka_broker_${::hostname}/kafka_broker_${::hostname}.keystore.jks"),
+    $ssl_truststore_source             = 
hiera('profile::kafka::broker::ssl_keystore_source', 
"puppet:///modules/secret/cergen/kafka_broker_${::hostname}/puppet_ca.truststore.jks"),
+    $ssl_password                      = 
hiera('profile::kafka::broker::ssl_password', undef),
+
+    $log_dirs                          = 
hiera('profile::kafka::broker::log_dirs', ['/srv/kafka']),
+    $auto_leader_rebalance_enable      = 
hiera('profile::kafka::broker::auto_leader_rebalance_enable', true),
+    $log_retention_hours               = 
hiera('profile::kafka::broker::log_retention_hours', 168),
+    $num_recovery_threads_per_data_dir = 
hiera('profile::kafka::broker::num_recovery_threads_per_data_dir', undef),
+    $num_io_threads                    = 
hiera('profile::kafka::broker::num_io_threads', 1),
+    $num_replica_fetchers              = 
hiera('profile::kafka::broker::num_replica_fetchers', undef),
+    $nofiles_ulimit                    = 
hiera('profile::kafka::broker::nofiles_ulimit', 8192),
     # This is set via top level hiera variable so it can be synchronized 
between roles and clients.
-    $message_max_bytes                 = hiera('kafka_message_max_bytes'),
-    $auth_acls_enabled                 = 
hiera('profile::kafka::broker::auth_acls_enabled'),
-    $monitoring_enabled                = 
hiera('profile::kafka::broker::monitoring_enabled'),
+    $message_max_bytes                 = hiera('kafka_message_max_bytes', 
1048576),
+    $auth_acls_enabled                 = 
hiera('profile::kafka::broker::auth_acls_enabled', false),
+    $monitoring_enabled                = 
hiera('profile::kafka::broker::monitoring_enabled', false),
 ) {
     # TODO: WIP
     $tls_secrets_path = undef
@@ -112,47 +156,71 @@
 
     $plaintext_port     = 9092
     $plaintext_listener = "PLAINTEXT://:${plaintext_port}"
-    $tls_port           = 9093
-    $tls_listener       = "SSL://:${tls_port}"
+    $ssl_port           = 9093
+    $ssl_listener       = "SSL://:${ssl_port}"
 
     # Conditionally set $listeners and $ssl_client_auth
     # based on values of $tls and $plaintext.
-    if $tls_secrets_path and $plaintext {
-        $listeners = [$plaintext_listener, $tls_listener]
+    if $ssl_enabled and $plaintext {
+        $listeners = [$plaintext_listener, $ssl_listener]
         $ssl_client_auth       = 'requested'
     }
     elsif $plaintext {
         $listeners             = [$plaintext_listener]
         $ssl_client_auth       = 'none'
     }
-    elsif $tls_secrets_path {
-        $listeners             = [$tls_listener]
+    elsif $ssl_enabled {
+        $listeners             = [$ssl_listener]
         $ssl_client_auth       = 'required'
     }
     else {
-        fatal('Must set at least one of $plaintext or $ssl to true.')
+        fatal('Must set at least one of $plaintext or $ssl_enabled to true.')
     }
 
-    # TODO:
-    #   WIP https://gerrit.wikimedia.org/r/#/c/359960/
-    #       https://gerrit.wikimedia.org/r/#/c/355796/
-    if $tls_secrets_path {
-        # Distribute the Java keystores for this broker's certificate.
-        # These need to have been generated with ca-manager and
-        # checked into the Puppet private repo in
-        # modules/secret/secrets/$tls_secrets_path/$fqdn/.
-        # TODO
-        # ::ca::certs { "${tls_secrets_path}/${::fqdn}":
-        #     destination => '/etc/kafka/tls',
-        #     owner       => 'kafka',
-        # }
+    if $ssl_enabled {
+        # Distribute Java keystore and truststore for this broker.
+        $certificates_path = '/etc/kafka/certificates'
+        $ssl_keystore_location          = 
"/etc/kafka/certificates/kafka_broker_${::hostname}.keystore.jks"
+
+        file { $certificates_path:
+            ensure => 'directory',
+            owner  => 'kafka',
+            group  => 'kafka',
+            mode   => '0555',
+        }
+        file { $ssl_keystore_location:
+            source => $ssl_keystore_source,
+            owner  => 'kafka',
+            group  => 'kafka',
+            mode   => '0440',
+            # Install certificates after confluent-kafka package has been
+            # installed and /etc/kafka already exists...
+            require => Class['::confluent::kafka::common'],
+            # But before Kafka broker is started.
+            before  => Class['::confluent::kafka::broker'],
+        }
 
         $security_inter_broker_protocol = 'SSL'
-        $ssl_keystore_location          = "/etc/kafka/tls/${::fqdn}.jks"
-        $ssl_keystore_password          = $tls_key_password
-        $ssl_key_password               = $tls_key_password
-        $ssl_truststore_location        = '/etc/kafka/tls/truststore.jks'
-        $ssl_truststore_password        = $tls_key_password
+        $ssl_keystore_password          = $ssl_password
+        $ssl_key_password               = $ssl_password
+
+        if $ssl_truststore_source {
+            $ssl_truststore_location = "${certificates_path}/truststore.jks"
+            $ssl_truststore_password = $ssl_password
+
+            file { $ssl_truststore_location:
+                source => $ssl_truststore_source,
+                owner  => 'kafka',
+                group  => 'kafka',
+                mode   => '0444',
+                require => Class['::confluent::kafka::common'],
+                before  => Class['::confluent::kafka::broker'],
+            }
+        }
+        else {
+            $ssl_truststore_location = undef
+            $ssl_truststore_password = undef
+        }
     }
     else {
         $security_inter_broker_protocol = undef
@@ -204,7 +272,6 @@
         nofiles_ulimit                   => $nofiles_ulimit,
         default_replication_factor       => min(3, $config['brokers']['size']),
         offsets_topic_replication_factor => min(3,  
$config['brokers']['size']),
-        delete_topic_enable              => true,
         # TODO: This can be removed once it is a default
         # in ::confluent::kafka module
         inter_broker_protocol_version    => '0.11.0',
@@ -243,16 +310,16 @@
         srange  => '($PRODUCTION_NETWORKS $FRACK_NETWORKS)',
     }
 
-    $ferm_tls_ensure = $tls_secrets_path ? {
+    $ferm_ssl_ensure = $ssl_enabled ? {
         false   => 'absent',
         undef   => 'absent',
         default => 'present'
     }
-    # Firewall for Kafka broker on $tls_port
-    ferm::service { 'kafka-broker-tls':
-        ensure  => $ferm_tls_ensure,
+    # Firewall for Kafka broker on $ssl_port
+    ferm::service { 'kafka-broker-ssl':
+        ensure  => $ferm_ssl_ensure,
         proto   => 'tcp',
-        port    => $tls_port,
+        port    => $ssl_port,
         notrack => true,
         srange  => '($PRODUCTION_NETWORKS $FRACK_NETWORKS)',
     }

-- 
To view, visit https://gerrit.wikimedia.org/r/394144
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Id8a7240e44b492801fd1834028a9eff52334d8f9
Gerrit-PatchSet: 1
Gerrit-Project: operations/puppet
Gerrit-Branch: production
Gerrit-Owner: Ottomata <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to