http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-client/src/py/storm/ttypes.py
----------------------------------------------------------------------
diff --git a/storm-client/src/py/storm/ttypes.py b/storm-client/src/py/storm/ttypes.py
index 9e6725c..5d942b2 100644
--- a/storm-client/src/py/storm/ttypes.py
+++ b/storm-client/src/py/storm/ttypes.py
@@ -9452,6 +9452,8 @@ class WorkerResources:
    - cpu
    - shared_mem_on_heap
    - shared_mem_off_heap
+   - resources
+   - shared_resources
   """
 
   thrift_spec = (
@@ -9461,14 +9463,18 @@ class WorkerResources:
     (3, TType.DOUBLE, 'cpu', None, None, ), # 3
     (4, TType.DOUBLE, 'shared_mem_on_heap', None, None, ), # 4
     (5, TType.DOUBLE, 'shared_mem_off_heap', None, None, ), # 5
+    (6, TType.MAP, 'resources', (TType.STRING,None,TType.DOUBLE,None), None, ), # 6
+    (7, TType.MAP, 'shared_resources', (TType.STRING,None,TType.DOUBLE,None), None, ), # 7
   )
 
-  def __init__(self, mem_on_heap=None, mem_off_heap=None, cpu=None, shared_mem_on_heap=None, shared_mem_off_heap=None,):
+  def __init__(self, mem_on_heap=None, mem_off_heap=None, cpu=None, shared_mem_on_heap=None, shared_mem_off_heap=None, resources=None, shared_resources=None,):
     self.mem_on_heap = mem_on_heap
     self.mem_off_heap = mem_off_heap
     self.cpu = cpu
     self.shared_mem_on_heap = shared_mem_on_heap
     self.shared_mem_off_heap = shared_mem_off_heap
+    self.resources = resources
+    self.shared_resources = shared_resources
 
   def read(self, iprot):
     if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -9504,6 +9510,28 @@ class WorkerResources:
           self.shared_mem_off_heap = iprot.readDouble()
         else:
           iprot.skip(ftype)
+      elif fid == 6:
+        if ftype == TType.MAP:
+          self.resources = {}
+          (_ktype596, _vtype597, _size595 ) = iprot.readMapBegin()
+          for _i599 in xrange(_size595):
+            _key600 = iprot.readString().decode('utf-8')
+            _val601 = iprot.readDouble()
+            self.resources[_key600] = _val601
+          iprot.readMapEnd()
+        else:
+          iprot.skip(ftype)
+      elif fid == 7:
+        if ftype == TType.MAP:
+          self.shared_resources = {}
+          (_ktype603, _vtype604, _size602 ) = iprot.readMapBegin()
+          for _i606 in xrange(_size602):
+            _key607 = iprot.readString().decode('utf-8')
+            _val608 = iprot.readDouble()
+            self.shared_resources[_key607] = _val608
+          iprot.readMapEnd()
+        else:
+          iprot.skip(ftype)
       else:
         iprot.skip(ftype)
       iprot.readFieldEnd()
@@ -9534,6 +9562,22 @@ class WorkerResources:
       oprot.writeFieldBegin('shared_mem_off_heap', TType.DOUBLE, 5)
       oprot.writeDouble(self.shared_mem_off_heap)
       oprot.writeFieldEnd()
+    if self.resources is not None:
+      oprot.writeFieldBegin('resources', TType.MAP, 6)
+      oprot.writeMapBegin(TType.STRING, TType.DOUBLE, len(self.resources))
+      for kiter609,viter610 in self.resources.items():
+        oprot.writeString(kiter609.encode('utf-8'))
+        oprot.writeDouble(viter610)
+      oprot.writeMapEnd()
+      oprot.writeFieldEnd()
+    if self.shared_resources is not None:
+      oprot.writeFieldBegin('shared_resources', TType.MAP, 7)
+      oprot.writeMapBegin(TType.STRING, TType.DOUBLE, len(self.shared_resources))
+      for kiter611,viter612 in self.shared_resources.items():
+        oprot.writeString(kiter611.encode('utf-8'))
+        oprot.writeDouble(viter612)
+      oprot.writeMapEnd()
+      oprot.writeFieldEnd()
     oprot.writeFieldStop()
     oprot.writeStructEnd()
 
@@ -9548,6 +9592,8 @@ class WorkerResources:
     value = (value * 31) ^ hash(self.cpu)
     value = (value * 31) ^ hash(self.shared_mem_on_heap)
     value = (value * 31) ^ hash(self.shared_mem_off_heap)
+    value = (value * 31) ^ hash(self.resources)
+    value = (value * 31) ^ hash(self.shared_resources)
     return value
 
   def __repr__(self):
@@ -9630,68 +9676,68 @@ class Assignment:
       elif fid == 2:
         if ftype == TType.MAP:
           self.node_host = {}
-          (_ktype596, _vtype597, _size595 ) = iprot.readMapBegin()
-          for _i599 in xrange(_size595):
-            _key600 = iprot.readString().decode('utf-8')
-            _val601 = iprot.readString().decode('utf-8')
-            self.node_host[_key600] = _val601
+          (_ktype614, _vtype615, _size613 ) = iprot.readMapBegin()
+          for _i617 in xrange(_size613):
+            _key618 = iprot.readString().decode('utf-8')
+            _val619 = iprot.readString().decode('utf-8')
+            self.node_host[_key618] = _val619
           iprot.readMapEnd()
         else:
           iprot.skip(ftype)
       elif fid == 3:
         if ftype == TType.MAP:
           self.executor_node_port = {}
-          (_ktype603, _vtype604, _size602 ) = iprot.readMapBegin()
-          for _i606 in xrange(_size602):
-            _key607 = []
-            (_etype612, _size609) = iprot.readListBegin()
-            for _i613 in xrange(_size609):
-              _elem614 = iprot.readI64()
-              _key607.append(_elem614)
+          (_ktype621, _vtype622, _size620 ) = iprot.readMapBegin()
+          for _i624 in xrange(_size620):
+            _key625 = []
+            (_etype630, _size627) = iprot.readListBegin()
+            for _i631 in xrange(_size627):
+              _elem632 = iprot.readI64()
+              _key625.append(_elem632)
             iprot.readListEnd()
-            _val608 = NodeInfo()
-            _val608.read(iprot)
-            self.executor_node_port[_key607] = _val608
+            _val626 = NodeInfo()
+            _val626.read(iprot)
+            self.executor_node_port[_key625] = _val626
           iprot.readMapEnd()
         else:
           iprot.skip(ftype)
       elif fid == 4:
         if ftype == TType.MAP:
           self.executor_start_time_secs = {}
-          (_ktype616, _vtype617, _size615 ) = iprot.readMapBegin()
-          for _i619 in xrange(_size615):
-            _key620 = []
-            (_etype625, _size622) = iprot.readListBegin()
-            for _i626 in xrange(_size622):
-              _elem627 = iprot.readI64()
-              _key620.append(_elem627)
+          (_ktype634, _vtype635, _size633 ) = iprot.readMapBegin()
+          for _i637 in xrange(_size633):
+            _key638 = []
+            (_etype643, _size640) = iprot.readListBegin()
+            for _i644 in xrange(_size640):
+              _elem645 = iprot.readI64()
+              _key638.append(_elem645)
             iprot.readListEnd()
-            _val621 = iprot.readI64()
-            self.executor_start_time_secs[_key620] = _val621
+            _val639 = iprot.readI64()
+            self.executor_start_time_secs[_key638] = _val639
           iprot.readMapEnd()
         else:
           iprot.skip(ftype)
       elif fid == 5:
         if ftype == TType.MAP:
           self.worker_resources = {}
-          (_ktype629, _vtype630, _size628 ) = iprot.readMapBegin()
-          for _i632 in xrange(_size628):
-            _key633 = NodeInfo()
-            _key633.read(iprot)
-            _val634 = WorkerResources()
-            _val634.read(iprot)
-            self.worker_resources[_key633] = _val634
+          (_ktype647, _vtype648, _size646 ) = iprot.readMapBegin()
+          for _i650 in xrange(_size646):
+            _key651 = NodeInfo()
+            _key651.read(iprot)
+            _val652 = WorkerResources()
+            _val652.read(iprot)
+            self.worker_resources[_key651] = _val652
           iprot.readMapEnd()
         else:
           iprot.skip(ftype)
       elif fid == 6:
         if ftype == TType.MAP:
           self.total_shared_off_heap = {}
-          (_ktype636, _vtype637, _size635 ) = iprot.readMapBegin()
-          for _i639 in xrange(_size635):
-            _key640 = iprot.readString().decode('utf-8')
-            _val641 = iprot.readDouble()
-            self.total_shared_off_heap[_key640] = _val641
+          (_ktype654, _vtype655, _size653 ) = iprot.readMapBegin()
+          for _i657 in xrange(_size653):
+            _key658 = iprot.readString().decode('utf-8')
+            _val659 = iprot.readDouble()
+            self.total_shared_off_heap[_key658] = _val659
           iprot.readMapEnd()
         else:
           iprot.skip(ftype)
@@ -9717,47 +9763,47 @@ class Assignment:
     if self.node_host is not None:
       oprot.writeFieldBegin('node_host', TType.MAP, 2)
       oprot.writeMapBegin(TType.STRING, TType.STRING, len(self.node_host))
-      for kiter642,viter643 in self.node_host.items():
-        oprot.writeString(kiter642.encode('utf-8'))
-        oprot.writeString(viter643.encode('utf-8'))
+      for kiter660,viter661 in self.node_host.items():
+        oprot.writeString(kiter660.encode('utf-8'))
+        oprot.writeString(viter661.encode('utf-8'))
       oprot.writeMapEnd()
       oprot.writeFieldEnd()
     if self.executor_node_port is not None:
       oprot.writeFieldBegin('executor_node_port', TType.MAP, 3)
       oprot.writeMapBegin(TType.LIST, TType.STRUCT, len(self.executor_node_port))
-      for kiter644,viter645 in self.executor_node_port.items():
-        oprot.writeListBegin(TType.I64, len(kiter644))
-        for iter646 in kiter644:
-          oprot.writeI64(iter646)
+      for kiter662,viter663 in self.executor_node_port.items():
+        oprot.writeListBegin(TType.I64, len(kiter662))
+        for iter664 in kiter662:
+          oprot.writeI64(iter664)
         oprot.writeListEnd()
-        viter645.write(oprot)
+        viter663.write(oprot)
       oprot.writeMapEnd()
       oprot.writeFieldEnd()
     if self.executor_start_time_secs is not None:
       oprot.writeFieldBegin('executor_start_time_secs', TType.MAP, 4)
       oprot.writeMapBegin(TType.LIST, TType.I64, len(self.executor_start_time_secs))
-      for kiter647,viter648 in self.executor_start_time_secs.items():
-        oprot.writeListBegin(TType.I64, len(kiter647))
-        for iter649 in kiter647:
-          oprot.writeI64(iter649)
+      for kiter665,viter666 in self.executor_start_time_secs.items():
+        oprot.writeListBegin(TType.I64, len(kiter665))
+        for iter667 in kiter665:
+          oprot.writeI64(iter667)
         oprot.writeListEnd()
-        oprot.writeI64(viter648)
+        oprot.writeI64(viter666)
       oprot.writeMapEnd()
       oprot.writeFieldEnd()
     if self.worker_resources is not None:
       oprot.writeFieldBegin('worker_resources', TType.MAP, 5)
       oprot.writeMapBegin(TType.STRUCT, TType.STRUCT, len(self.worker_resources))
-      for kiter650,viter651 in self.worker_resources.items():
-        kiter650.write(oprot)
-        viter651.write(oprot)
+      for kiter668,viter669 in self.worker_resources.items():
+        kiter668.write(oprot)
+        viter669.write(oprot)
       oprot.writeMapEnd()
       oprot.writeFieldEnd()
     if self.total_shared_off_heap is not None:
       oprot.writeFieldBegin('total_shared_off_heap', TType.MAP, 6)
       oprot.writeMapBegin(TType.STRING, TType.DOUBLE, len(self.total_shared_off_heap))
-      for kiter652,viter653 in self.total_shared_off_heap.items():
-        oprot.writeString(kiter652.encode('utf-8'))
-        oprot.writeDouble(viter653)
+      for kiter670,viter671 in self.total_shared_off_heap.items():
+        oprot.writeString(kiter670.encode('utf-8'))
+        oprot.writeDouble(viter671)
       oprot.writeMapEnd()
       oprot.writeFieldEnd()
     if self.owner is not None:
@@ -9946,11 +9992,11 @@ class StormBase:
       elif fid == 4:
         if ftype == TType.MAP:
           self.component_executors = {}
-          (_ktype655, _vtype656, _size654 ) = iprot.readMapBegin()
-          for _i658 in xrange(_size654):
-            _key659 = iprot.readString().decode('utf-8')
-            _val660 = iprot.readI32()
-            self.component_executors[_key659] = _val660
+          (_ktype673, _vtype674, _size672 ) = iprot.readMapBegin()
+          for _i676 in xrange(_size672):
+            _key677 = iprot.readString().decode('utf-8')
+            _val678 = iprot.readI32()
+            self.component_executors[_key677] = _val678
           iprot.readMapEnd()
         else:
           iprot.skip(ftype)
@@ -9978,12 +10024,12 @@ class StormBase:
       elif fid == 9:
         if ftype == TType.MAP:
           self.component_debug = {}
-          (_ktype662, _vtype663, _size661 ) = iprot.readMapBegin()
-          for _i665 in xrange(_size661):
-            _key666 = iprot.readString().decode('utf-8')
-            _val667 = DebugOptions()
-            _val667.read(iprot)
-            self.component_debug[_key666] = _val667
+          (_ktype680, _vtype681, _size679 ) = iprot.readMapBegin()
+          for _i683 in xrange(_size679):
+            _key684 = iprot.readString().decode('utf-8')
+            _val685 = DebugOptions()
+            _val685.read(iprot)
+            self.component_debug[_key684] = _val685
           iprot.readMapEnd()
         else:
           iprot.skip(ftype)
@@ -10022,9 +10068,9 @@ class StormBase:
     if self.component_executors is not None:
       oprot.writeFieldBegin('component_executors', TType.MAP, 4)
       oprot.writeMapBegin(TType.STRING, TType.I32, len(self.component_executors))
-      for kiter668,viter669 in self.component_executors.items():
-        oprot.writeString(kiter668.encode('utf-8'))
-        oprot.writeI32(viter669)
+      for kiter686,viter687 in self.component_executors.items():
+        oprot.writeString(kiter686.encode('utf-8'))
+        oprot.writeI32(viter687)
       oprot.writeMapEnd()
       oprot.writeFieldEnd()
     if self.launch_time_secs is not None:
@@ -10046,9 +10092,9 @@ class StormBase:
     if self.component_debug is not None:
       oprot.writeFieldBegin('component_debug', TType.MAP, 9)
       oprot.writeMapBegin(TType.STRING, TType.STRUCT, len(self.component_debug))
-      for kiter670,viter671 in self.component_debug.items():
-        oprot.writeString(kiter670.encode('utf-8'))
-        viter671.write(oprot)
+      for kiter688,viter689 in self.component_debug.items():
+        oprot.writeString(kiter688.encode('utf-8'))
+        viter689.write(oprot)
       oprot.writeMapEnd()
       oprot.writeFieldEnd()
     if self.principal is not None:
@@ -10138,13 +10184,13 @@ class ClusterWorkerHeartbeat:
       elif fid == 2:
         if ftype == TType.MAP:
           self.executor_stats = {}
-          (_ktype673, _vtype674, _size672 ) = iprot.readMapBegin()
-          for _i676 in xrange(_size672):
-            _key677 = ExecutorInfo()
-            _key677.read(iprot)
-            _val678 = ExecutorStats()
-            _val678.read(iprot)
-            self.executor_stats[_key677] = _val678
+          (_ktype691, _vtype692, _size690 ) = iprot.readMapBegin()
+          for _i694 in xrange(_size690):
+            _key695 = ExecutorInfo()
+            _key695.read(iprot)
+            _val696 = ExecutorStats()
+            _val696.read(iprot)
+            self.executor_stats[_key695] = _val696
           iprot.readMapEnd()
         else:
           iprot.skip(ftype)
@@ -10175,9 +10221,9 @@ class ClusterWorkerHeartbeat:
     if self.executor_stats is not None:
       oprot.writeFieldBegin('executor_stats', TType.MAP, 2)
       oprot.writeMapBegin(TType.STRUCT, TType.STRUCT, len(self.executor_stats))
-      for kiter679,viter680 in self.executor_stats.items():
-        kiter679.write(oprot)
-        viter680.write(oprot)
+      for kiter697,viter698 in self.executor_stats.items():
+        kiter697.write(oprot)
+        viter698.write(oprot)
       oprot.writeMapEnd()
       oprot.writeFieldEnd()
     if self.time_secs is not None:
@@ -10330,12 +10376,12 @@ class LocalStateData:
       if fid == 1:
         if ftype == TType.MAP:
           self.serialized_parts = {}
-          (_ktype682, _vtype683, _size681 ) = iprot.readMapBegin()
-          for _i685 in xrange(_size681):
-            _key686 = iprot.readString().decode('utf-8')
-            _val687 = ThriftSerializedObject()
-            _val687.read(iprot)
-            self.serialized_parts[_key686] = _val687
+          (_ktype700, _vtype701, _size699 ) = iprot.readMapBegin()
+          for _i703 in xrange(_size699):
+            _key704 = iprot.readString().decode('utf-8')
+            _val705 = ThriftSerializedObject()
+            _val705.read(iprot)
+            self.serialized_parts[_key704] = _val705
           iprot.readMapEnd()
         else:
           iprot.skip(ftype)
@@ -10352,9 +10398,9 @@ class LocalStateData:
     if self.serialized_parts is not None:
       oprot.writeFieldBegin('serialized_parts', TType.MAP, 1)
       oprot.writeMapBegin(TType.STRING, TType.STRUCT, len(self.serialized_parts))
-      for kiter688,viter689 in self.serialized_parts.items():
-        oprot.writeString(kiter688.encode('utf-8'))
-        viter689.write(oprot)
+      for kiter706,viter707 in self.serialized_parts.items():
+        oprot.writeString(kiter706.encode('utf-8'))
+        viter707.write(oprot)
       oprot.writeMapEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -10425,11 +10471,11 @@ class LocalAssignment:
       elif fid == 2:
         if ftype == TType.LIST:
           self.executors = []
-          (_etype693, _size690) = iprot.readListBegin()
-          for _i694 in xrange(_size690):
-            _elem695 = ExecutorInfo()
-            _elem695.read(iprot)
-            self.executors.append(_elem695)
+          (_etype711, _size708) = iprot.readListBegin()
+          for _i712 in xrange(_size708):
+            _elem713 = ExecutorInfo()
+            _elem713.read(iprot)
+            self.executors.append(_elem713)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -10466,8 +10512,8 @@ class LocalAssignment:
     if self.executors is not None:
       oprot.writeFieldBegin('executors', TType.LIST, 2)
       oprot.writeListBegin(TType.STRUCT, len(self.executors))
-      for iter696 in self.executors:
-        iter696.write(oprot)
+      for iter714 in self.executors:
+        iter714.write(oprot)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     if self.resources is not None:
@@ -10606,11 +10652,11 @@ class LSApprovedWorkers:
       if fid == 1:
         if ftype == TType.MAP:
           self.approved_workers = {}
-          (_ktype698, _vtype699, _size697 ) = iprot.readMapBegin()
-          for _i701 in xrange(_size697):
-            _key702 = iprot.readString().decode('utf-8')
-            _val703 = iprot.readI32()
-            self.approved_workers[_key702] = _val703
+          (_ktype716, _vtype717, _size715 ) = iprot.readMapBegin()
+          for _i719 in xrange(_size715):
+            _key720 = iprot.readString().decode('utf-8')
+            _val721 = iprot.readI32()
+            self.approved_workers[_key720] = _val721
           iprot.readMapEnd()
         else:
           iprot.skip(ftype)
@@ -10627,9 +10673,9 @@ class LSApprovedWorkers:
     if self.approved_workers is not None:
       oprot.writeFieldBegin('approved_workers', TType.MAP, 1)
       oprot.writeMapBegin(TType.STRING, TType.I32, len(self.approved_workers))
-      for kiter704,viter705 in self.approved_workers.items():
-        oprot.writeString(kiter704.encode('utf-8'))
-        oprot.writeI32(viter705)
+      for kiter722,viter723 in self.approved_workers.items():
+        oprot.writeString(kiter722.encode('utf-8'))
+        oprot.writeI32(viter723)
       oprot.writeMapEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -10683,12 +10729,12 @@ class LSSupervisorAssignments:
       if fid == 1:
         if ftype == TType.MAP:
           self.assignments = {}
-          (_ktype707, _vtype708, _size706 ) = iprot.readMapBegin()
-          for _i710 in xrange(_size706):
-            _key711 = iprot.readI32()
-            _val712 = LocalAssignment()
-            _val712.read(iprot)
-            self.assignments[_key711] = _val712
+          (_ktype725, _vtype726, _size724 ) = iprot.readMapBegin()
+          for _i728 in xrange(_size724):
+            _key729 = iprot.readI32()
+            _val730 = LocalAssignment()
+            _val730.read(iprot)
+            self.assignments[_key729] = _val730
           iprot.readMapEnd()
         else:
           iprot.skip(ftype)
@@ -10705,9 +10751,9 @@ class LSSupervisorAssignments:
     if self.assignments is not None:
       oprot.writeFieldBegin('assignments', TType.MAP, 1)
       oprot.writeMapBegin(TType.I32, TType.STRUCT, len(self.assignments))
-      for kiter713,viter714 in self.assignments.items():
-        oprot.writeI32(kiter713)
-        viter714.write(oprot)
+      for kiter731,viter732 in self.assignments.items():
+        oprot.writeI32(kiter731)
+        viter732.write(oprot)
       oprot.writeMapEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -10780,11 +10826,11 @@ class LSWorkerHeartbeat:
       elif fid == 3:
         if ftype == TType.LIST:
           self.executors = []
-          (_etype718, _size715) = iprot.readListBegin()
-          for _i719 in xrange(_size715):
-            _elem720 = ExecutorInfo()
-            _elem720.read(iprot)
-            self.executors.append(_elem720)
+          (_etype736, _size733) = iprot.readListBegin()
+          for _i737 in xrange(_size733):
+            _elem738 = ExecutorInfo()
+            _elem738.read(iprot)
+            self.executors.append(_elem738)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -10814,8 +10860,8 @@ class LSWorkerHeartbeat:
     if self.executors is not None:
       oprot.writeFieldBegin('executors', TType.LIST, 3)
       oprot.writeListBegin(TType.STRUCT, len(self.executors))
-      for iter721 in self.executors:
-        iter721.write(oprot)
+      for iter739 in self.executors:
+        iter739.write(oprot)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     if self.port is not None:
@@ -10901,20 +10947,20 @@ class LSTopoHistory:
       elif fid == 3:
         if ftype == TType.LIST:
           self.users = []
-          (_etype725, _size722) = iprot.readListBegin()
-          for _i726 in xrange(_size722):
-            _elem727 = iprot.readString().decode('utf-8')
-            self.users.append(_elem727)
+          (_etype743, _size740) = iprot.readListBegin()
+          for _i744 in xrange(_size740):
+            _elem745 = iprot.readString().decode('utf-8')
+            self.users.append(_elem745)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
       elif fid == 4:
         if ftype == TType.LIST:
           self.groups = []
-          (_etype731, _size728) = iprot.readListBegin()
-          for _i732 in xrange(_size728):
-            _elem733 = iprot.readString().decode('utf-8')
-            self.groups.append(_elem733)
+          (_etype749, _size746) = iprot.readListBegin()
+          for _i750 in xrange(_size746):
+            _elem751 = iprot.readString().decode('utf-8')
+            self.groups.append(_elem751)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -10939,15 +10985,15 @@ class LSTopoHistory:
     if self.users is not None:
       oprot.writeFieldBegin('users', TType.LIST, 3)
       oprot.writeListBegin(TType.STRING, len(self.users))
-      for iter734 in self.users:
-        oprot.writeString(iter734.encode('utf-8'))
+      for iter752 in self.users:
+        oprot.writeString(iter752.encode('utf-8'))
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     if self.groups is not None:
       oprot.writeFieldBegin('groups', TType.LIST, 4)
       oprot.writeListBegin(TType.STRING, len(self.groups))
-      for iter735 in self.groups:
-        oprot.writeString(iter735.encode('utf-8'))
+      for iter753 in self.groups:
+        oprot.writeString(iter753.encode('utf-8'))
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -11010,11 +11056,11 @@ class LSTopoHistoryList:
       if fid == 1:
         if ftype == TType.LIST:
           self.topo_history = []
-          (_etype739, _size736) = iprot.readListBegin()
-          for _i740 in xrange(_size736):
-            _elem741 = LSTopoHistory()
-            _elem741.read(iprot)
-            self.topo_history.append(_elem741)
+          (_etype757, _size754) = iprot.readListBegin()
+          for _i758 in xrange(_size754):
+            _elem759 = LSTopoHistory()
+            _elem759.read(iprot)
+            self.topo_history.append(_elem759)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -11031,8 +11077,8 @@ class LSTopoHistoryList:
     if self.topo_history is not None:
       oprot.writeFieldBegin('topo_history', TType.LIST, 1)
       oprot.writeListBegin(TType.STRUCT, len(self.topo_history))
-      for iter742 in self.topo_history:
-        iter742.write(oprot)
+      for iter760 in self.topo_history:
+        iter760.write(oprot)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -11367,12 +11413,12 @@ class LogConfig:
       if fid == 2:
         if ftype == TType.MAP:
           self.named_logger_level = {}
-          (_ktype744, _vtype745, _size743 ) = iprot.readMapBegin()
-          for _i747 in xrange(_size743):
-            _key748 = iprot.readString().decode('utf-8')
-            _val749 = LogLevel()
-            _val749.read(iprot)
-            self.named_logger_level[_key748] = _val749
+          (_ktype762, _vtype763, _size761 ) = iprot.readMapBegin()
+          for _i765 in xrange(_size761):
+            _key766 = iprot.readString().decode('utf-8')
+            _val767 = LogLevel()
+            _val767.read(iprot)
+            self.named_logger_level[_key766] = _val767
           iprot.readMapEnd()
         else:
           iprot.skip(ftype)
@@ -11389,9 +11435,9 @@ class LogConfig:
     if self.named_logger_level is not None:
       oprot.writeFieldBegin('named_logger_level', TType.MAP, 2)
       oprot.writeMapBegin(TType.STRING, TType.STRUCT, len(self.named_logger_level))
-      for kiter750,viter751 in self.named_logger_level.items():
-        oprot.writeString(kiter750.encode('utf-8'))
-        viter751.write(oprot)
+      for kiter768,viter769 in self.named_logger_level.items():
+        oprot.writeString(kiter768.encode('utf-8'))
+        viter769.write(oprot)
       oprot.writeMapEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -11443,10 +11489,10 @@ class TopologyHistoryInfo:
       if fid == 1:
         if ftype == TType.LIST:
           self.topo_ids = []
-          (_etype755, _size752) = iprot.readListBegin()
-          for _i756 in xrange(_size752):
-            _elem757 = iprot.readString().decode('utf-8')
-            self.topo_ids.append(_elem757)
+          (_etype773, _size770) = iprot.readListBegin()
+          for _i774 in xrange(_size770):
+            _elem775 = iprot.readString().decode('utf-8')
+            self.topo_ids.append(_elem775)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -11463,8 +11509,8 @@ class TopologyHistoryInfo:
     if self.topo_ids is not None:
       oprot.writeFieldBegin('topo_ids', TType.LIST, 1)
       oprot.writeListBegin(TType.STRING, len(self.topo_ids))
-      for iter758 in self.topo_ids:
-        oprot.writeString(iter758.encode('utf-8'))
+      for iter776 in self.topo_ids:
+        oprot.writeString(iter776.encode('utf-8'))
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -11778,6 +11824,335 @@ class OwnerResourceSummary:
   def __ne__(self, other):
     return not (self == other)
 
+class WorkerMetricPoint:
+  """
+  Attributes:
+   - metricName
+   - timestamp
+   - metricValue
+   - componentId
+   - executorId
+   - streamId
+  """
+
+  thrift_spec = (
+    None, # 0
+    (1, TType.STRING, 'metricName', None, None, ), # 1
+    (2, TType.I64, 'timestamp', None, None, ), # 2
+    (3, TType.DOUBLE, 'metricValue', None, None, ), # 3
+    (4, TType.STRING, 'componentId', None, None, ), # 4
+    (5, TType.STRING, 'executorId', None, None, ), # 5
+    (6, TType.STRING, 'streamId', None, None, ), # 6
+  )
+
+  def __init__(self, metricName=None, timestamp=None, metricValue=None, 
componentId=None, executorId=None, streamId=None,):
+    self.metricName = metricName
+    self.timestamp = timestamp
+    self.metricValue = metricValue
+    self.componentId = componentId
+    self.executorId = executorId
+    self.streamId = streamId
+
+  def read(self, iprot):
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and 
isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is 
not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, 
self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      if fid == 1:
+        if ftype == TType.STRING:
+          self.metricName = iprot.readString().decode('utf-8')
+        else:
+          iprot.skip(ftype)
+      elif fid == 2:
+        if ftype == TType.I64:
+          self.timestamp = iprot.readI64()
+        else:
+          iprot.skip(ftype)
+      elif fid == 3:
+        if ftype == TType.DOUBLE:
+          self.metricValue = iprot.readDouble()
+        else:
+          iprot.skip(ftype)
+      elif fid == 4:
+        if ftype == TType.STRING:
+          self.componentId = iprot.readString().decode('utf-8')
+        else:
+          iprot.skip(ftype)
+      elif fid == 5:
+        if ftype == TType.STRING:
+          self.executorId = iprot.readString().decode('utf-8')
+        else:
+          iprot.skip(ftype)
+      elif fid == 6:
+        if ftype == TType.STRING:
+          self.streamId = iprot.readString().decode('utf-8')
+        else:
+          iprot.skip(ftype)
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and 
self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, 
self.thrift_spec)))
+      return
+    oprot.writeStructBegin('WorkerMetricPoint')
+    if self.metricName is not None:
+      oprot.writeFieldBegin('metricName', TType.STRING, 1)
+      oprot.writeString(self.metricName.encode('utf-8'))
+      oprot.writeFieldEnd()
+    if self.timestamp is not None:
+      oprot.writeFieldBegin('timestamp', TType.I64, 2)
+      oprot.writeI64(self.timestamp)
+      oprot.writeFieldEnd()
+    if self.metricValue is not None:
+      oprot.writeFieldBegin('metricValue', TType.DOUBLE, 3)
+      oprot.writeDouble(self.metricValue)
+      oprot.writeFieldEnd()
+    if self.componentId is not None:
+      oprot.writeFieldBegin('componentId', TType.STRING, 4)
+      oprot.writeString(self.componentId.encode('utf-8'))
+      oprot.writeFieldEnd()
+    if self.executorId is not None:
+      oprot.writeFieldBegin('executorId', TType.STRING, 5)
+      oprot.writeString(self.executorId.encode('utf-8'))
+      oprot.writeFieldEnd()
+    if self.streamId is not None:
+      oprot.writeFieldBegin('streamId', TType.STRING, 6)
+      oprot.writeString(self.streamId.encode('utf-8'))
+      oprot.writeFieldEnd()
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    if self.metricName is None:
+      raise TProtocol.TProtocolException(message='Required field metricName is 
unset!')
+    if self.timestamp is None:
+      raise TProtocol.TProtocolException(message='Required field timestamp is 
unset!')
+    if self.metricValue is None:
+      raise TProtocol.TProtocolException(message='Required field metricValue 
is unset!')
+    if self.componentId is None:
+      raise TProtocol.TProtocolException(message='Required field componentId 
is unset!')
+    if self.executorId is None:
+      raise TProtocol.TProtocolException(message='Required field executorId is 
unset!')
+    if self.streamId is None:
+      raise TProtocol.TProtocolException(message='Required field streamId is 
unset!')
+    return
+
+
+  def __hash__(self):
+    value = 17
+    value = (value * 31) ^ hash(self.metricName)
+    value = (value * 31) ^ hash(self.timestamp)
+    value = (value * 31) ^ hash(self.metricValue)
+    value = (value * 31) ^ hash(self.componentId)
+    value = (value * 31) ^ hash(self.executorId)
+    value = (value * 31) ^ hash(self.streamId)
+    return value
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == 
other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
+class WorkerMetricList:
+  """
+  Attributes:
+   - metrics
+  """
+
+  thrift_spec = (
+    None, # 0
+    (1, TType.LIST, 'metrics', (TType.STRUCT,(WorkerMetricPoint, 
WorkerMetricPoint.thrift_spec)), None, ), # 1
+  )
+
+  def __init__(self, metrics=None,):
+    self.metrics = metrics
+
+  def read(self, iprot):
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and 
isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is 
not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, 
self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      if fid == 1:
+        if ftype == TType.LIST:
+          self.metrics = []
+          (_etype780, _size777) = iprot.readListBegin()
+          for _i781 in xrange(_size777):
+            _elem782 = WorkerMetricPoint()
+            _elem782.read(iprot)
+            self.metrics.append(_elem782)
+          iprot.readListEnd()
+        else:
+          iprot.skip(ftype)
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and 
self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, 
self.thrift_spec)))
+      return
+    oprot.writeStructBegin('WorkerMetricList')
+    if self.metrics is not None:
+      oprot.writeFieldBegin('metrics', TType.LIST, 1)
+      oprot.writeListBegin(TType.STRUCT, len(self.metrics))
+      for iter783 in self.metrics:
+        iter783.write(oprot)
+      oprot.writeListEnd()
+      oprot.writeFieldEnd()
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    return
+
+
+  def __hash__(self):
+    value = 17
+    value = (value * 31) ^ hash(self.metrics)
+    return value
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == 
other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
+class WorkerMetrics:
+  """
+  Attributes:
+   - topologyId
+   - port
+   - hostname
+   - metricList
+  """
+
+  thrift_spec = (
+    None, # 0
+    (1, TType.STRING, 'topologyId', None, None, ), # 1
+    (2, TType.I32, 'port', None, None, ), # 2
+    (3, TType.STRING, 'hostname', None, None, ), # 3
+    (4, TType.STRUCT, 'metricList', (WorkerMetricList, 
WorkerMetricList.thrift_spec), None, ), # 4
+  )
+
+  def __init__(self, topologyId=None, port=None, hostname=None, 
metricList=None,):
+    self.topologyId = topologyId
+    self.port = port
+    self.hostname = hostname
+    self.metricList = metricList
+
+  def read(self, iprot):
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and 
isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is 
not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, 
self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      if fid == 1:
+        if ftype == TType.STRING:
+          self.topologyId = iprot.readString().decode('utf-8')
+        else:
+          iprot.skip(ftype)
+      elif fid == 2:
+        if ftype == TType.I32:
+          self.port = iprot.readI32()
+        else:
+          iprot.skip(ftype)
+      elif fid == 3:
+        if ftype == TType.STRING:
+          self.hostname = iprot.readString().decode('utf-8')
+        else:
+          iprot.skip(ftype)
+      elif fid == 4:
+        if ftype == TType.STRUCT:
+          self.metricList = WorkerMetricList()
+          self.metricList.read(iprot)
+        else:
+          iprot.skip(ftype)
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and 
self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, 
self.thrift_spec)))
+      return
+    oprot.writeStructBegin('WorkerMetrics')
+    if self.topologyId is not None:
+      oprot.writeFieldBegin('topologyId', TType.STRING, 1)
+      oprot.writeString(self.topologyId.encode('utf-8'))
+      oprot.writeFieldEnd()
+    if self.port is not None:
+      oprot.writeFieldBegin('port', TType.I32, 2)
+      oprot.writeI32(self.port)
+      oprot.writeFieldEnd()
+    if self.hostname is not None:
+      oprot.writeFieldBegin('hostname', TType.STRING, 3)
+      oprot.writeString(self.hostname.encode('utf-8'))
+      oprot.writeFieldEnd()
+    if self.metricList is not None:
+      oprot.writeFieldBegin('metricList', TType.STRUCT, 4)
+      self.metricList.write(oprot)
+      oprot.writeFieldEnd()
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    if self.topologyId is None:
+      raise TProtocol.TProtocolException(message='Required field topologyId is 
unset!')
+    if self.port is None:
+      raise TProtocol.TProtocolException(message='Required field port is 
unset!')
+    if self.hostname is None:
+      raise TProtocol.TProtocolException(message='Required field hostname is 
unset!')
+    if self.metricList is None:
+      raise TProtocol.TProtocolException(message='Required field metricList is 
unset!')
+    return
+
+
+  def __hash__(self):
+    value = 17
+    value = (value * 31) ^ hash(self.topologyId)
+    value = (value * 31) ^ hash(self.port)
+    value = (value * 31) ^ hash(self.hostname)
+    value = (value * 31) ^ hash(self.metricList)
+    return value
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == 
other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
 class DRPCRequest:
   """
   Attributes:
