Dave Rawks has proposed merging lp:~drawks/graphite/graphite-msgpack into 
lp:graphite.

Requested reviews:
  graphite-dev (graphite-dev)

For more details, see:
https://code.launchpad.net/~drawks/graphite/graphite-msgpack/+merge/92921

Added msgpack as a receiver protocol. msgpack is super fast at deserializing 
and much safer than pickle, since it doesn't support serializing things like 
modules, functions, or class objects. Also added settings in carbon.conf to 
enable/disable individual listeners.

The new code path can be tested with a simple metric writer like:

# Example metric writer for the msgpack receiver (Python 2, like carbon).
# Sends one "foo.bar" datapoint per second covering the previous 24 hours.
import socket
import time

import msgpack

# 2005 is the default MSGPACK_RECEIVER_PORT introduced by this merge.
s = socket.create_connection(('localhost', 2005))
# Open the file wrapper explicitly for binary writing; it is only written to.
fs = s.makefile('wb')
# Sample the clock once so both ends of the range share the same "now".
now = int(time.time())
for second in xrange(now - 86400, now):
    # Each message is (metric_name, (timestamp, value)), the shape that
    # MetricMessagePackReceiver unpacks.
    msgpack.pack(("foo.bar", (second, 1)), fs)
fs.flush()
fs.close()
s.shutdown(socket.SHUT_RDWR)


-- 
https://code.launchpad.net/~drawks/graphite/graphite-msgpack/+merge/92921
Your team graphite-dev is requested to review the proposed merge of 
lp:~drawks/graphite/graphite-msgpack into lp:graphite.
=== modified file 'carbon/conf/carbon.conf.example'
--- carbon/conf/carbon.conf.example	2011-12-14 16:22:17 +0000
+++ carbon/conf/carbon.conf.example	2012-02-14 07:19:18 +0000
@@ -56,6 +56,7 @@
 # the files quickly but at the risk of slowing I/O down considerably for a while.
 MAX_CREATES_PER_MINUTE = 50
 
+ENABLE_LINE_RECEIVER = True
 LINE_RECEIVER_INTERFACE = 0.0.0.0
 LINE_RECEIVER_PORT = 2003
 
@@ -66,6 +67,7 @@
 UDP_RECEIVER_INTERFACE = 0.0.0.0
 UDP_RECEIVER_PORT = 2003
 
+ENABLE_PICKLE_RECEIVER = True
 PICKLE_RECEIVER_INTERFACE = 0.0.0.0
 PICKLE_RECEIVER_PORT = 2004
 
@@ -74,6 +76,10 @@
 # Set this to True to revert to the old-fashioned insecure unpickler.
 USE_INSECURE_UNPICKLER = False
 
+ENABLE_MSGPACK_RECEIVER = True
+MSGPACK_RECEIVER_INTERFACE = 0.0.0.0
+MSGPACK_RECEIVER_PORT = 2005
+
 CACHE_QUERY_INTERFACE = 0.0.0.0
 CACHE_QUERY_PORT = 7002
 

=== modified file 'carbon/lib/carbon/conf.py'
--- carbon/lib/carbon/conf.py	2012-02-09 21:38:55 +0000
+++ carbon/lib/carbon/conf.py	2012-02-14 07:19:18 +0000
@@ -32,11 +32,16 @@
   MAX_CACHE_SIZE=float('inf'),
   MAX_UPDATES_PER_SECOND=500,
   MAX_CREATES_PER_MINUTE=float('inf'),
+  ENABLE_LINE_RECEIVER=True,
   LINE_RECEIVER_INTERFACE='0.0.0.0',
   LINE_RECEIVER_PORT=2003,
   ENABLE_UDP_LISTENER=False,
   UDP_RECEIVER_INTERFACE='0.0.0.0',
   UDP_RECEIVER_PORT=2003,
+  ENABLE_MSGPACK_RECEIVER=True,
+  MSGPACK_RECEIVER_INTERFACE='0.0.0.0',
+  MSGPACK_RECEIVER_PORT=2005,
+  ENABLE_PICKLE_RECEIVER=True,
   PICKLE_RECEIVER_INTERFACE='0.0.0.0',
   PICKLE_RECEIVER_PORT=2004,
   CACHE_QUERY_INTERFACE='0.0.0.0',

=== modified file 'carbon/lib/carbon/protocols.py'
--- carbon/lib/carbon/protocols.py	2012-02-11 06:26:28 +0000
+++ carbon/lib/carbon/protocols.py	2012-02-14 07:19:18 +0000
@@ -1,11 +1,13 @@
 from twisted.internet import reactor
