Ottomata has submitted this change and it was merged.

Change subject: Kafkatee puppet module
......................................................................


Kafkatee puppet module

Change-Id: Ie91622168233d88c4eab6a80c3437d8622ba506e
---
A manifests/init.pp
A manifests/input.pp
A manifests/output.pp
A templates/input.kafka.conf.erb
A templates/input.pipe.conf.erb
A templates/kafkatee.conf.erb
A templates/output.conf.erb
7 files changed, 347 insertions(+), 0 deletions(-)

Approvals:
  Ottomata: Verified; Looks good to me, approved



diff --git a/manifests/init.pp b/manifests/init.pp
new file mode 100644
index 0000000..c6f7cf8
--- /dev/null
+++ b/manifests/init.pp
@@ -0,0 +1,64 @@
+# == Class: kafkatee
+#
+# Installs and configures a kafkatee instance. This does not configure any
+# inputs or outputs for the kafkatee instance.  You should configure them
+# using the kafkatee::input and kafkatee::output defines.
+#
+# == Parameters:
+# $kafka_brokers             - Array of Kafka broker addresses.
+# $kafka_offset_store_path   - Path in which to store consumed Kafka offsets.
+#                              Default: /var/cache/kafkatee/offsets
+# $kafka_offset_reset        - Where to consume from if the requested offset
+#                              is not on the broker, or if there is no stored
+#                              offset yet.  One of: smallest, largest, error.
+#                              Default: largest
+# $kafka_message_max_bytes   - Maximum message size.  Default: undef (4000000).
+# $pidfile                   - Location of kafkatee pidfile.
+#                              Default: /var/run/kafkatee/kafkatee.pid
+# $log_statistics_file       - Path of the kafkatee JSON statistics file.
+#                              Default: /var/cache/kafkatee/kafkatee.stats.json
+# $log_statistics_interval   - Seconds between statistics writes.
+#                              Default: 60
+# $output_encoding           - If 'string' and the inputs are JSON, messages
+#                              are transformed using $output_format before
+#                              being sent to the configured outputs.
+#                              Default: string
+# $output_format             - Format string used to transform JSON data into
+#                              string output.  See kafkatee.conf documentation
+#                              for more info.
+#                              Default: tab-separated fields, see below.
+# $output_queue_size         - Maximum queue size for each output, in messages.
+#                              Default: undef (kafkatee's built-in default)
+#
+# == Files:
+# The config file (/etc/kafkatee.conf) and include directory (/etc/kafkatee.d)
+# are fixed paths, not parameters.  This class does not create the directory.
+#
+class kafkatee(
+    $kafka_brokers,
+    $kafka_offset_store_path = '/var/cache/kafkatee/offsets',
+    $kafka_offset_reset      = 'largest',
+    $kafka_message_max_bytes = undef,
+    $pidfile                 = '/var/run/kafkatee/kafkatee.pid',
+    $log_statistics_file     = '/var/cache/kafkatee/kafkatee.stats.json',
+    $log_statistics_interval = 60,
+    $output_encoding         = 'string',
+    $output_format           = "%{hostname}\t%{sequence}\t%{dt}\t%{time_firstbyte}\t%{ip}\t%{cache_status}/%{http_status}\t%{response_size}\t%{http_method}\thttp://%{uri_host}%{uri_path}%{uri_query}\t-\t%{content_type}\t%{referer}\t%{x_forwarded_for}\t%{user_agent}\t%{accept_language}\t%{x_analytics}",
+    $output_queue_size       = undef,
+)
+{
+    package { 'kafkatee':
+        ensure => 'installed',
+    }
+
+    file { '/etc/kafkatee.conf':
+        content => template('kafkatee/kafkatee.conf.erb'),
+        require => Package['kafkatee'],
+    }
+
+    service { 'kafkatee':
+        ensure    => running,
+        provider  => 'upstart',
+        subscribe => File['/etc/kafkatee.conf'],  # refresh on config change
+    }
+}
diff --git a/manifests/input.pp b/manifests/input.pp
new file mode 100644
index 0000000..5184750
--- /dev/null
+++ b/manifests/input.pp
@@ -0,0 +1,37 @@
+# == Define kafkatee::input
+# Configures a kafkatee input.  This can be either from Kafka
+# or from a subprocess pipe.  If from Kafka, the brokers consumed from
+# are global for this kafkatee instance and configured using
+# the main kafkatee class.
+#
+# == Parameters
+# $type             - Type of input: 'kafka' or 'pipe'.  Default: kafka
+# $topic            - Kafka topic from which to consume.  Default: undef
+# $partitions       - Kafka topic partitions from which to consume.
+#                     This can be a list of partitions, or a range, e.g. 0-9.
+#                     Default: undef
+# $offset           - Offset type from which to consume in Kafka.  One of:
+#                     beginning, end, stored, or a hardcoded offset integer.
+#                     Default: end
+# $options          - Hash of key => value options to pass to this input.
+#                     Default: {}
+# $command          - If $type is pipe, then this command will be launched
+#                     and its stdout will be used as input data.
+# $ensure           - Passed to the file resource's ensure.  Default: present
+#
+define kafkatee::input(
+    $type           = 'kafka',
+    $topic          = undef,
+    $partitions     = undef,
+    $offset         = 'end',
+    $options        = {},
+    $command        = undef,
+    $ensure         = 'present',
+)
+{
+    file { "/etc/kafkatee.d/input.${title}.conf":
+        ensure  => $ensure,
+        content => template("kafkatee/input.${type}.conf.erb"),
+        notify  => Service['kafkatee'],
+    }
+}
diff --git a/manifests/output.pp b/manifests/output.pp
new file mode 100644
index 0000000..38c27cd
--- /dev/null
+++ b/manifests/output.pp
@@ -0,0 +1,25 @@
+# == Define kafkatee::output
+# Configures a kafkatee output.
+#
+# == Parameters
+# $destination      - Where this output will be sent.  If $type is
+#                     'file', then this should be a file path.  Otherwise
+#                     it should be a process that receives input from stdin.
+# $type             - Type of output: 'file' or 'pipe'.  Default: file
+# $sample           - The sample rate denominator (1/$sample).
+#                     e.g. 1 means 100%, 10 means 10%, etc.  Default: 1
+# $ensure           - Passed to the file resource's ensure.  Default: present
+#
+define kafkatee::output(
+    $destination,
+    $type           = 'file',
+    $sample         = 1,
+    $ensure         = 'present',
+)
+{
+    file { "/etc/kafkatee.d/output.${title}.conf":
+        ensure  => $ensure,
+        content => template('kafkatee/output.conf.erb'),
+        notify  => Service['kafkatee'],
+    }
+}
diff --git a/templates/input.kafka.conf.erb b/templates/input.kafka.conf.erb
new file mode 100644
index 0000000..bc66a19
--- /dev/null
+++ b/templates/input.kafka.conf.erb
@@ -0,0 +1,4 @@
+# Note: This file is managed by Puppet.
+<%# Options are sorted so the rendered line is stable from run to run. %>
+# <%= @title %> kafka input
+input [<%= @options.sort.map { |key,val| "#{key}=#{val}" }.join(',') %>] kafka topic <%= @topic %> partition <%= @partitions %> from <%= @offset %>
diff --git a/templates/input.pipe.conf.erb b/templates/input.pipe.conf.erb
new file mode 100644
index 0000000..9fd41a8
--- /dev/null
+++ b/templates/input.pipe.conf.erb
@@ -0,0 +1,4 @@
+# Note: This file is managed by Puppet.
+<%# Options are sorted so the rendered line is stable from run to run. %>
+# <%= @title %> piped input
+input [<%= @options.sort.map { |key,val| "#{key}=#{val}" }.join(',') %>] pipe <%= @command %>
diff --git a/templates/kafkatee.conf.erb b/templates/kafkatee.conf.erb
new file mode 100644
index 0000000..616d4eb
--- /dev/null
+++ b/templates/kafkatee.conf.erb
@@ -0,0 +1,210 @@
+#######################################################################
+#                                                                     #
+#                    kafkatee configuration file                      #
+#                                                                     #
+#                                                                     #
+#######################################################################
+#                                                                     #
+# Syntax:                                                             #
+#  <property-name> = <value>                                          #
+#  input <type args..>                                                #
+#  output <type arg..>                                                #
+#                                                                     #
+# Boolean property values:                                            #
+#   >0, "true", "yes", "on", "" - interpreted as true                 #
+#  everything else              - interpreted as false                #
+#                                                                     #
+#                                                                     #
+# The configuration file consists of:                                 #
+#   - Configuration properties (key = value) to control various       #
+#     aspects of kafkatee.                                            #
+#   - Inputs                                                          #
+#   - Outputs                                                         #
+#                                                                     #
+#######################################################################
+
+
+#######################################################################
+#                                                                     #
+# Configuration properties                                            #
+#                                                                     #
+#######################################################################
+
+#######################################################################
+#                                                                     #
+# Kafka configuration                                                 #
+#                                                                     #
+# Kafka configuration properties are prefixed with "kafka."           #
+# and topic properties are prefixed with "kafka.topic.".              #
+#                                                                     #
+# For the full range of Kafka handle and topic configuration          #
+# properties, see:                                                    #
+#  http://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md #
+#                                                                     #
+# And the Apache Kafka configuration reference:                       #
+#  http://kafka.apache.org/08/configuration.html                      #
+#                                                                     #
+#######################################################################
+
+# Initial list of kafka brokers (from the kafka_brokers class parameter)
+# Default: none
+kafka.metadata.broker.list = <%= Array(@kafka_brokers).sort.join(',') %>
+
+# Offset file directory.
+# Each topic + partition combination has its own offset file.
+# Default: current directory
+kafka.topic.offset.store.path = <%= @kafka_offset_store_path %>
+
+# If the request offset was not found on broker, or there is no
+# initial offset known (no stored offset), then reset the offset according
+# to this configuration.
+# Values: smallest (oldest/beginning), largest (newest/end), error (fail)
+# Default: largest
+kafka.topic.auto.offset.reset = <%= @kafka_offset_reset %>
+
+# Maximum message size.
+# Should be synchronized on all producers, brokers and consumers.
+# Default: 4000000
+<%= @kafka_message_max_bytes ? "kafka.message.max.bytes = #{@kafka_message_max_bytes}" : '#kafka.message.max.bytes = 10000000' %>
+
+# Kafka debugging
+# Default: none
+#kafka.debug = msg,topic,broker
+
+
+#######################################################################
+#                                                                     #
+# Misc configuration                                                  #
+#                                                                     #
+#######################################################################
+
+# Pid file location
+# Default: /var/run/kafkatee.pid
+pid.file.path = <%= @pidfile %>
+
+# Daemonize (background)
+# Default: true
+daemonize = false
+
+# Logging output level
+# 1 = only emergencies .. 6 = info, 7 = debug
+# Default: 6 (info)
+#log.level = 7
+
+
+#
+# JSON Statistics
+#
+# Statistics are collected from kafkatee itself(*) as well as librdkafka.
+# Each JSON object has a top level key of either 'kafkatee' or
+# 'kafka' to indicate which type of statistics the object contains.
+# Each line is a valid JSON object.
+#
+# *: kafkatee currently outputs no stats of its own, only those from rdkafka.
+#
+
+# Statistics output interval
+# Defaults to 60 seconds, use 0 to disable.
+log.statistics.interval = <%= @log_statistics_interval %>
+
+# Statistics output file
+# Defaults to /tmp/kafkatee.stats.json
+log.statistics.file = <%= @log_statistics_file %>
+
+
+# Command to run on startup, before starting IO.
+# Default: none
+#command.init = ./my-script.sh
+
+# Command to run on termination after all IOs have been stopped.
+# Default: none
+#command.term = ./my-cleanup-script.sh
+
+# Set environment variable which will be available for all subsequent
+# command executions (command.*, input pipe, output pipe, ..)
+#setenv.NMSGS=12
+# clear:
+#setenv.LD_LIBRARY_PATH=
+
+#######################################################################
+#                                                                     #
+# Message transformation                                              #
+#                                                                     #
+# A message read from one of the inputs may be transformed before     #
+# being enqueued on the output queues.                                #
+#                                                                     #
+# Transformation requires that the input and output encoding differs, #
+# i.e., 'input [encoding=json] ..' and 'output.encoding=string'       #
+#                                                                     #
+# While the input encoding is configured per input, the output        #
+# encoding is configured globally, all outputs will receive the same  #
+# message.                                                            #
+#                                                                     #
+# The currently supported transformation(s) are:                      #
+#  JSON input -> string output:                                       #
+#    JSON data is formatted according to the output.format            #
+#    configuration. The %{field} syntax references the field in the   #
+#    original JSON object by the same name and outputs its value.     #
+#                                                                     #
+# If the input and output encoding matches then the message remains   #
+# untouched.                                                          #
+#                                                                     #
+# The output message delimiter (defaults to newline (\n)) is          #
+# configurable (output.delimiter) and always appended to all output   #
+# messages regardless of transformation.                              #
+# The input is always stripped of its delimiter (which is newline     #
+# for pipe inputs).                                                   #
+#                                                                     #
+#######################################################################
+
+# Output encoding: string or json
+# Default: string
+output.encoding = <%= @output_encoding %>
+
+#######################################################################
+# Output formatting                                                   #
+#                                                                     #
+# The format string is made up of %{..}-tokens and literals.          #
+#                                                                     #
+# Tokens:                                                             #
+#                                                                     #
+#  %{FIELD}                                                           #
+#    Retrieves the value from the JSON object's field with the        #
+#    same name.                                                       #
+#                                                                     #
+#  %{FIELD?DEFAULT}                                                   #
+#    'DEFAULT' is the default string to use if no field was matched,  #
+#     the default default string is "-".                              #
+#                                                                     #
+#  Literals are copied verbatim to the output string.                 #
+#                                                                     #
+#  Example JSON: {"task":19, "name":"Mike"}                           #
+#  Example format: Got task %{task} for user %{name?unknown}          #
+#  Example output: Got task 19 for user Mike                          #
+#                                                                     #
+# Note: Multi-level JSON objects are flattened:                       #
+#       JSON:  {"a": {"b": 9}, "c": "hi"}                             #
+#       Gives: { "b": 9, "c": "hi" }                                  #
+#                                                                     #
+#######################################################################
+
+# Output format for JSON -> string transformation.
+# Default: none
+output.format = <%= @output_format %>
+
+
+# Output delimiter
+# The output message ends with this delimiter.
+# Supports \n, \r, \t, \0.
+# Default: newline (\n)
+#output.delimiter = ;END;\n
+
+
+# Maximum queue size for each output, in number of messages
+# Default: 100000
+# Set from the output_queue_size class parameter when it is defined.
+<%= @output_queue_size ? "output.queue.size = #{@output_queue_size}" : '#output.queue.size = 1000000' %>
+
+
+# Include other config files in /etc/kafkatee.d/*.conf
+include /etc/kafkatee.d/*.conf
diff --git a/templates/output.conf.erb b/templates/output.conf.erb
new file mode 100644
index 0000000..54fc29e
--- /dev/null
+++ b/templates/output.conf.erb
@@ -0,0 +1,3 @@
+# Note: This file is managed by Puppet.
+<%# Renders one kafkatee output line: output <type> <sample> <destination>. %>
+output <%= @type %> <%= @sample %> <%= @destination %>

-- 
To view, visit https://gerrit.wikimedia.org/r/110650
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: Ie91622168233d88c4eab6a80c3437d8622ba506e
Gerrit-PatchSet: 3
Gerrit-Project: operations/puppet/kafkatee
Gerrit-Branch: master
Gerrit-Owner: Ottomata <o...@wikimedia.org>
Gerrit-Reviewer: Alexandros Kosiaris <akosia...@wikimedia.org>
Gerrit-Reviewer: Faidon Liambotis <fai...@wikimedia.org>
Gerrit-Reviewer: Matanya <mata...@foss.co.il>
Gerrit-Reviewer: Ottomata <o...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to