@@ -12049,11 +12424,11 @@ class HBRecords:
       if fid == 1:
         if ftype == TType.LIST:
           self.pulses = []
-          (_etype762, _size759) = iprot.readListBegin()
-          for _i763 in xrange(_size759):
-            _elem764 = HBPulse()
-            _elem764.read(iprot)
-            self.pulses.append(_elem764)
+          (_etype787, _size784) = iprot.readListBegin()
+          for _i788 in xrange(_size784):
+            _elem789 = HBPulse()
+            _elem789.read(iprot)
+            self.pulses.append(_elem789)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -12070,8 +12445,8 @@ class HBRecords:
     if self.pulses is not None:
       oprot.writeFieldBegin('pulses', TType.LIST, 1)
       oprot.writeListBegin(TType.STRUCT, len(self.pulses))
-      for iter765 in self.pulses:
-        iter765.write(oprot)
+      for iter790 in self.pulses:
+        iter790.write(oprot)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -12123,10 +12498,10 @@ class HBNodes:
       if fid == 1:
         if ftype == TType.LIST:
           self.pulseIds = []
-          (_etype769, _size766) = iprot.readListBegin()
-          for _i770 in xrange(_size766):
-            _elem771 = iprot.readString().decode('utf-8')
-            self.pulseIds.append(_elem771)
+          (_etype794, _size791) = iprot.readListBegin()
+          for _i795 in xrange(_size791):
+            _elem796 = iprot.readString().decode('utf-8')
+            self.pulseIds.append(_elem796)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -12143,8 +12518,8 @@ class HBNodes:
     if self.pulseIds is not None:
       oprot.writeFieldBegin('pulseIds', TType.LIST, 1)
       oprot.writeListBegin(TType.STRING, len(self.pulseIds))
