Ottomata has uploaded a new change for review. ( 
https://gerrit.wikimedia.org/r/359212 )

Change subject: Insert select eventbus generated event topics into eventlogging 
MySQL database
......................................................................

Insert select eventbus generated event topics into eventlogging MySQL database

Bug: T150369

Change-Id: I63bbc5d5fe2f17b4b8e0cba5514e27bd08c35498
---
M modules/eventlogging/manifests/dependencies.pp
M modules/eventlogging/manifests/service/consumer.pp
M modules/eventlogging/templates/consumer.erb
M modules/role/manifests/eventlogging/analytics/mysql.pp
4 files changed, 55 insertions(+), 12 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/operations/puppet 
refs/changes/12/359212/1

diff --git a/modules/eventlogging/manifests/dependencies.pp 
b/modules/eventlogging/manifests/dependencies.pp
index 5c24d56..7f638eb 100644
--- a/modules/eventlogging/manifests/dependencies.pp
+++ b/modules/eventlogging/manifests/dependencies.pp
@@ -19,6 +19,8 @@
         'python-jsonschema',
         'python-confluent-kafka',
         'python-kafka',
+        # Python snappy allows python-kafka to consume Snappy compressed data.
+        'python-snappy',
         'python-mysqldb',
         'python-pygments',
         'python-pykafka',
diff --git a/modules/eventlogging/manifests/service/consumer.pp 
b/modules/eventlogging/manifests/service/consumer.pp
index b1d2072..0cacb15 100644
--- a/modules/eventlogging/manifests/service/consumer.pp
+++ b/modules/eventlogging/manifests/service/consumer.pp
@@ -24,6 +24,12 @@
 #   subscribing to the input stream. Defaults to the resource title.
 #   Should contain only URL-safe characters.
 #
+# [*schemas_path*]
+#   If given, this path will be passed to eventlogging-consumer --schemas-path,
+#   which causes schemas to be loaded and cached from a local file path before
+#   consumption begins.  This does not restrict the consumer from finding
+#   schemas on meta.wikimedia.org if they don't exist in schemas_path.
+#
 # [*ensure*]
 #   Specifies whether the consumer should be provisioned or destroyed.
 #   Value may be 'present' (provisions the resource; the default) or
@@ -48,11 +54,12 @@
 define eventlogging::service::consumer(
     $input,
     $output,
-    $sid    = $title,
-    $ensure = present,
-    $owner  = 'root',
-    $group  = 'root',
-    $mode   = '0644',
+    $sid          = $title,
+    $schemas_path = undef,
+    $ensure       = present,
+    $owner        = 'root',
+    $group        = 'root',
+    $mode         = '0644',
 ) {
     Class['eventlogging::server'] -> Eventlogging::Service::Consumer[$title]
 
diff --git a/modules/eventlogging/templates/consumer.erb 
b/modules/eventlogging/templates/consumer.erb
index 793787f..66d3b72 100644
--- a/modules/eventlogging/templates/consumer.erb
+++ b/modules/eventlogging/templates/consumer.erb
@@ -1,3 +1,7 @@
+<% if @schemas_path -%>
+--schemas-path
+<%= @schemas_path %>
+<% end -%>
 <% if @sid -%>
 <%= @input %><%= @input.include?('?') ? '&' : '?' %>identity=<%= @sid %>
 <% else -%>
diff --git a/modules/role/manifests/eventlogging/analytics/mysql.pp 
b/modules/role/manifests/eventlogging/analytics/mysql.pp
index c0d2a62..9c7b0e5 100644
--- a/modules/role/manifests/eventlogging/analytics/mysql.pp
+++ b/modules/role/manifests/eventlogging/analytics/mysql.pp
@@ -6,6 +6,13 @@
 class role::eventlogging::analytics::mysql {
     include role::eventlogging::analytics::server
 
+    # We use the mediawiki/event-schemas to support insertion of events from 
EventBus
+    # that use those schemas not on meta.wikimedia.org.
+    # NOTE: If an event schema changes, the eventlogging-consumer process(es) 
will
+    # not be automatically restarted.  You must manually restart this for the 
consumer
+    # process to pick up changes to local schemas.
+    require eventschemas
+
     ## MySQL / MariaDB
 
     # Log strictly valid events to the 'log' database on m4-master.
@@ -41,7 +48,26 @@
     }
 
     # mixed_uri URI is defined for DRY purposes in 
role::eventlogging::analytics::server.
-    $kafka_mixed_uri = $role::eventlogging::analytics::server::kafka_mixed_uri
+    $kafka_consumer_scheme = 
$role::eventlogging::analytics::server::kafka_consumer_scheme
+
+    # Add more here as requested.
+    # NOTE: The datacenter prefixed topics are produced via EventBus, and are 
of
+    # schemas in the mediawiki/event-schemas repository.
+    $topics_to_consume = [
+        # Valid eventlogging analytics events are all in this one topic.
+        'eventlogging-valid-mixed',
+        # Various mediawiki events (via EventBus)
+        'eqiad.mediawiki.revision-create',
+        'codfw.mediawiki.revision-create',
+        'eqiad.mediawiki.page-move',
+        'codfw.mediawiki.page-move',
+        'eqiad.mediawiki.page-delete',
+        'codfw.mediawiki.page-delete',
+        'eqiad.mediawiki.page-undelete',
+        'codfw.mediawiki.page-undelete',
+    ]
+    $topics_string = join($topics_to_consume, ',')
+    $kafka_consumer_uri = 
"${kafka_consumer_scheme}/${kafka_brokers_string}?topics=${topics_string}"
 
     # Define statsd host url to send mysql insert metrics.
     # For beta cluster, set in 
https://wikitech.wikimedia.org/wiki/Hiera:Deployment-prep
@@ -56,12 +82,16 @@
     # Kafka consumer group for this consumer is mysql-m4-master
     eventlogging::service::consumer { $mysql_consumers:
         # auto commit offsets to kafka more often for mysql consumer
-        input  => 
"${filter_scheme}${kafka_mixed_uri}&auto_commit_interval_ms=1000${$kafka_api_version_param}${filter_function}",
-        output => 
"mysql://${mysql_user}:${mysql_pass}@${mysql_db}?charset=utf8&statsd_host=${statsd_host}&replace=True",
-        sid    => $kafka_consumer_group,
+        input        => 
"${filter_scheme}${kafka_consumer_uri}&auto_commit_interval_ms=1000${$kafka_api_version_param}${filter_function}",
+        output       => 
"mysql://${mysql_user}:${mysql_pass}@${mysql_db}?charset=utf8&statsd_host=${statsd_host}&replace=True",
+        # Load and cache local (EventBus) schemas so those events can be 
inserted into MySQL too.
+        # This will require a restart of the consumer process(es) when there 
are any new schemas.
+        schemas_path => "${::eventschemas::path}/jsonschema",
+        sid          => $kafka_consumer_group,
         # Restrict permissions on this config file since it contains a 
password.
-        owner  => 'root',
-        group  => 'eventlogging',
-        mode   => '0640',
+        owner        => 'root',
+        group        => 'eventlogging',
+        mode         => '0640',
+
     }
 }

-- 
To view, visit https://gerrit.wikimedia.org/r/359212
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I63bbc5d5fe2f17b4b8e0cba5514e27bd08c35498
Gerrit-PatchSet: 1
Gerrit-Project: operations/puppet
Gerrit-Branch: production
Gerrit-Owner: Ottomata <ao...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to