Ottomata has submitted this change and it was merged. Change subject: Kafkatee puppet module ......................................................................
Kafkatee puppet module Change-Id: Ie91622168233d88c4eab6a80c3437d8622ba506e --- A manifests/init.pp A manifests/input.pp A manifests/output.pp A templates/input.kafka.conf.erb A templates/input.pipe.conf.erb A templates/kafkatee.conf.erb A templates/output.conf.erb 7 files changed, 347 insertions(+), 0 deletions(-) Approvals: Ottomata: Verified; Looks good to me, approved diff --git a/manifests/init.pp b/manifests/init.pp new file mode 100644 index 0000000..c6f7cf8 --- /dev/null +++ b/manifests/init.pp @@ -0,0 +1,64 @@ +# == Class: kafkatee +# +# Installs and configures a kafkatee instance. This does not configure any +# inputs or outputs for the kafkatee instance. You should configure them +# using the kafkatee::input and kafkatee::output defines. +# +# == Parameters: +# $kafka_brokers - Array of Kafka broker addresses. +# $kafka_offset_store_path - Path in which to store consumed Kafka offsets. +# Default: /var/cache/kafkatee/offsets +# $kafka_offset_reset - Where to consume from if the offset from which to +# consume is not on the broker, or if there is no +# stored offset yet. One of: smallest, largest, error. +# Default: largest +# $kafka_message_max_bytes - Maximum message size. Default: undef (4000000). +# $pidfile - Location of kafkatee pidfile. +# Default: /var/run/kafkatee/kafkatee.pid +# $log_statistics_file - Path in which to store kafkatee .json statistics. +# Default: /var/cache/kafkatee/kafkatee.stats.json +# $log_statistics_interval - How often to write statistics to $log_statistics_file. +# Default: 60 +# $output_encoding - If this is string and inputs are json, then the JSON +# input will be transformed according to $output_format +# before they are sent to the configured outputs. +# Default: string +# $output_format - Format string with which to transform JSON data into +# string output. See kafkatee.conf documentation +# for more info. +# Default: SEE PARAMETER +# $output_queue_size - Maximum queue size for each output, in number of messages. 
+#                            Default: undef, (1000000)
+# $config_file             - Main kafkatee config file.
+#                            Default: /etc/kafkatee.conf
+# $config_directory        - kafkatee config include directory.
+#                            Default: /etc/kafkatee.d
+#
+class kafkatee(
+    $kafka_brokers,
+    $kafka_offset_store_path = '/var/cache/kafkatee/offsets',
+    $kafka_offset_reset      = 'largest',
+    $kafka_message_max_bytes = undef,
+    $pidfile                 = '/var/run/kafkatee/kafkatee.pid',
+    $log_statistics_file     = '/var/cache/kafkatee/kafkatee.stats.json',
+    $log_statistics_interval = 60,
+    $output_encoding         = 'string',
+    $output_format           = '%{hostname} %{sequence} %{dt} %{time_firstbyte} %{ip} %{cache_status}/%{http_status} %{response_size} %{http_method} http://%{uri_host}%{uri_path}%{uri_query} - %{content_type} %{referer} %{x_forwarded_for} %{user_agent} %{accept_language} %{x_analytics}',
+    $output_queue_size       = undef,
+    # These two parameters were documented above but not declared;
+    # declare them with their documented defaults so callers may override.
+    $config_file             = '/etc/kafkatee.conf',
+    $config_directory        = '/etc/kafkatee.d',
+)
+{
+    package { 'kafkatee':
+        ensure => 'installed',
+    }
+
+    # The config include directory must exist before kafkatee::input /
+    # kafkatee::output defines drop *.conf fragments into it.  It was
+    # previously referenced but never managed.
+    file { $config_directory:
+        ensure  => 'directory',
+        require => Package['kafkatee'],
+    }
+
+    file { $config_file:
+        content => template('kafkatee/kafkatee.conf.erb'),
+        require => Package['kafkatee'],
+    }
+
+    service { 'kafkatee':
+        ensure    => running,
+        provider  => 'upstart',
+        subscribe => File[$config_file],
+    }
+}
diff --git a/manifests/input.pp b/manifests/input.pp
new file mode 100644
index 0000000..5184750
--- /dev/null
+++ b/manifests/input.pp
@@ -0,0 +1,37 @@
+# == Define kafkatee::input
+# Configures a kafkatee input.  This can be either from Kafka
+# or from a subprocess pipe.  If from Kafka, the brokers consumed from
+# are global for this kafkatee instance and configured using
+# the main kafkatee class.
+#
+# == Parameters
+# $type        - Type of kafkatee input.  Either 'pipe' or 'kafka'.
+#                Default: kafka
+# $topic       - Kafka topic from which to consume.  Default: undef
+# $partitions  - Kafka topic partitions from which to consume.
+#                This can be a list of partitions, or a range, e.g. 0-9.
+#                Default undef.
+# $offset      - Offset type from which to consume in Kafka.
+#                One of: beginning, end, stored, or a hardcoded offset integer.
+#                Default: end
+# $options     - Hash of key => value options to pass to this input.
+#                Default: {}
+# $command     - If $type is pipe, then this command will be launched and its
+#                stdout will be used as input data.
+#
+define kafkatee::input(
+    $type       = 'kafka',
+    $topic      = undef,
+    $partitions = undef,
+    $offset     = 'end',
+    $options    = {},
+    $command    = undef,
+    $ensure     = 'present',
+)
+{
+    # Require the package (not the whole class: requiring Class['kafkatee']
+    # would create a dependency cycle with the notify on its service) so the
+    # fragment is not written before kafkatee is installed.
+    file { "/etc/kafkatee.d/input.${title}.conf":
+        ensure  => $ensure,
+        content => template("kafkatee/input.${type}.conf.erb"),
+        require => Package['kafkatee'],
+        notify  => Service['kafkatee'],
+    }
+}
diff --git a/manifests/output.pp b/manifests/output.pp
new file mode 100644
index 0000000..38c27cd
--- /dev/null
+++ b/manifests/output.pp
@@ -0,0 +1,25 @@
+# == Define kafkatee::output
+# Configures a kafkatee output.
+#
+# == Parameters
+# $destination - Where this output will be sent.  If $type is
+#                'file', then this should be a file path.  Otherwise
+#                it should be a process that receives input from stdin.
+# $type        - Type of kafkatee output.  Either 'file' or 'pipe'.
+#                Default: file
+# $sample      - The sample rate denominator (1/$sample).
+#                e.g. 1 means 100%, 1/10 means 10%, etc.
+#
+define kafkatee::output(
+    $destination,
+    $type   = 'file',
+    $sample = 1,
+    $ensure = 'present',
+)
+{
+    # Same ordering rationale as kafkatee::input: depend on the package,
+    # not the class, to avoid a cycle with the service notification.
+    file { "/etc/kafkatee.d/output.${title}.conf":
+        ensure  => $ensure,
+        content => template('kafkatee/output.conf.erb'),
+        require => Package['kafkatee'],
+        notify  => Service['kafkatee'],
+    }
+}
diff --git a/templates/input.kafka.conf.erb b/templates/input.kafka.conf.erb
new file mode 100644
index 0000000..bc66a19
--- /dev/null
+++ b/templates/input.kafka.conf.erb
@@ -0,0 +1,4 @@
+# Note: This file is managed by Puppet.
+ +# <%= @title %> kafka input +input [<%= @options.map { |key,val| "#{key}=#{val}" }.join(',') %>] kafka topic <%= @topic %> partition <%= @partitions %> from <%= @offset %> diff --git a/templates/input.pipe.conf.erb b/templates/input.pipe.conf.erb new file mode 100644 index 0000000..9fd41a8 --- /dev/null +++ b/templates/input.pipe.conf.erb @@ -0,0 +1,4 @@ +# Note: This file is managed by Puppet. + +# <%= @title %> piped input +input [<%= @options.map { |key,val| "#{key}=#{val}" }.join(',') %>] pipe <%= @command %> diff --git a/templates/kafkatee.conf.erb b/templates/kafkatee.conf.erb new file mode 100644 index 0000000..616d4eb --- /dev/null +++ b/templates/kafkatee.conf.erb @@ -0,0 +1,210 @@ +####################################################################### +# # +# kafkatee configuration file # +# # +# # +####################################################################### +# # +# Syntax: # +# <property-name> = <value> # +# input <type args..> # +# output <type arg..> # +# # +# Boolean property values: # +# >0, "true", "yes", "on", "" - interpreted as true # +# everything else - interpreted as false # +# # +# # +# The configuration file consists of: # +# - Configuration properties (key = value) to control various # +# aspects of kafkatee. # +# - Inputs # +# - Outputs # +# # +####################################################################### + + +####################################################################### +# # +# Configuration properties # +# # +####################################################################### + +####################################################################### +# # +# Kafka configuration # +# # +# Kafka configuration properties are prefixed with "kafka." # +# and topic properties are prefixed with "kafka.topic.". 
#
+#                                                                     #
+# For the full range of Kafka handle and topic configuration          #
+# properties, see:                                                    #
+#   http://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md #
+#                                                                     #
+# And the Apache Kafka configuration reference:                       #
+#   http://kafka.apache.org/08/configuration.html                     #
+#                                                                     #
+#######################################################################
+
+# Initial list of kafka brokers
+# Default: none
+<%# BUG FIX: template previously referenced undefined @brokers; the class parameter is $kafka_brokers. %>
+kafka.metadata.broker.list = <%= Array(@kafka_brokers).sort.join(',') %>
+
+# Offset file directory.
+# Each topic + partition combination has its own offset file.
+# Default: current directory
+kafka.topic.offset.store.path = <%= @kafka_offset_store_path %>
+
+# If the request offset was not found on broker, or there is no
+# initial offset known (no stored offset), then reset the offset according
+# to this configuration.
+# Values: smallest (oldest/beginning), largest (newest/end), error (fail)
+# Default: largest
+kafka.topic.auto.offset.reset = <%= @kafka_offset_reset %>
+
+# Maximum message size.
+# Should be synchronized on all producers, brokers and consumers.
+# Default: 4000000
+<%# BUG FIX: interpolation previously used misspelled @kafka_messages_max_bytes, which rendered an empty value. %>
+<%= @kafka_message_max_bytes ? "kafka.message.max.bytes = #{@kafka_message_max_bytes}" : '#kafka.message.max.bytes = 10000000' %>
+
+# Kafka debugging
+# Default: none
+#kafka.debug = msg,topic,broker
+
+
+#######################################################################
+#                                                                     #
+# Misc configuration                                                  #
+#                                                                     #
+#######################################################################
+
+# Pid file location
+# Default: /var/run/kafkatee.pid
+pid.file.path = <%= @pidfile %>
+
+# Daemonize (background)
+# Default: true
+daemonize = false
+
+# Logging output level
+# 1 = only emergencies .. 6 = info, 7 = debug
+# Default: 6 (info)
+#log.level = 7
+
+
+#
+# JSON Statistics
+#
+# Statistics is collected from kafkatee itself(*) as well as librdkafka
+# Each JSON object has a top level key of either 'kafkatee' or
+# 'kafka' to indicate which type of statistics the object contains.
+# Each line is a valid JSON object. +# +# *: kafkatee does not currently output any stats of its own, just from rdkafka. +# + +# Statistics output interval +# Defaults to 60 seconds, use 0 to disable. +log.statistics.interval = <%= @log_statistics_interval %> + +# Statistics output file +# Defaults to /tmp/kafkatee.stats.json +log.statistics.file = <%= @log_statistics_file %> + + +# Command to run on startup, before starting IO. +# Default: none +#command.init = ./my-script.sh + +# Command to run on termination after all IOs have been stopped. +# Default: none +#command.term = ./my-cleanup-script.sh + +# Set environment variable which will be available for all sub-sequent +# command executions (command.*, input pipe, output pipe, ..) +#setenv.NMSGS=12 +# clear: +#setenv.LD_LIBRARY_PATH= + +####################################################################### +# # +# Message transformation # +# # +# A message read from one of the inputs may be transformed before # +# being enqueued on the output queues. # +# # +# Transformation requires that the input and output encoding differs, # +# i.e., 'input [encoding=json] ..' and 'output.encoding=string' # +# # +# While the input encoding is configured per input, the output # +# encoding is configured globally, all outputs will receive the same # +# message. # +# # +# The currently supported transformation(s) are: # +# JSON input -> string output: # +# JSON data is formatted according to the output.format # +# configuration. The %{field} syntax references the field in the # +# original JSON object by the same name and outputs its value. # +# # +# If the input and output encoding matches then the message remains # +# untouched. # +# # +# The output message delimiter (defaults to newline (\n)) is # +# configurable (output.delimiter) and always appended to all output # +# messages regardless of transformation. # +# The input is always stripped of its delimiter (which is newline # +# for pipe inputs). 
# +# # +####################################################################### + +# Output encoding: string or json +# Default: string +output.encoding = <%= @output_encoding %> + +####################################################################### +# Output formatting # +# # +# The format string is made up of %{..}-tokens and literals. # +# # +# Tokens: # +# # +# %{FIELD} # +# Retrieves the value from the JSON object's field with the # +# same name. # +# # +# %{FIELD?DEFAULT} # +# 'DEFAULT' is the default string to use if no field was matched, # +# the default default string is "-". # +# # +# Literals are copied verbatim to the output string. # +# # +# Example JSON: {"task":19, "name":"Mike"} # +# Example format: Got task %{task} for user %{name?unknown} # +# Example output: Got task 19 for user Mike # +# # +# Note: Multi-level JSON objects are flattened: # +# JSON: {"a": {"b": 9}, "c": "hi"} # +# Gives: { "b": 9, "c": "hi" } # +# # +####################################################################### + +# Output format for JSON -> string transformation. +# Default: none +output.format = <%= @output_format %> + + +# Output delimiter +# The output message ends with this delimiter. +# Supports \n, \r, \t, \0. +# Default: newline (\n) +#output.delimiter = ;END;\n + + +# Maximum queue size for each output, in number of messages +# Default: 100000 +#output.queue.size = 1000000 +<%= @output_queue_size ? "output.queue.size = #{@output_queue_size}" : '#output.queue.size = 1000000' %> + + +# Include other config files in /etc/kafkatee.d/*.conf +include /etc/kafkatee.d/*.conf diff --git a/templates/output.conf.erb b/templates/output.conf.erb new file mode 100644 index 0000000..54fc29e --- /dev/null +++ b/templates/output.conf.erb @@ -0,0 +1,3 @@ +# Note: This file is managed by Puppet. 
+ +output <%= @type %> <%= @sample %> <%= @destination %> -- To view, visit https://gerrit.wikimedia.org/r/110650 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: merged Gerrit-Change-Id: Ie91622168233d88c4eab6a80c3437d8622ba506e Gerrit-PatchSet: 3 Gerrit-Project: operations/puppet/kafkatee Gerrit-Branch: master Gerrit-Owner: Ottomata <o...@wikimedia.org> Gerrit-Reviewer: Alexandros Kosiaris <akosia...@wikimedia.org> Gerrit-Reviewer: Faidon Liambotis <fai...@wikimedia.org> Gerrit-Reviewer: Matanya <mata...@foss.co.il> Gerrit-Reviewer: Ottomata <o...@wikimedia.org> _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits