Ottomata has submitted this change and it was merged. ( https://gerrit.wikimedia.org/r/391705 )

Change subject: Move statsv varnishkafka and service to use main Kafka cluster(s)
......................................................................


Move statsv varnishkafka and service to use main Kafka cluster(s)

This gives statsv active/passive multi-DC support.

After much IRC discussion, this allows for an active/passive statsv
backed by the main Kafka clusters in eqiad and codfw.  varnishkafkas
in all DCs produce statsv messages to the Kafka cluster named by
profile::cache::kafka::statsv::kafka_cluster_name, which in production
is set to 'main-eqiad'.

statsv consumer instances will run in eqiad and codfw and consume
from their local main Kafka clusters.  Since statsd is active/passive,
those statsv consumer instances will produce to the same active statsd
instance, independent of which datacenter they run in.  I.e.
if statsd is active in eqiad, both statsv in eqiad (consuming
from main-eqiad) and statsv in codfw (consuming from main-codfw)
will produce to statsd in eqiad.
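
Concretely, each statsv consumer ends up invoked roughly like this
(a sketch based on the systemd template in this change; the broker
list and statsd host are placeholders, not real values):

  /usr/bin/python /srv/deployment/statsv/statsv/statsv.py \
      --brokers <local main Kafka broker list> \
      --statsd <active statsd host>:8125 \
      --topics statsv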

However, since all statsv varnishkafkas produce to the same
Kafka cluster in an 'active' DC, only one statsv instance
will have any messages to consume at any given time.

If you plan to move the active statsd instance away from
eqiad for an extended (permanent?) period of time, you should
also change the value of profile::cache::kafka::statsv::kafka_cluster_name.
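
For example, a codfw failover would mean changing the value set below
in hieradata/role/common/cache/text.yaml, roughly:

  profile::cache::kafka::statsv::kafka_cluster_name: main-codfw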

Or, if you need to do maintenance on statsv for an extended period of time,
you could route all varnishkafka-produced statsv messages to e.g. main-codfw
and shut down the eqiad statsv consumers, and still get statsv messages
in statsd.
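
A sketch of that maintenance scenario (assuming the consumer's systemd
unit is installed as 'statsv', per the template in this change):

  # Route varnishkafka statsv traffic to main-codfw via hiera:
  #   profile::cache::kafka::statsv::kafka_cluster_name: main-codfw
  # then stop the eqiad consumer(s) for the duration of the work,
  # on the host(s) running profile::webperf in eqiad:
  sudo systemctl stop statsv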

Bug: T179093
Change-Id: I6c566c19fcdab004eec21384e6a5c136b3cf699c
---
M hieradata/role/common/cache/text.yaml
A modules/profile/manifests/cache/kafka/statsv.pp
M modules/profile/manifests/cache/text.pp
M modules/profile/manifests/webperf.pp
M modules/role/lib/puppet/parser/functions/kafka_cluster_name.rb
D modules/role/manifests/cache/kafka/statsv.pp
M modules/role/manifests/cache/text.pp
M modules/webperf/manifests/statsv.pp
M modules/webperf/templates/statsv.service.erb
9 files changed, 107 insertions(+), 84 deletions(-)

Approvals:
  Ottomata: Looks good to me, approved
  jenkins-bot: Verified



diff --git a/hieradata/role/common/cache/text.yaml b/hieradata/role/common/cache/text.yaml
index 23ecc41..40e5c5d 100644
--- a/hieradata/role/common/cache/text.yaml
+++ b/hieradata/role/common/cache/text.yaml
@@ -99,3 +99,11 @@
 # Profile::cache::ssl::unified
 profile::cache::ssl::unified::monitoring: true
 profile::cache::ssl::unified::letsencrypt: false
+
+# This should match an entry in the kafka_clusters hash (defined in common.yaml).
+# We use the fully qualified kafka cluster name (with datacenter), because we want
+# to route all statsv -> statsd traffic to the datacenter that hosts the master
+# statsd instance.  If the active statsd instance changes to codfw (for an extended period of time),
+# you should probably change this to main-codfw.  If you don't, things will probably be fine,
+# but statsv will have to send messages over UDP cross-DC to the active statsd instance.
+profile::cache::kafka::statsv::kafka_cluster_name: main-eqiad
diff --git a/modules/profile/manifests/cache/kafka/statsv.pp b/modules/profile/manifests/cache/kafka/statsv.pp
new file mode 100644
index 0000000..ccacf42
--- /dev/null
+++ b/modules/profile/manifests/cache/kafka/statsv.pp
@@ -0,0 +1,62 @@
+# === Class profile::cache::kafka::statsv
+#
+# Sets up a varnishkafka logging endpoint for collecting
+# application level metrics. We are calling this system
+# statsv, as it is similar to statsd, but uses varnish
+# as its logging endpoint.
+#
+# === Parameters
+#
+# [*cache_cluster*]
+#   Used when naming varnishkafka metrics.
+#   Default:  hiera('cache::cluster')
+#
+# [*kafka_cluster_name*]
+#   The name of the kafka cluster to use from the kafka_clusters hiera variable.
+#   Since only one statsd instance is active at any given time, you should probably
+#   set this explicitly to a fully qualified kafka cluster name (with DC suffix) that
+#   is located in the same DC as the active statsd instance.
+#
+class profile::cache::kafka::statsv(
+    $cache_cluster      = hiera('cache::cluster'),
+    $kafka_cluster_name = hiera('profile::cache::kafka::statsv::kafka_cluster_name')
+)
+{
+    $kafka_config  = kafka_config($kafka_cluster_name)
+    $kafka_brokers = $kafka_config['brokers']['array']
+
+    $format  = "%{fake_tag0@hostname?${::fqdn}}x %{%FT%T@dt}t 
%{X-Client-IP@ip}o %{@uri_path}U %{@uri_query}q %{User-Agent@user_agent}i"
+
+    varnishkafka::instance { 'statsv':
+        brokers                     => $kafka_brokers,
+        format                      => $format,
+        format_type                 => 'json',
+        topic                       => 'statsv',
+        varnish_name                => 'frontend',
+        varnish_svc_name            => 'varnish-frontend',
+        # Only log webrequests to /beacon/statsv
+        varnish_opts                => { 'q' => 'ReqURL ~ "^/beacon/statsv\?"' },
+        # -1 means all brokers in the ISR must ACK this request.
+        topic_request_required_acks => '-1',
+        force_protocol_version      => $kafka_config['api_version'],
+    }
+
+    # Make sure varnishes are configured and started for the first time
+    # before the instances as well, or they fail to start initially...
+    Service <| tag == 'varnish_instance' |> -> Varnishkafka::Instance['statsv']
+
+    # Generate icinga alert if varnishkafka is not running.
+    nrpe::monitor_service { 'varnishkafka-statsv':
+        description   => 'statsv Varnishkafka log producer',
+        nrpe_command  => "/usr/lib/nagios/plugins/check_procs -c 1 -a '/usr/bin/varnishkafka -S /etc/varnishkafka/statsv.conf'",
+        contact_group => 'admins,analytics',
+        require       => Class['::varnishkafka'],
+    }
+
+    # Sets up Logster to read from the Varnishkafka instance stats JSON file
+    # and report metrics to statsd.
+    varnishkafka::monitor::statsd { 'statsv':
+        graphite_metric_prefix => "varnishkafka.${::hostname}.statsv.${cache_cluster}",
+        statsd_host_port       => hiera('statsd'),
+    }
+}
diff --git a/modules/profile/manifests/cache/text.pp b/modules/profile/manifests/cache/text.pp
index 9df5d94..d4225b8 100644
--- a/modules/profile/manifests/cache/text.pp
+++ b/modules/profile/manifests/cache/text.pp
@@ -87,17 +87,12 @@
         backend_warming  => $backend_warming,
     }
 
-    # varnishkafka statsv listens for special stats related requests
-    # and sends them to the 'statsv' topic in Kafka.
-    # A kafka consumer then consumes these and emits
-    # metrics.
-    class { '::role::cache::kafka::statsv': }
-
     # varnishkafka eventlogging listens for eventlogging
     # requests and logs them to the eventlogging-client-side
     # topic.  EventLogging servers consume and process this
     # topic into many JSON based kafka topics for further
     # consumption.
+    # TODO: Move this to profile, include from role::cache::text.
     class { '::role::cache::kafka::eventlogging': }
 
     # ResourceLoader browser cache hit rate and request volume stats.
diff --git a/modules/profile/manifests/webperf.pp b/modules/profile/manifests/webperf.pp
index 801c9d3..c69d326 100644
--- a/modules/profile/manifests/webperf.pp
+++ b/modules/profile/manifests/webperf.pp
@@ -1,20 +1,29 @@
 class profile::webperf(
     $statsd = hiera('statsd')
 ){
-
     $statsd_parts = split($statsd, ':')
     $statsd_host = $statsd_parts[0]
     $statsd_port = $statsd_parts[1]
 
+    # statsv is on main kafka, not analytics or jumbo kafka.
+    # Note that at any given time, all statsv varnishkafka producers are
+    # configured to send to only one kafka cluster (usually main-eqiad).
+    # statsv in an inactive datacenter will not process any messages, as
+    # varnishkafka will not produce any messages to that DC's kafka cluster.
+    # This is configured by the value of the hiera param
+    # profile::cache::kafka::statsv::kafka_cluster_name when the statsv varnishkafka
+    # profile is included (as of this writing on text caches).
+    $kafka_main_config = kafka_config('main')
+    $kafka_main_brokers = $kafka_main_config['brokers']['string']
+    # Consume statsd metrics from Kafka and emit them to statsd.
+    class { '::webperf::statsv':
+        kafka_brokers => $kafka_main_brokers,
+        statsd        => $statsd,
+    }
+
     # Use brokers from this Kafka cluster to consume metrics.
     $kafka_config  = kafka_config('analytics')
     $kafka_brokers = $kafka_config['brokers']['string']
-
-    # Consume statsd metrics from Kafka and emit them to statsd.
-    class { '::webperf::statsv':
-        kafka_brokers => $kafka_brokers,
-        statsd        => $statsd,
-    }
 
     # Aggregate client-side latency measurements collected via the
     # NavigationTiming MediaWiki extension and send them to Graphite.
diff --git a/modules/role/lib/puppet/parser/functions/kafka_cluster_name.rb b/modules/role/lib/puppet/parser/functions/kafka_cluster_name.rb
index d37a819..82999b7 100644
--- a/modules/role/lib/puppet/parser/functions/kafka_cluster_name.rb
+++ b/modules/role/lib/puppet/parser/functions/kafka_cluster_name.rb
@@ -17,12 +17,17 @@
 # === Usage
 #
 #   $cluster_name = kafka_cluster_name($prefix)
+# or
+#   $cluster_name = kafka_cluster_name($prefix, 'esams')
 #
-# This will get you the full Kafka cluster name for the given prefix. If the
-# '::kafka_cluster_name' variable is set in Hiera, the prefix is ignored and
-# the value is returned.
+# This will get you the full Kafka cluster name for the given prefix in the current $::site.
+# The full kafka cluster name is either looked up in the kafka_datacenter_map in Hiera,
+# OR returned as $prefix-$site.
 #
-
+# If the '::kafka_cluster_name' variable is set in Hiera, the prefix is ignored and
+# the value is returned.  TODO: remove ::kafka_cluster_name support; this is no longer used
+# and can cause confusion.
+#
 module Puppet::Parser::Functions
   newfunction(:kafka_cluster_name, :type => :rvalue, :arity => -2) do |args|
     # If kafka_cluster_name is set in scope in hiera, then just return it.
@@ -41,6 +46,7 @@
     # For historical reasons, the name of this cluster is 'eqiad'.
     elsif prefix == 'analytics'
       'eqiad'
+    # Else expect that the caller wants the kafka cluster for prefix in the current datacenter.
     else
       "#{prefix}-#{site}"
     end
diff --git a/modules/role/manifests/cache/kafka/statsv.pp b/modules/role/manifests/cache/kafka/statsv.pp
deleted file mode 100644
index 6cb9cc5..0000000
--- a/modules/role/manifests/cache/kafka/statsv.pp
+++ /dev/null
@@ -1,66 +0,0 @@
-# === Define role::cache::kafka::statsv
-#
-# Sets up a varnishkafka logging endpoint for collecting
-# application level metrics. We are calling this system
-# statsv, as it is similar to statsd, but uses varnish
-# as its logging endpoint.
-#
-# === Parameters
-#
-# [*varnish_name*]
-#   The name of the varnish instance to read shared logs from.
-#   Default 'frontend'
-# [*varnish_svc_name*]
-#   The name of the init unit for the above.
-#   Default 'varnish-frontend'
-# [*kafka_protocol_version*]
-#   Kafka API version to use, needed for brokers < 0.10
-#   https://issues.apache.org/jira/browse/KAFKA-3547
-#
-class role::cache::kafka::statsv(
-    $varnish_name           = 'frontend',
-    $varnish_svc_name       = 'varnish-frontend',
-    $kafka_protocol_version = '0.9.0.1',
-) inherits role::cache::kafka
-{
-    $format  = "%{fake_tag0@hostname?${::fqdn}}x %{%FT%T@dt}t 
%{X-Client-IP@ip}o %{@uri_path}U %{@uri_query}q %{User-Agent@user_agent}i"
-
-    # Set varnish.arg.q or varnish.arg.m according to Varnish version
-    $varnish_opts = { 'q' => 'ReqURL ~ "^/beacon/statsv\?"' }
-
-    varnishkafka::instance { 'statsv':
-        # FIXME - top-scope var without namespace, will break in puppet 2.8
-        # lint:ignore:variable_scope
-        brokers                     => $kafka_brokers,
-        # lint:endignore
-        format                      => $format,
-        format_type                 => 'json',
-        topic                       => 'statsv',
-        varnish_name                => $varnish_name,
-        varnish_svc_name            => $varnish_svc_name,
-        varnish_opts                => $varnish_opts,
-        # -1 means all brokers in the ISR must ACK this request.
-        topic_request_required_acks => '-1',
-        force_protocol_version      => $kafka_protocol_version,
-    }
-
-    include ::standard
-
-    # Generate icinga alert if varnishkafka is not running.
-    nrpe::monitor_service { 'varnishkafka-statsv':
-        description   => 'statsv Varnishkafka log producer',
-        nrpe_command  => "/usr/lib/nagios/plugins/check_procs -c 1 -a '/usr/bin/varnishkafka -S /etc/varnishkafka/statsv.conf'",
-        contact_group => 'admins,analytics',
-        require       => Class['::varnishkafka'],
-    }
-
-    $cache_type = hiera('cache::cluster')
-    $graphite_metric_prefix = "varnishkafka.${::hostname}.statsv.${cache_type}"
-
-    # Sets up Logster to read from the Varnishkafka instance stats JSON file
-    # and report metrics to statsd.
-    varnishkafka::monitor::statsd { 'statsv':
-        graphite_metric_prefix => $graphite_metric_prefix,
-        statsd_host_port       => hiera('statsd'),
-    }
-}
diff --git a/modules/role/manifests/cache/text.pp b/modules/role/manifests/cache/text.pp
index 16a2d40..1cb4d10 100644
--- a/modules/role/manifests/cache/text.pp
+++ b/modules/role/manifests/cache/text.pp
@@ -8,4 +8,9 @@
     include ::profile::cache::base
     include ::profile::cache::ssl::unified
     include ::profile::cache::text
+
+    # varnishkafka statsv listens for special stats related requests
+    # and sends them to the 'statsv' topic in Kafka. A kafka consumer
+    # (called 'statsv') then consumes these and emits metrics.
+    include ::profile::cache::kafka::statsv
 }
diff --git a/modules/webperf/manifests/statsv.pp b/modules/webperf/manifests/statsv.pp
index 3f31d76..1e63175 100644
--- a/modules/webperf/manifests/statsv.pp
+++ b/modules/webperf/manifests/statsv.pp
@@ -5,11 +5,15 @@
 # [*kafka_brokers*]
 #   string of comma separated Kafka bootstrap brokers
 #
+# [*topics*]
+#   Comma separated list of topics from which statsv should consume. Default: statsv
+#
 # [*statsd*]
 #   host:port of statsd instance.  Default: localhost:8125
 #
 class webperf::statsv(
     $kafka_brokers,
+    $topics = 'statsv',
     $statsd = 'localhost:8125',
 ) {
     include ::webperf
diff --git a/modules/webperf/templates/statsv.service.erb b/modules/webperf/templates/statsv.service.erb
index ba97ae8..a4f98eb 100644
--- a/modules/webperf/templates/statsv.service.erb
+++ b/modules/webperf/templates/statsv.service.erb
@@ -4,7 +4,7 @@
 
 [Service]
 WorkingDirectory=/srv/deployment/statsv/statsv
-ExecStart=/usr/bin/python /srv/deployment/statsv/statsv/statsv.py --brokers <%= @kafka_brokers %> --statsd <%= @statsd %>
+ExecStart=/usr/bin/python /srv/deployment/statsv/statsv/statsv.py --brokers <%= @kafka_brokers %> --statsd <%= @statsd %> --topics <%= @topics %>
 User=nobody
 Restart=always
 WatchdogSec=120

-- 
To view, visit https://gerrit.wikimedia.org/r/391705
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I6c566c19fcdab004eec21384e6a5c136b3cf699c
Gerrit-PatchSet: 10
Gerrit-Project: operations/puppet
Gerrit-Branch: production
Gerrit-Owner: Ottomata <ao...@wikimedia.org>
Gerrit-Reviewer: BBlack <bbl...@wikimedia.org>
Gerrit-Reviewer: Ema <e...@wikimedia.org>
Gerrit-Reviewer: Giuseppe Lavagetto <glavage...@wikimedia.org>
Gerrit-Reviewer: Krinkle <krinklem...@gmail.com>
Gerrit-Reviewer: Ottomata <ao...@wikimedia.org>
Gerrit-Reviewer: jenkins-bot <>