-from twisted.internet.protocol import DatagramProtocol
+from twisted.internet.protocol import DatagramProtocol, Protocol
 from twisted.internet.error import ConnectionDone
 from twisted.protocols.basic import LineOnlyReceiver, Int32StringReceiver
 from carbon import log, events, state, management
 from carbon.conf import settings
 from carbon.regexlist import WhiteList, BlackList
 from carbon.util import pickle, get_unpickler
+if settings.ENABLE_MSGPACK_RECEIVER:
+  from msgpack import Unpacker
 
 
 class MetricReceiver:
@@ -106,6 +108,33 @@
       self.metricReceived(metric, datapoint)
 
 
+class MetricMessagePackReceiver(MetricReceiver, Protocol):
+
+  def connectionMade(self):
+    MetricReceiver.connectionMade(self)
+    self.unpacker = Unpacker()
+
+  def dataReceived(self, data):
+    if len(data) <= 0:
+      log.listener('msgpack short read from %s' % self.peerName)
+      return
+    try:
+      self.unpacker.feed(data)
+      for (metric, datapoint) in self.unpacker:
+        if not isinstance(metric,str):
+          log.listener('invalid metric name/type %r/%r received from %s' % ( metric, type(metric), self.peerName))
+          continue
+        try:
+          datapoint = ( float(datapoint[0]), float(datapoint[1]) )
+        except:
+          continue
+        self.metricReceived(metric, datapoint)
+    except:
+      log.listener('invalid msgpack received from %s, ignoring' % self.peerName)
+      return
+
+
+
 class CacheManagementHandler(Int32StringReceiver):
   def connectionMade(self):
     peer = self.transport.getPeer()

=== modified file 'carbon/lib/carbon/service.py'
--- carbon/lib/carbon/service.py	2011-12-14 16:22:17 +0000
+++ carbon/lib/carbon/service.py	2012-02-14 07:19:18 +0000
@@ -40,7 +40,7 @@
 def createBaseService(config):
     from carbon.conf import settings
     from carbon.protocols import (MetricLineReceiver, MetricPickleReceiver,
-                                  MetricDatagramReceiver)
+                                  MetricDatagramReceiver, MetricMessagePackReceiver)
 
     root_service = CarbonRootService()
     root_service.setName(settings.program)
@@ -59,12 +59,20 @@
         amqp_exchange_name = settings.get("AMQP_EXCHANGE", "graphite")
 
 
-    for interface, port, protocol in ((settings.LINE_RECEIVER_INTERFACE,
-                                       settings.LINE_RECEIVER_PORT,
-                                       MetricLineReceiver),
-                                      (settings.PICKLE_RECEIVER_INTERFACE,
-                                       settings.PICKLE_RECEIVER_PORT,
-                                       MetricPickleReceiver)):
+    receivers = []
+    if settings.ENABLE_LINE_RECEIVER:
+       receivers.append((settings.LINE_RECEIVER_INTERFACE,
+                         settings.LINE_RECEIVER_PORT,
+                         MetricLineReceiver))
+    if settings.ENABLE_PICKLE_RECEIVER:
+       receivers.append((settings.PICKLE_RECEIVER_INTERFACE,
+                         settings.PICKLE_RECEIVER_PORT,
+                         MetricPickleReceiver))
+    if settings.ENABLE_MSGPACK_RECEIVER:
+       receivers.append((settings.MSGPACK_RECEIVER_INTERFACE,
+                         settings.MSGPACK_RECEIVER_PORT,
+                         MetricMessagePackReceiver))
+    for (interface, port, protocol) in receivers:
         if port:
             factory = ServerFactory()
             factory.protocol = protocol

=== modified file 'check-dependencies.py'
--- check-dependencies.py	2012-02-10 05:14:15 +0000
+++ check-dependencies.py	2012-02-14 07:19:18 +0000
@@ -168,6 +168,14 @@
   print "Note that txamqp requires python 2.5 or greater."
   warning += 1
 
+# Test for msgpack
+try:
+  import msgpack
+except:
+  print "[WARNING]"
+  print "Unable to import the 'msgpack' module, this is required if you want to use msgpack receiver."
+  warning += 1
+
 
 if fatal:
   print "%d necessary dependencies not met. Graphite will not function until these dependencies are fulfilled." % fatal

_______________________________________________
Mailing list: https://launchpad.net/~graphite-dev
Post to     : [email protected]
Unsubscribe : https://launchpad.net/~graphite-dev
More help   : https://help.launchpad.net/ListHelp

Reply via email to