-      for iter772 in self.pulseIds:
-        oprot.writeString(iter772.encode('utf-8'))
+      for iter797 in self.pulseIds:
+        oprot.writeString(iter797.encode('utf-8'))
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()

http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-client/src/storm.thrift
----------------------------------------------------------------------
diff --git a/storm-client/src/storm.thrift b/storm-client/src/storm.thrift
index aff7507..c5140f3 100644
--- a/storm-client/src/storm.thrift
+++ b/storm-client/src/storm.thrift
@@ -672,6 +672,26 @@ struct OwnerResourceSummary {
   18: optional double assigned_off_heap_memory;
 }
 
+struct WorkerMetricPoint {
+  1: required string metricName;
+  2: required i64 timestamp;
+  3: required double metricValue;
+  4: required string componentId;
+  5: required string executorId;
+  6: required string streamId;
+}
+
+struct WorkerMetricList {
+  1: list<WorkerMetricPoint> metrics;
+}
+
+struct WorkerMetrics {
+  1: required string topologyId;
+  2: required i32 port;
+  3: required string hostname;
+  4: required WorkerMetricList metricList;
+}
+
 service Nimbus {
   void submitTopology(1: string name, 2: string uploadedJarLocation, 3: string 
jsonConf, 4: StormTopology topology) throws (1: AlreadyAliveException e, 2: 
InvalidTopologyException ite, 3: AuthorizationException aze);
   void submitTopologyWithOpts(1: string name, 2: string uploadedJarLocation, 
3: string jsonConf, 4: StormTopology topology, 5: SubmitOptions options) throws 
(1: AlreadyAliveException e, 2: InvalidTopologyException ite, 3: 
AuthorizationException aze);
@@ -748,6 +768,7 @@ service Nimbus {
   StormTopology getUserTopology(1: string id) throws (1: NotAliveException e, 
2: AuthorizationException aze);
   TopologyHistoryInfo getTopologyHistory(1: string user) throws (1: 
AuthorizationException aze);
   list<OwnerResourceSummary> getOwnerResourceSummaries (1: string owner) 
throws (1: AuthorizationException aze);
+  void processWorkerMetrics(1: WorkerMetrics metrics);
 }
 
 struct DRPCRequest {
@@ -836,3 +857,5 @@ exception HBAuthorizationException {
 exception HBExecutionException {
   1: required string msg;
 }
+
+

http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/pom.xml
----------------------------------------------------------------------
diff --git a/storm-server/pom.xml b/storm-server/pom.xml
index 7c301a4..cfe2d74 100644
--- a/storm-server/pom.xml
+++ b/storm-server/pom.xml
@@ -64,6 +64,10 @@
             <artifactId>auto-service</artifactId>
             <optional>true</optional>
         </dependency>
+        <dependency>
+            <groupId>org.rocksdb</groupId>
+            <artifactId>rocksdbjni</artifactId>
+        </dependency>
 
         <!-- test -->
         <dependency>

http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java 
b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
index f806562..a6881b7 100644
--- a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
+++ b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
@@ -19,10 +19,10 @@
 package org.apache.storm;
 
 import static 
org.apache.storm.validation.ConfigValidationAnnotations.isInteger;
+import static 
org.apache.storm.validation.ConfigValidationAnnotations.isPositiveNumber;
 import static org.apache.storm.validation.ConfigValidationAnnotations.isString;
 import static 
org.apache.storm.validation.ConfigValidationAnnotations.isStringList;
 import static 
org.apache.storm.validation.ConfigValidationAnnotations.isStringOrStringList;
-import static 
org.apache.storm.validation.ConfigValidationAnnotations.isPositiveNumber;
 import static org.apache.storm.validation.ConfigValidationAnnotations.NotNull;
 import static 
org.apache.storm.validation.ConfigValidationAnnotations.isListEntryCustom;
 import static 
org.apache.storm.validation.ConfigValidationAnnotations.isBoolean;
@@ -33,6 +33,7 @@ import static 
org.apache.storm.validation.ConfigValidationAnnotations.isNoDuplic
 import static 
org.apache.storm.validation.ConfigValidationAnnotations.isMapEntryCustom;
 
 import org.apache.storm.container.ResourceIsolationInterface;
+import org.apache.storm.metricstore.MetricStore;
 import org.apache.storm.nimbus.ITopologyActionNotifierPlugin;
 import org.apache.storm.scheduler.blacklist.reporters.IReporter;
 import org.apache.storm.scheduler.blacklist.strategies.IBlacklistStrategy;
@@ -55,7 +56,7 @@ import java.util.Map;
 public class DaemonConfig implements Validated {
 
     /**
-     * We check with this interval that whether the Netty channel is writable 
and try to write pending messages
+     * We check at this interval whether the Netty channel is writable and try to write pending messages.
      */
     @isInteger
     public static final String STORM_NETTY_FLUSH_CHECK_INTERVAL_MS = 
"storm.messaging.netty.flush.check.interval.ms";
@@ -164,7 +165,7 @@ public class DaemonConfig implements Validated {
 
     /**
      * The time to allow any given healthcheck script to run before it
-     * is marked failed due to timeout
+     * is marked failed due to timeout.
      */
     @isNumber
     public static final String STORM_HEALTH_CHECK_TIMEOUT_MS = 
"storm.health.check.timeout.ms";
@@ -809,7 +810,7 @@ public class DaemonConfig implements Validated {
      * Enables user-first classpath. See topology.classpath.beginning.
      */
     @isBoolean
-    public static final String 
STORM_TOPOLOGY_CLASSPATH_BEGINNING_ENABLED="storm.topology.classpath.beginning.enabled";
+    public static final String STORM_TOPOLOGY_CLASSPATH_BEGINNING_ENABLED = 
"storm.topology.classpath.beginning.enabled";
 
     /**
      * This value is passed to spawned JVMs (e.g., Nimbus, Supervisor, and 
Workers)
@@ -905,17 +906,17 @@ public class DaemonConfig implements Validated {
     public static final String NIMBUS_CODE_SYNC_FREQ_SECS = 
"nimbus.code.sync.freq.secs";
 
     /**
-     * The plugin to be used for resource isolation
+     * The plugin to be used for resource isolation.
      */
     @isImplementationOfClass(implementsClass = 
ResourceIsolationInterface.class)
     public static final String STORM_RESOURCE_ISOLATION_PLUGIN = 
"storm.resource.isolation.plugin";
 
     /**
-     * CGroup Setting below
+     * CGroup Setting below.
      */
 
     /**
-     * resources to to be controlled by cgroups
+     * resources to be controlled by cgroups.
      */
     @isStringList
     public static final String STORM_CGROUP_RESOURCES = 
"storm.cgroup.resources";
@@ -1030,6 +1031,48 @@ public class DaemonConfig implements Validated {
     public static String STORM_SUPERVISOR_MEDIUM_MEMORY_GRACE_PERIOD_MS =
         "storm.supervisor.medium.memory.grace.period.ms";
 
+    /**
+     * Class implementing MetricStore.
+     */
+    @NotNull
+    @isImplementationOfClass(implementsClass = MetricStore.class)
+    public static final String STORM_METRIC_STORE_CLASS = 
"storm.metricstore.class";
+
+    /**
+     * RocksDB file location. This setting is specific to the 
org.apache.storm.metricstore.rocksdb.RocksDbStore
+     * implementation for the storm.metricstore.class.
+     */
+    @isString
+    public static final String STORM_ROCKSDB_LOCATION = 
"storm.metricstore.rocksdb.location";
+
+    /**
+     * RocksDB create if missing flag. This setting is specific to the 
org.apache.storm.metricstore.rocksdb.RocksDbStore
+     * implementation for the storm.metricstore.class.
+     */
+    @isBoolean
+    public static final String STORM_ROCKSDB_CREATE_IF_MISSING = 
"storm.metricstore.rocksdb.create_if_missing";
+
+    /**
+     * RocksDB metadata cache capacity. This setting is specific to the 
org.apache.storm.metricstore.rocksdb.RocksDbStore
+     * implementation for the storm.metricstore.class.
+     */
+    @isInteger
+    public static final String STORM_ROCKSDB_METADATA_STRING_CACHE_CAPACITY = 
"storm.metricstore.rocksdb.metadata_string_cache_capacity";
+
+    /**
+     * RocksDB setting for length of metric retention. This setting is 
specific to the org.apache.storm.metricstore.rocksdb.RocksDbStore
+     * implementation for the storm.metricstore.class.
+     */
+    @isInteger
+    public static final String STORM_ROCKSDB_METRIC_RETENTION_HOURS = 
"storm.metricstore.rocksdb.retention_hours";
+
+    /**
+     * RocksDB setting for period of metric deletion thread. This setting is 
specific to the
+     * org.apache.storm.metricstore.rocksdb.RocksDbStore implementation for 
the storm.metricstore.class.
+     */
+    @isInteger
+    public static final String STORM_ROCKSDB_METRIC_DELETION_PERIOD_HOURS = 
"storm.metricstore.rocksdb.deletion_period_hours";
+
     // VALIDATION ONLY CONFIGS
     // Some configs inside Config.java may reference classes we don't want to 
expose in storm-client, but we still want to validate
     // That they reference a valid class.  To allow this to happen we do part 
of the validation on the client side with annotations on
@@ -1051,7 +1094,7 @@ public class DaemonConfig implements Validated {
     }
 
     /**
-     * Get the cgroup resources from the conf
+     * Get the cgroup resources from the conf.
      *
      * @param conf the config to read
      * @return the resources.

http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/main/java/org/apache/storm/LocalCluster.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/LocalCluster.java 
b/storm-server/src/main/java/org/apache/storm/LocalCluster.java
index de3053e..502f454 100644
--- a/storm-server/src/main/java/org/apache/storm/LocalCluster.java
+++ b/storm-server/src/main/java/org/apache/storm/LocalCluster.java
@@ -77,6 +77,7 @@ import org.apache.storm.generated.SupervisorPageInfo;
 import org.apache.storm.generated.TopologyHistoryInfo;
 import org.apache.storm.generated.TopologyInfo;
 import org.apache.storm.generated.TopologyPageInfo;
+import org.apache.storm.generated.WorkerMetrics;
 import org.apache.storm.messaging.IContext;
 import org.apache.storm.messaging.local.Context;
 import org.apache.storm.nimbus.ILeaderElector;
@@ -1123,7 +1124,12 @@ public class LocalCluster implements 
ILocalClusterTrackedTopologyAware, Iface {
         // TODO Auto-generated method stub
         throw new RuntimeException("NOT IMPLEMENTED YET");
     }
-    
+
+    @Override
+    public void processWorkerMetrics(WorkerMetrics metrics) throws 
org.apache.thrift.TException {
+        getNimbus().processWorkerMetrics(metrics);
+    }
+
     public static void main(final String [] args) throws Exception {
         if (args.length < 1) {
             throw new IllegalArgumentException("No class was specified to 
run");

http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java 
b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index afb1c28..37141e8 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -124,6 +124,8 @@ import org.apache.storm.generated.TopologyInitialStatus;
 import org.apache.storm.generated.TopologyPageInfo;
 import org.apache.storm.generated.TopologyStatus;
 import org.apache.storm.generated.TopologySummary;
+import org.apache.storm.generated.WorkerMetricPoint;
+import org.apache.storm.generated.WorkerMetrics;
 import org.apache.storm.generated.WorkerResources;
 import org.apache.storm.generated.WorkerSummary;
 import org.apache.storm.logging.ThriftAccessLogger;
@@ -132,6 +134,10 @@ import org.apache.storm.metric.StormMetricsRegistry;
 import org.apache.storm.metric.api.DataPoint;
 import org.apache.storm.metric.api.IClusterMetricsConsumer;
 import org.apache.storm.metric.api.IClusterMetricsConsumer.ClusterInfo;
+import org.apache.storm.metricstore.AggLevel;
+import org.apache.storm.metricstore.Metric;
+import org.apache.storm.metricstore.MetricStore;
+import org.apache.storm.metricstore.MetricStoreConfig;
 import org.apache.storm.nimbus.DefaultTopologyValidator;
 import org.apache.storm.nimbus.ILeaderElector;
 import org.apache.storm.nimbus.ITopologyActionNotifierPlugin;
@@ -230,6 +236,7 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
     private static final Meter getOwnerResourceSummariesCalls = 
StormMetricsRegistry.registerMeter(
             "nimbus:num-getOwnerResourceSummaries-calls");
     private static final Meter shutdownCalls = 
StormMetricsRegistry.registerMeter("nimbus:num-shutdown-calls");
+    private static final Meter processWorkerMetricsCalls = 
StormMetricsRegistry.registerMeter("nimbus:process-worker-metric-calls");
     // END Metrics
     
     private static final String STORM_VERSION = VersionInfo.getVersion();
@@ -336,7 +343,7 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
         return Nimbus.make(base.get_prev_status());
     };
     
-    private static final Map<TopologyStatus, Map<TopologyActions, 
TopologyStateTransition>> TOPO_STATE_TRANSITIONS = 
+    private static final Map<TopologyStatus, Map<TopologyActions, 
TopologyStateTransition>> TOPO_STATE_TRANSITIONS =
             new ImmutableMap.Builder<TopologyStatus, Map<TopologyActions, 
TopologyStateTransition>>()
             .put(TopologyStatus.ACTIVE, new 
ImmutableMap.Builder<TopologyActions, TopologyStateTransition>()
                     .put(TopologyActions.INACTIVATE, INACTIVE_TRANSITION)
@@ -597,7 +604,7 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
     
     /**
      * convert {topology-id -> SchedulerAssignment} to
-     *         {topology-id -> {executor [node port]}}
+     *         {topology-id -> {executor [node port]}}.
      * @return
      */
     private static Map<String, Map<List<Long>, List<Object>>> 
computeTopoToExecToNodePort(Map<String, SchedulerAssignment> schedAssignments) {
@@ -879,7 +886,8 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
         }
     }
     
-    private static StormTopology tryReadTopology(String topoId, TopoCache tc) 
throws NotAliveException, AuthorizationException, IOException {
+    private static StormTopology tryReadTopology(String topoId, TopoCache tc)
+            throws NotAliveException, AuthorizationException, IOException {
         try {
             return readStormTopologyAsNimbus(topoId, tc);
         } catch (KeyNotFoundException e) {
@@ -1001,10 +1009,10 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
         final Nimbus nimbus = new Nimbus(conf, inimbus);
         nimbus.launchServer();
         final ThriftServer server = new ThriftServer(conf, new 
Processor<>(nimbus), ThriftConnectionType.NIMBUS);
-        Utils.addShutdownHookWithForceKillIn1Sec(() -> {
+        Utils.addShutdownHookWithDelayedForceKill(() -> {
             nimbus.shutdown();
             server.stop();
-        });
+        }, 10);
         LOG.info("Starting nimbus server for storm version '{}'", 
STORM_VERSION);
         server.serve();
         return nimbus;
@@ -1022,6 +1030,7 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
     }
     
     private final Map<String, Object> conf;
+    private MetricStore metricsStore;
     private final NavigableMap<SimpleVersion, List<String>> 
supervisorClasspaths;
     private final NimbusInfo nimbusHostPortInfo;
     private final INimbus inimbus;
@@ -1098,6 +1107,15 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
             BlobStore blobStore, TopoCache topoCache, ILeaderElector 
leaderElector, IGroupMappingServiceProvider groupMapper)
         throws Exception {
         this.conf = conf;
+
+        this.metricsStore = null;
+        try {
+            this.metricsStore = MetricStoreConfig.configure(conf);
+        } catch (Exception e) {
+            // the metrics store is not critical to the operation of the 
cluster, allow Nimbus to come up
+            LOG.error("Failed to initialize metric store", e);
+        }
+
         if (hostPortInfo == null) {
             hostPortInfo = NimbusInfo.fromConf(conf);
         }
@@ -2730,7 +2748,7 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
             
             // lock protects against multiple topologies being submitted at 
once and
             // cleanup thread killing topology in b/w assignment and starting 
the topology
-            synchronized(submitLock) {
+            synchronized (submitLock) {
                 assertTopoActive(topoName, false);
                 //cred-update-lock is not needed here because creds are being 
added for the first time.
                 if (creds != null) {
@@ -3790,7 +3808,7 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
                     }
                     Map<WorkerSlot, WorkerResources> workerResources = 
getWorkerResourcesForTopology(topoId);
                     boolean isAllowed = userTopologies.contains(topoId);
-                    for (WorkerSummary workerSummary: 
StatsUtil.aggWorkerStats(topoId, topoName, taskToComp, beats, 
+                    for (WorkerSummary workerSummary: 
StatsUtil.aggWorkerStats(topoId, topoName, taskToComp, beats,
                             exec2NodePort, nodeToHost, workerResources, 
includeSys, isAllowed, sid)) {
                         pageInfo.add_to_worker_summaries(workerSummary);
                     }
@@ -3808,7 +3826,7 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
     
     @Override
     public ComponentPageInfo getComponentPageInfo(String topoId, String 
componentId, String window,
-            boolean includeSys) throws NotAliveException, 
AuthorizationException, TException {
+                                                  boolean includeSys) throws 
NotAliveException, AuthorizationException, TException {
         try {
             getComponentPageInfoCalls.mark();
             CommonTopoInfo info = getCommonTopoInfo(topoId, 
"getComponentPageInfo");
@@ -4004,8 +4022,8 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
         List<NimbusSummary> nimbuses = stormClusterState.nimbuses();
         NimbusInfo leader = leaderElector.getLeader();
         for (NimbusSummary nimbusSummary: nimbuses) {
-            if (leader.getHost().equals(nimbusSummary.get_host()) &&
-                    leader.getPort() == nimbusSummary.get_port()) {
+            if (leader.getHost().equals(nimbusSummary.get_host())
+                    && leader.getPort() == nimbusSummary.get_port()) {
                 
nimbusSummary.set_uptime_secs(Time.deltaSecs(nimbusSummary.get_uptime_secs()));
                 nimbusSummary.set_isLeader(true);
                 return nimbusSummary;
@@ -4042,7 +4060,7 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
             //else, add only this owner (the input paramter) to the map
             Map<String, List<StormBase>> ownerToBasesMap = new HashMap<>();
 
-            if (owner == null){
+            if (owner == null) {
                 // add all the owners to the map
                 for (StormBase base: topoIdToBases.values()) {
                     String baseOwner = base.get_owner();
@@ -4174,6 +4192,9 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
             }
             if (zkClient != null) {
                 zkClient.close();
+            }            
+            if (metricsStore != null) {
+                metricsStore.close();
             }
             LOG.info("Shut down master");
         } catch (Exception e) {
@@ -4187,4 +4208,27 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
     public boolean isWaiting() {
         return timer.isTimerWaiting();
     }
+
+    @Override
+    public void processWorkerMetrics(WorkerMetrics metrics) throws 
org.apache.thrift.TException {
+        processWorkerMetricsCalls.mark();
+
+        checkAuthorization(null, null, "processWorkerMetrics");
+
+        if (this.metricsStore == null) {
+            return;
+        }
+
+        for (WorkerMetricPoint m : metrics.get_metricList().get_metrics()) {
+            try {
+                Metric metric = new Metric(m.get_metricName(), 
m.get_timestamp(), metrics.get_topologyId(),
+                        m.get_metricValue(), m.get_componentId(), 
m.get_executorId(), metrics.get_hostname(),
+                        m.get_streamId(), metrics.get_port(), 
AggLevel.AGG_LEVEL_NONE);
+                this.metricsStore.insert(metric);
+            } catch (Exception e) {
+                LOG.error("Failed to save metric", e);
+            }
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java
index ff60a4c..f45ce25 100644
--- 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java
+++ 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.daemon.supervisor;
 
 import java.io.BufferedReader;
@@ -39,11 +40,16 @@ import 
org.apache.storm.container.ResourceIsolationInterface;
 import org.apache.storm.generated.LSWorkerHeartbeat;
 import org.apache.storm.generated.LocalAssignment;
 import org.apache.storm.generated.ProfileRequest;
+import org.apache.storm.generated.WorkerMetricPoint;
+import org.apache.storm.generated.WorkerMetricList;
+import org.apache.storm.generated.WorkerMetrics;
 import org.apache.storm.metric.StormMetricsRegistry;
 import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.NimbusClient;
 import org.apache.storm.utils.ServerConfigUtils;
 import org.apache.storm.utils.ServerUtils;
-import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.yaml.snakeyaml.Yaml;
@@ -53,6 +59,11 @@ import org.yaml.snakeyaml.Yaml;
  */
 public abstract class Container implements Killable {
     private static final Logger LOG = LoggerFactory.getLogger(Container.class);
+    private static final String MEMORY_USED_METRIC = "UsedMemory";
+    private static final String SYSTEM_COMPONENT_ID = "System";
+    private static final String INVALID_EXECUTOR_ID = "-1";
+    private static final String INVALID_STREAM_ID = "None";
+
     public static enum ContainerType {
         LAUNCH(false, false),
         RECOVER_FULL(true, false),
@@ -137,6 +148,7 @@ public abstract class Container implements Killable {
     protected final ResourceIsolationInterface _resourceIsolationManager;
     protected ContainerType _type;
     protected final boolean _symlinksDisabled;
+    private long lastMetricProcessTime = 0L;
 
     /**
      * Create a new Container.
@@ -209,7 +221,7 @@ public abstract class Container implements Killable {
     }
     
     /**
-     * Kill a given process
+     * Kill a given process.
      * @param pid the id of the process to kill
      * @throws IOException
      */
@@ -218,7 +230,7 @@ public abstract class Container implements Killable {
     }
     
     /**
-     * Kill a given process
+     * Kill a given process.
      * @param pid the id of the process to kill
      * @throws IOException
      */
@@ -259,7 +271,7 @@ public abstract class Container implements Killable {
     }
 
     /**
-     * Is a process alive and running?
+     * Is a process alive and running?.
      * @param pid the PID of the running process
      * @param user the user that is expected to own that process
      * @return true if it is, else false
@@ -381,7 +393,7 @@ public abstract class Container implements Killable {
     }
     
     /**
-     * Write out the file used by the log viewer to allow/reject log access
+     * Write out the file used by the log viewer to allow/reject log access.
      * @param user the user this is going to run as
      * @throws IOException on any error
      */
@@ -429,7 +441,7 @@ public abstract class Container implements Killable {
     }
     
     /**
-     * Create symlink from the containers directory/artifacts to the artifacts 
directory
+     * Create symlink from the containers directory/artifacts to the artifacts 
directory.
      * @throws IOException on any error
      */
     protected void createArtifactsLink() throws IOException {
@@ -693,4 +705,41 @@ public abstract class Container implements Killable {
     public String getWorkerId() {
         return _workerId;
     }
+
+    /**
+     * Send worker metrics to Nimbus.
+     */
+    void processMetrics() {
+        try {
+            if (_usedMemory.get(_port) != null) {
+                // Make sure we don't process too frequently.
+                long nextMetricProcessTime = this.lastMetricProcessTime + 60L 
* 1000L;
+                long currentTimeMsec = System.currentTimeMillis();
+                if (currentTimeMsec < nextMetricProcessTime) {
+                    return;
+                }
+
+                String hostname = Utils.hostname();
+
+                // create metric for memory
+                long timestamp = System.currentTimeMillis();
+                double value = _usedMemory.get(_port).memory;
+                WorkerMetricPoint workerMetric = new 
WorkerMetricPoint(MEMORY_USED_METRIC, timestamp, value, SYSTEM_COMPONENT_ID,
+                        INVALID_EXECUTOR_ID, INVALID_STREAM_ID);
+
+                WorkerMetricList metricList = new WorkerMetricList();
+                metricList.add_to_metrics(workerMetric);
+                WorkerMetrics metrics = new WorkerMetrics(_topologyId, _port, 
hostname, metricList);
+
+                try (NimbusClient client = 
NimbusClient.getConfiguredClient(_conf)) {
+                    client.getClient().processWorkerMetrics(metrics);
+                }
+
+                this.lastMetricProcessTime = currentTimeMsec;
+            }
+        } catch (Exception e) {
+            LOG.error("Failed to process metrics", e);
+            this.lastMetricProcessTime = System.currentTimeMillis();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
index cb41654..fe30c93 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
@@ -936,6 +936,9 @@ public class Slot extends Thread implements AutoCloseable, 
BlobChangingCallback
             }
             dynamicState = dynamicState.withProfileActions(mod, modPending);
         }
+
+        dynamicState.container.processMetrics();
+
         Time.sleep(staticState.monitorFreqMs);
         return dynamicState;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java 
b/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java
index c1e6121..28bba3e 100644
--- 
a/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java
+++ 
b/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.metric;
 
 import com.codahale.metrics.*;
@@ -53,6 +54,10 @@ public class StormMetricsRegistry {
         return register(name, gauge);
     }
 
+    public static void registerProvidedGauge(final String name, Gauge gauge) {
+        register(name, gauge);
+    }
+
     public static Histogram registerHistogram(String name, Reservoir 
reservoir) {
         Histogram histogram = new Histogram(reservoir);
         return register(name, histogram);

http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/main/java/org/apache/storm/metricstore/AggLevel.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/metricstore/AggLevel.java 
b/storm-server/src/main/java/org/apache/storm/metricstore/AggLevel.java
new file mode 100644
index 0000000..662a17c
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/metricstore/AggLevel.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.storm.metricstore;
+
+/**
+ * Specifies the available timeframes for Metric aggregation.
+ */
+public enum AggLevel {
+    AGG_LEVEL_NONE(0),
+    AGG_LEVEL_1_MIN(1),
+    AGG_LEVEL_10_MIN(10),
+    AGG_LEVEL_60_MIN(60);
+
+    private final byte value;
+
+    AggLevel(int value) {
+        this.value = (byte)value;
+    }
+
+    public byte getValue() {
+        return this.value;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/main/java/org/apache/storm/metricstore/FilterOptions.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/metricstore/FilterOptions.java 
b/storm-server/src/main/java/org/apache/storm/metricstore/FilterOptions.java
new file mode 100644
index 0000000..7cfbfbe
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/metricstore/FilterOptions.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.storm.metricstore;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * FilterOptions provides a method to select various filtering options for 
doing a scan of the metrics database.
+ */
+public class FilterOptions {
+    private Set<AggLevel> aggLevels = null;
+    private long startTime = 0L;
+    private long endTime = -1L;
+    private String topologyId = null;
+    private String componentId = null;
+    private String metricName = null;
+    private String executorId = null;
+    private String hostId = null;
+    private Integer port = null;
+    private String streamId = null;
+
+    public FilterOptions() {
+    }
+
+    public void setTopologyId(String topoId) {
+        this.topologyId = topoId;
+    }
+
+    public String getTopologyId() {
+        return this.topologyId;
+    }
+
+    public void setComponentId(String component) {
+        this.componentId = component;
+    }
+
+    public String getComponentId() {
+        return this.componentId;
+    }
+
+    public void setStartTime(Long time) {
+        this.startTime = time;
+    }
+
+    public long getStartTime() {
+        return this.startTime;
+    }
+
+    public void setEndTime(Long time) {
+        this.endTime = time;
+    }
+
+    /**
+     *  Returns the end time if set, returns the current time otherwise.
+     */
+    public long getEndTime() {
+        if (this.endTime < 0L) {
+            this.endTime = System.currentTimeMillis();
+        }
+        return this.endTime;
+    }
+
+    public void setMetricName(String name) {
+        this.metricName = name;
+    }
+
+    public String getMetricName() {
+        return this.metricName;
+    }
+
+    public void setExecutorId(String id) {
+        this.executorId = id;
+    }
+
+    public String getExecutorId() {
+        return this.executorId;
+    }
+
+    public void setHostId(String id) {
+        this.hostId = id;
+    }
+
+    public String getHostId() {
+        return this.hostId;
+    }
+
+    public void setPort(Integer p) {
+        this.port = p;
+    }
+
+    public Integer getPort() {
+        return this.port;
+    }
+
+    public void setStreamId(String id) {
+        this.streamId = id;
+    }
+
+    public String getStreamId() {
+        return this.streamId;
+    }
+
+    /**
+     *  Add an aggregation level to search for.
+     */
+    public void addAggLevel(AggLevel level) {
+        if (this.aggLevels == null) {
+            this.aggLevels = new HashSet<>(1);
+        }
+        this.aggLevels.add(level);
+    }
+
+    /**
+     *  Set the aggregation levels to search for.
+     */
+    public void setAggLevels(Set<AggLevel> levels) throws MetricException {
+        this.aggLevels = levels;
+        if (this.aggLevels == null || this.aggLevels.isEmpty()) {
+            throw new MetricException("Cannot search for empty AggLevel");
+        }
+    }
+
+    /**
+     *  Get the aggregation levels to search for.
+     */
+    public Set<AggLevel> getAggLevels() {
+        if (this.aggLevels == null) {
+            // assume filter choices have been made and since no selection was 
made, all levels are valid
+            this.aggLevels = new HashSet<>(4);
+            aggLevels.add(AggLevel.AGG_LEVEL_NONE);
+            aggLevels.add(AggLevel.AGG_LEVEL_1_MIN);
+            aggLevels.add(AggLevel.AGG_LEVEL_10_MIN);
+            aggLevels.add(AggLevel.AGG_LEVEL_60_MIN);
+        }
+        return this.aggLevels;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/main/java/org/apache/storm/metricstore/Metric.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/metricstore/Metric.java 
b/storm-server/src/main/java/org/apache/storm/metricstore/Metric.java
new file mode 100644
index 0000000..716ced0
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/metricstore/Metric.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.storm.metricstore;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.TimeZone;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class containing metric values and all identifying fields to be stored in a 
MetricStore.
+ */
+public class Metric implements Comparable<Metric> {
+    private static final Logger LOG = LoggerFactory.getLogger(Metric.class);
+
+    // key fields
+    private String name;
+    private long timestamp;
+    private String topologyId;
+    private String componentId;
+    private String executorId;
+    private String hostname;
+    private String streamId;
+    private int port;
+    private AggLevel aggLevel = AggLevel.AGG_LEVEL_NONE;
+
+    // value fields
+    private double value;
+    private long count = 1;
+    private double min = 0.0;
+    private double max = 0.0;
+    private double sum = 0.0;
+
+
+    /**
+     *  Metric constructor.
+     */
+    public Metric(String name, Long timestamp, String topologyId, double 
value, String componentId, String executorId,
+                  String hostname, String streamId, int port, AggLevel 
aggLevel) throws MetricException {
+        this.name = name;
+        this.timestamp = timestamp;
+        this.topologyId = topologyId;
+        this.componentId = componentId;
+        this.executorId = executorId;
+        this.hostname = hostname;
+        this.streamId = streamId;
+        this.port = port;
+        this.setValue(value);
+        setAggLevel(aggLevel);
+    }
+
+    /**
+     *  A Metric constructor with the same settings cloned from another.
+     */
+    public Metric(Metric o) {
+        this.name = o.getMetricName();
+        this.timestamp = o.getTimestamp();
+        this.topologyId = o.getTopologyId();
+        this.value = o.getValue();
+        this.componentId = o.getComponentId();
+        this.executorId = o.getExecutorId();
+        this.hostname = o.getHostname();
+        this.streamId = o.getStreamId();
+        this.port = o.getPort();
+        this.count = o.getCount();
+        this.min = o.getMin();
+        this.max = o.getMax();
+        this.sum = o.getSum();
+        this.aggLevel = o.getAggLevel();
+    }
+
+    /**
+     *  Check if a Metric matches another object.
+     */
+    public boolean equals(Object other) {
+
+        if (!(other instanceof Metric)) {
+            return false;
+        }
+
+        Metric o = (Metric) other;
+
+        return this == other
+                || (this.name.equals(o.getMetricName())
+                && this.timestamp == o.getTimestamp()
+                && this.topologyId.equals(o.getTopologyId())
+                && this.value == o.getValue()
+                && this.componentId.equals(o.getComponentId())
+                && this.executorId.equals(o.getExecutorId())
+                && this.hostname.equals(o.getHostname())
+                && this.streamId.equals(o.getStreamId())
+                && this.port == o.getPort()
+                && this.count == o.getCount()
+                && this.min == o.getMin()
+                && this.max == o.getMax()
+                && this.sum == o.getSum()
+                && this.aggLevel == o.getAggLevel());
+    }
+
+    public AggLevel getAggLevel() {
+        return this.aggLevel;
+    }
+
+    /**
+     *  Set the aggLevel.
+     */
+    public void setAggLevel(AggLevel aggLevel) throws MetricException {
+        if (aggLevel == null) {
+            throw new MetricException("AggLevel not set for metric");
+        }
+        this.aggLevel = aggLevel;
+    }
+
+    /**
+     *  Initialize the metric value.
+     */
+    public void setValue(double value) {
+        this.count = 1L;
+        this.min = value;
+        this.max = value;
+        this.sum = value;
+        this.value = value;
+    }
+
+    /**
+     *  Adds an additional value to the metric.
+     */
+    public void addValue(double value) {
+        this.count += 1;
+        this.min = Math.min(this.min, value);
+        this.max = Math.max(this.max, value);
+        this.sum += value;
+        this.value = this.sum / this.count;
+    }
+
+    public double getSum() {
+        return this.sum;
+    }
+
+    public void setSum(double sum) {
+        this.sum = sum;
+    }
+
+    public long getCount() {
+        return this.count;
+    }
+
+    public void setCount(long count) {
+        this.count = count;
+    }
+
+    public double getMin() {
+        return this.min;
+    }
+
+    public void setMin(double min) {
+        this.min = min;
+    }
+
+    public double getMax() {
+        return this.max;
+    }
+
+    public void setMax(double max) {
+        this.max = max;
+    }
+
+    public String getTopologyId() {
+        return this.topologyId;
+    }
+
+    public void setTopologyId(String topologyId) {
+        this.topologyId = topologyId;
+    }
+
+    public long getTimestamp() {
+        return this.timestamp;
+    }
+
+    public void setTimestamp(long timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    public double getValue() {
+        return this.value;
+    }
+
+    public String getMetricName() {
+        return this.name;
+    }
+
+    public String getComponentId() {
+        return this.componentId;
+    }
+
+    public String getExecutorId() {
+        return this.executorId;
+    }
+
+    public String getHostname() {
+        return this.hostname;
+    }
+
+    public String getStreamId() {
+        return this.streamId;
+    }
+
+    public Integer getPort() {
+        return this.port;
+    }
+
+    @Override
+    public int compareTo(Metric o) {
+        long a = this.getTimestamp();
+        long b = o.getTimestamp();
+        return Long.compare(a, b);
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        Date date = new Date(this.timestamp);
+        DateFormat format = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss.SSS");
+        format.setTimeZone(TimeZone.getTimeZone("UTC"));
+        sb.append(format.format(date));
+        sb.append("|");
+        sb.append(this.topologyId);
+        sb.append("|");
+        sb.append(aggLevel);
+        sb.append("|");
+        sb.append(this.name);
+        sb.append("|");
+        sb.append(this.componentId);
+        sb.append("|");
+        sb.append(this.executorId);
+        sb.append("|");
+        sb.append(this.hostname);
+        sb.append("|");
+        sb.append(this.port);
+        sb.append("|");
+        sb.append(this.streamId);
+        return String.format("%s -- count: %d -- value: %f -- min: %f -- max: 
%f -- sum: %f",
+                sb.toString(),
+                this.count,
+                this.value,
+                this.min,
+                this.max,
+                this.sum);
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/main/java/org/apache/storm/metricstore/MetricException.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/metricstore/MetricException.java 
b/storm-server/src/main/java/org/apache/storm/metricstore/MetricException.java
new file mode 100644
index 0000000..e45a451
--- /dev/null
+++ 
b/storm-server/src/main/java/org/apache/storm/metricstore/MetricException.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.storm.metricstore;
+
+/**
+ * A MetricException is used to describe an error using a MetricStore.
+ */
+public class MetricException extends Exception {
+    public MetricException(String message) {
+        super(message);
+    }
+
+    public MetricException(String message, Throwable e) {
+        super(message, e);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/main/java/org/apache/storm/metricstore/MetricStore.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/metricstore/MetricStore.java 
b/storm-server/src/main/java/org/apache/storm/metricstore/MetricStore.java
new file mode 100644
index 0000000..166333b
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/metricstore/MetricStore.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.storm.metricstore;
+
+import java.util.Map;
+
+public interface MetricStore extends AutoCloseable {
+
+    /**
+     * Create metric store instance using the configurations provided via the 
config map.
+     *
+     * @param config Storm config map
+     * @throws MetricException on preparation error
+     */
+    void prepare(Map config) throws MetricException;
+
+    /**
+     * Stores a metric in the store.
+     *
+     * @param metric  Metric to store
+     * @throws MetricException on error
+     */
+    void insert(Metric metric) throws MetricException;
+
+    /**
+     * Fill out the numeric values for a metric.
+     *
+     * @param metric  Metric to populate
+     * @return   true if the metric was populated, false otherwise
+     * @throws MetricException on error
+     */
+    boolean populateValue(Metric metric) throws MetricException;
+
+    /**
+     * Close the metric store.
+     */
+    void close();
+
+    /**
+     *  Scans all metrics in the store and returns the ones matching the 
specified filtering options.
+     *
+     * @param filter   options to filter by
+     * @param scanCallback  callback for each Metric found
+     * @throws MetricException  on error
+     */
+    void scan(FilterOptions filter, ScanCallback scanCallback) throws 
MetricException;
+
+    /**
+     *  Interface used to callback metrics results from a scan.
+     */
+    interface ScanCallback {
+        void cb(Metric metric);
+    }
+}
+
+
+
+

Reply via email to