SLIDER-323. Allocated ports must be reported back upon AM restart
SLIDER-262. Slider agent should provide process supervision such as auto-restart


Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/47822dff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/47822dff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/47822dff

Branch: refs/heads/feature/SLIDER-149_Support_a_YARN_service_registry
Commit: 47822dffa3e086c198fed820ec1e4b099a453659
Parents: 18b0545
Author: Sumit Mohanty <smoha...@hortonworks.com>
Authored: Thu Aug 14 17:36:59 2014 -0700
Committer: Sumit Mohanty <smoha...@hortonworks.com>
Committed: Thu Aug 14 17:57:43 2014 -0700

----------------------------------------------------------------------
 .../src/main/python/agent/ActionQueue.py        | 11 ++-
 .../src/main/python/agent/CommandStatusDict.py  |  5 +-
 .../src/main/python/agent/Controller.py         | 27 ++++---
 .../python/agent/CustomServiceOrchestrator.py   |  8 +-
 slider-agent/src/main/python/agent/Heartbeat.py |  9 ++-
 slider-agent/src/main/python/agent/Register.py  |  7 +-
 .../src/test/python/agent/TestActionQueue.py    |  2 -
 .../src/test/python/agent/TestController.py     | 29 +++++--
 .../agent/TestCustomServiceOrchestrator.py      |  2 +
 .../src/test/python/agent/TestHeartbeat.py      | 82 +++++++++++++++++++-
 .../src/test/python/agent/TestRegistration.py   |  8 +-
 .../providers/agent/AgentProviderService.java   | 51 +++++++++---
 .../appmaster/web/rest/agent/HeartBeat.java     | 10 ---
 .../appmaster/web/rest/agent/Register.java      | 58 ++++++++++----
 .../agent/TestAgentProviderService.java         | 17 ++++
 15 files changed, 254 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/47822dff/slider-agent/src/main/python/agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/agent/ActionQueue.py 
b/slider-agent/src/main/python/agent/ActionQueue.py
index fba9aa2..4c45a76 100644
--- a/slider-agent/src/main/python/agent/ActionQueue.py
+++ b/slider-agent/src/main/python/agent/ActionQueue.py
@@ -122,7 +122,8 @@ class ActionQueue(threading.Thread):
 
     taskId = command['taskId']
 
-    reportResult = not command[Constants.AUTO_GENERATED]
+    # if auto generated then do not report result
+    reportResult = CommandStatusDict.shouldReportResult(command)
 
     # Preparing 'IN_PROGRESS' report
     in_progress_status = self.commandStatuses.generate_report_template(command)
@@ -140,7 +141,8 @@ class ActionQueue(threading.Thread):
     if ActionQueue.STORE_APPLIED_CONFIG in command['commandParams']:
       store_config = 'true' == 
command['commandParams'][ActionQueue.STORE_APPLIED_CONFIG]
     store_command = False
-    if ActionQueue.AUTO_RESTART in command['roleParams']:
+    if 'roleParams' in command and ActionQueue.AUTO_RESTART in 
command['roleParams']:
+      logger.info("Component has indicated auto-restart. Saving details from 
START command.")
       store_command = 'true' == command['roleParams'][ActionQueue.AUTO_RESTART]
 
 
@@ -195,10 +197,7 @@ class ActionQueue(threading.Thread):
       cluster = command['clusterName']
       service = command['serviceName']
       component = command['componentName']
-      reportResult = True
-      if Constants.AUTO_GENERATED in command:
-        reportResult = not command[Constants.AUTO_GENERATED]
-
+      reportResult = CommandStatusDict.shouldReportResult(command)
       component_status = 
self.customServiceOrchestrator.requestComponentStatus(command)
 
       result = {"componentName": component,

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/47822dff/slider-agent/src/main/python/agent/CommandStatusDict.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/agent/CommandStatusDict.py 
b/slider-agent/src/main/python/agent/CommandStatusDict.py
index f20e0fa..bcbce9e 100644
--- a/slider-agent/src/main/python/agent/CommandStatusDict.py
+++ b/slider-agent/src/main/python/agent/CommandStatusDict.py
@@ -114,7 +114,7 @@ class CommandStatusDict():
     grep = Grep()
     output = grep.tail(tmpout, Grep.OUTPUT_LAST_LINES)
     inprogress = self.generate_report_template(command)
-    reportResult = not command[Constants.AUTO_GENERATED]
+    reportResult = CommandStatusDict.shouldReportResult(command)
     inprogress.update({
       'stdout': grep.filterMarkup(output),
       'stderr': tmperr,
@@ -142,3 +142,6 @@ class CommandStatusDict():
     return stub
 
 
+  @staticmethod
+  def shouldReportResult(command):
+    return not (Constants.AUTO_GENERATED in command and 
command[Constants.AUTO_GENERATED])

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/47822dff/slider-agent/src/main/python/agent/Controller.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/agent/Controller.py 
b/slider-agent/src/main/python/agent/Controller.py
index 6bdfc26..1e27efa 100644
--- a/slider-agent/src/main/python/agent/Controller.py
+++ b/slider-agent/src/main/python/agent/Controller.py
@@ -118,7 +118,11 @@ class Controller(threading.Thread):
 
     while not self.isRegistered:
       try:
-        data = json.dumps(self.register.build(id))
+        data = json.dumps(self.register.build(
+          self.componentActualState,
+          self.componentExpectedState,
+          self.actionQueue.customServiceOrchestrator.allocated_ports,
+          id))
         logger.info("Registering with the server at " + self.registerUrl +
                     " with data " + pprint.pformat(data))
         response = self.sendRequest(self.registerUrl, data)
@@ -211,7 +215,7 @@ class Controller(threading.Thread):
       try:
         if not retry:
           data = json.dumps(
-            self.heartbeat.build(commandResult, self.componentActualState,
+            self.heartbeat.build(commandResult,
                                  self.responseId, self.hasMappedComponents))
           self.updateStateBasedOnResult(commandResult)
           logger.debug("Sending request: " + data)
@@ -225,8 +229,11 @@ class Controller(threading.Thread):
 
         serverId = int(response['responseId'])
 
+        restartEnabled = False
         if 'restartEnabled' in response:
-          restartEnabled = 'true' == response['restartEnabled']
+          restartEnabled = response['restartEnabled']
+          if restartEnabled:
+            logger.info("Component auto-restart is enabled.")
 
         if 'hasMappedComponents' in response.keys():
           self.hasMappedComponents = response['hasMappedComponents'] != False
@@ -262,15 +269,16 @@ class Controller(threading.Thread):
           pass
 
         # Add a start command
-        if self.componentActualState == State.INSTALLED and \
+        if self.componentActualState == State.FAILED and \
                 self.componentExpectedState == State.STARTED and 
restartEnabled:
           stored_command = 
self.actionQueue.customServiceOrchestrator.stored_command
           if len(stored_command) > 0:
-             auto_start_command = self.create_start_command(stored_command)
-             if auto_start_command:
-               logger.info("Automatically adding a start command.")
-               self.updateStateBasedOnCommand([auto_start_command], False)
-               self.addToQueue([auto_start_command])
+            auto_start_command = self.create_start_command(stored_command)
+            if auto_start_command:
+              logger.info("Automatically adding a start command.")
+              logger.debug("Auto start command: " + 
pprint.pformat(auto_start_command))
+              self.updateStateBasedOnCommand([auto_start_command], False)
+              self.addToQueue([auto_start_command])
           pass
 
         # Add a status command
@@ -388,6 +396,7 @@ class Controller(threading.Thread):
 
       if "healthStatus" in commandResult:
         if commandResult["healthStatus"] == "INSTALLED":
+          # Mark it FAILED as its a failure remedied by auto-start or 
container restart
           self.componentActualState = State.FAILED
           self.failureCount += 1
           self.logStates()

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/47822dff/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py 
b/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py
index 6b2ace5..15f1664 100644
--- a/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py
+++ b/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py
@@ -57,6 +57,7 @@ class CustomServiceOrchestrator():
                                                                   
'status_command_stderr.txt'))
     self.public_fqdn = hostname.public_hostname()
     self.stored_command = {}
+    self.allocated_ports = {}
     # Clean up old status command files if any
     try:
       os.unlink(self.status_commands_stdout)
@@ -69,7 +70,7 @@ class CustomServiceOrchestrator():
 
   def runCommand(self, command, tmpoutfile, tmperrfile,
                  override_output_files=True, store_command=False):
-    allocated_port = {}
+    allocated_ports = {}
     try:
       script_type = command['commandParams']['script_type']
       script = command['commandParams']['script']
@@ -86,7 +87,7 @@ class CustomServiceOrchestrator():
       # We don't support anything else yet
         message = "Unknown script type {0}".format(script_type)
         raise AgentException(message)
-      json_path = self.dump_command_to_json(command, allocated_port, 
store_command)
+      json_path = self.dump_command_to_json(command, allocated_ports, 
store_command)
       py_file_list = [script_tuple]
       # filter None values
       filtered_py_file_list = [i for i in py_file_list if i]
@@ -132,7 +133,8 @@ class CustomServiceOrchestrator():
       }
 
     if Constants.EXIT_CODE in ret and ret[Constants.EXIT_CODE] == 0:
-      ret[Constants.ALLOCATED_PORTS] = allocated_port
+      ret[Constants.ALLOCATED_PORTS] = allocated_ports
+      self.allocated_ports = allocated_ports
 
     # Irrespective of the outcome report the folder paths
     if command_name == 'INSTALL':

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/47822dff/slider-agent/src/main/python/agent/Heartbeat.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/agent/Heartbeat.py 
b/slider-agent/src/main/python/agent/Heartbeat.py
index aa403d4..b107d92 100644
--- a/slider-agent/src/main/python/agent/Heartbeat.py
+++ b/slider-agent/src/main/python/agent/Heartbeat.py
@@ -36,7 +36,7 @@ class Heartbeat:
     self.config = config
     self.reports = []
 
-  def build(self, commandResult, componentActualState, id='-1',
+  def build(self, commandResult, id='-1',
             componentsMapped=False):
     timestamp = int(time.time() * 1000)
     queueResult = self.actionQueue.result()
@@ -49,8 +49,7 @@ class Heartbeat:
                  'timestamp': timestamp,
                  'hostname': self.config.getLabel(),
                  'nodeStatus': nodeStatus,
-                 'fqdn': hostname.public_hostname(),
-                 'agentState': componentActualState
+                 'fqdn': hostname.public_hostname()
     }
 
     commandsInProgress = False
@@ -62,6 +61,10 @@ class Heartbeat:
         if report['reportResult']:
           del report['reportResult']
           heartbeat['reports'].append(report)
+        else:
+          # dropping the result but only recording the status
+          commandResult["commandStatus"] = report["status"]
+          pass
       if len(heartbeat['reports']) > 0:
         # There may be IN_PROGRESS tasks
         commandsInProgress = True

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/47822dff/slider-agent/src/main/python/agent/Register.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/agent/Register.py 
b/slider-agent/src/main/python/agent/Register.py
index 7c7ff06..b59154f 100644
--- a/slider-agent/src/main/python/agent/Register.py
+++ b/slider-agent/src/main/python/agent/Register.py
@@ -29,7 +29,7 @@ class Register:
   def __init__(self, config):
     self.config = config
 
-  def build(self, id='-1'):
+  def build(self, actualState, expectedState, allocated_ports, id='-1'):
     timestamp = int(time.time() * 1000)
 
     version = self.read_agent_version()
@@ -38,7 +38,10 @@ class Register:
                 'timestamp': timestamp,
                 'hostname': self.config.getLabel(),
                 'publicHostname': hostname.public_hostname(),
-                'agentVersion': version
+                'agentVersion': version,
+                'actualState': actualState,
+                'expectedState': expectedState,
+                'allocatedPorts': allocated_ports
     }
     return register
 

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/47822dff/slider-agent/src/test/python/agent/TestActionQueue.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/test/python/agent/TestActionQueue.py 
b/slider-agent/src/test/python/agent/TestActionQueue.py
index 4a53c2e..8071ee8 100644
--- a/slider-agent/src/test/python/agent/TestActionQueue.py
+++ b/slider-agent/src/test/python/agent/TestActionQueue.py
@@ -61,8 +61,6 @@ class TestActionQueue(TestCase):
     'serviceName': u'HBASE',
     'configurations': {'global': {}},
     'configurationTags': {'global': {'tag': 'v1'}},
-    'auto_generated': False,
-    'roleParams': {'auto_restart':'false'},
     'commandParams': {'script_type': 'PYTHON',
                       'script': 'scripts/abc.py',
                       'command_timeout': '600'}

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/47822dff/slider-agent/src/test/python/agent/TestController.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/test/python/agent/TestController.py 
b/slider-agent/src/test/python/agent/TestController.py
index 91e12f1..401d69a 100644
--- a/slider-agent/src/test/python/agent/TestController.py
+++ b/slider-agent/src/test/python/agent/TestController.py
@@ -55,6 +55,7 @@ class TestController(unittest.TestCase):
     self.controller = Controller.Controller(config)
     self.controller.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS = 0.1
     self.controller.netutil.HEARTBEAT_NOT_IDDLE_INTERVAL_SEC = 0.1
+    self.controller.actionQueue = ActionQueue.ActionQueue(config, 
self.controller)
 
 
   @patch("json.dumps")
@@ -289,7 +290,7 @@ class TestController(unittest.TestCase):
     self.controller.sendRequest = sendRequest
 
     self.controller.responseId = 1
-    response = {"responseId":"2", "restartAgent":"false"}
+    response = {"responseId":"2", "restartAgent": False}
     loadsMock.return_value = response
 
     def one_heartbeat(*args, **kwargs):
@@ -651,8 +652,9 @@ class TestController(unittest.TestCase):
     self.controller.sendRequest = sendRequest
 
     self.controller.responseId = 1
-    response = {"responseId":"2", "restartAgent":"false", 
"restartEnabled":'true'}
-    loadsMock.return_value = response
+    response1 = {"responseId": "2", "restartAgent": False, "restartEnabled": 
True}
+    response2 = {"responseId": "2", "restartAgent": False, "restartEnabled": 
False}
+    loadsMock.side_effect = [response1, response2, response1]
 
     def one_heartbeat(*args, **kwargs):
       self.controller.DEBUG_STOP_HEARTBEATING = True
@@ -665,9 +667,9 @@ class TestController(unittest.TestCase):
 
     # one successful request, after stop
     self.controller.actionQueue = actionQueue
-    self.controller.componentActualState = State.INSTALLED
+    self.controller.componentActualState = State.FAILED
     self.controller.componentExpectedState = State.STARTED
-    self.assertTrue(self.controller.componentActualState, State.INSTALLED)
+    self.assertTrue(self.controller.componentActualState, State.FAILED)
     self.controller.actionQueue.customServiceOrchestrator.stored_command = {
       'commandType': 'EXECUTION_COMMAND',
       'role': u'HBASE',
@@ -699,6 +701,23 @@ class TestController(unittest.TestCase):
       'commandId': '8-1',
       'auto_generated': True}])])
     self.controller.config = original_value
+
+    # restartEnabled = False
+    self.controller.componentActualState = State.FAILED
+    self.controller.heartbeatWithServer()
+
+    self.assertTrue(sendRequest.called)
+    self.assertTrue(self.controller.componentActualState, State.FAILED)
+    self.assertTrue(self.controller.componentExpectedState, State.STARTED)
+
+    # restartEnabled = True
+    self.controller.componentActualState = State.INSTALLED
+    self.controller.componentExpectedState = State.INSTALLED
+    self.controller.heartbeatWithServer()
+
+    self.assertTrue(sendRequest.called)
+    self.assertTrue(self.controller.componentActualState, State.INSTALLED)
+    self.assertTrue(self.controller.componentExpectedState, State.INSTALLED)
     pass
 
 

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/47822dff/slider-agent/src/test/python/agent/TestCustomServiceOrchestrator.py
----------------------------------------------------------------------
diff --git 
a/slider-agent/src/test/python/agent/TestCustomServiceOrchestrator.py 
b/slider-agent/src/test/python/agent/TestCustomServiceOrchestrator.py
index 6408809..e545afe 100644
--- a/slider-agent/src/test/python/agent/TestCustomServiceOrchestrator.py
+++ b/slider-agent/src/test/python/agent/TestCustomServiceOrchestrator.py
@@ -230,6 +230,8 @@ class TestCustomServiceOrchestrator(TestCase):
     self.assertEqual(ret['allocated_ports'], {'a.a.port': '10233'})
     self.assertTrue(run_file_mock.called)
     self.assertEqual(run_file_mock.call_count, 1)
+    self.assertEqual(orchestrator.allocated_ports, {'a.a.port': '10233'})
+    self.assertEqual(orchestrator.stored_command, {})
 
 
   @patch.object(socket, "close")

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/47822dff/slider-agent/src/test/python/agent/TestHeartbeat.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/test/python/agent/TestHeartbeat.py 
b/slider-agent/src/test/python/agent/TestHeartbeat.py
index 70870ec..b012218 100644
--- a/slider-agent/src/test/python/agent/TestHeartbeat.py
+++ b/slider-agent/src/test/python/agent/TestHeartbeat.py
@@ -50,7 +50,7 @@ class TestHeartbeat(TestCase):
     dummy_controller = MagicMock()
     actionQueue = ActionQueue(config, dummy_controller)
     heartbeat = Heartbeat(actionQueue, config)
-    result = heartbeat.build({}, State.STARTED, 100)
+    result = heartbeat.build({}, 100)
     print "Heartbeat: " + str(result)
     self.assertEquals(result['hostname'] != '', True,
                       "hostname should not be empty")
@@ -63,7 +63,6 @@ class TestHeartbeat(TestCase):
     self.assertEquals(len(result['nodeStatus']), 2)
     self.assertEquals(result['nodeStatus']['cause'], "NONE")
     self.assertEquals(result['nodeStatus']['status'], "HEALTHY")
-    self.assertEquals(result['agentState'], State.STARTED)
     # result may or may NOT have an agentEnv structure in it
     self.assertEquals((len(result) is 6) or (len(result) is 7), True)
     self.assertEquals(not heartbeat.reports, True,
@@ -147,7 +146,7 @@ class TestHeartbeat(TestCase):
     }
     heartbeat = Heartbeat(actionQueue, config)
     # State.STARTED results in agentState to be set to 4 (enum order)
-    hb = heartbeat.build({}, State.STARTED, 10)
+    hb = heartbeat.build({}, 10)
     hb['hostname'] = 'hostname'
     hb['timestamp'] = 'timestamp'
     hb['fqdn'] = 'fqdn'
@@ -155,7 +154,7 @@ class TestHeartbeat(TestCase):
                   {'status': 'HEALTHY',
                    'cause': 'NONE'},
                 'timestamp': 'timestamp', 'hostname': 'hostname', 'fqdn': 
'fqdn',
-                'agentState': 4, 'responseId': 10, 'reports': [
+                'responseId': 10, 'reports': [
       {'status': 'IN_PROGRESS', 'roleCommand': u'INSTALL',
        'serviceName': u'HDFS', 'role': u'DATANODE', 'actionId': '1-1',
        'stderr': 'Read from /tmp/errors-3.txt',
@@ -178,6 +177,81 @@ class TestHeartbeat(TestCase):
     self.assertEqual.__self__.maxDiff = None
     self.assertEquals(hb, expected)
 
+  @patch.object(ActionQueue, "result")
+  def test_build_result2(self, result_mock):
+    config = AgentConfig("", "")
+    config.set('agent', 'prefix', 'tmp')
+    dummy_controller = MagicMock()
+    actionQueue = ActionQueue(config, dummy_controller)
+    result_mock.return_value = {
+      'reports': [{'status': 'IN_PROGRESS',
+                   'stderr': 'Read from /tmp/errors-3.txt',
+                   'stdout': 'Read from /tmp/output-3.txt',
+                   'clusterName': u'cc',
+                   'roleCommand': u'INSTALL',
+                   'serviceName': u'HDFS',
+                   'role': u'DATANODE',
+                   'actionId': '1-1',
+                   'taskId': 3,
+                   'exitcode': 777,
+                   'reportResult' : False}
+      ],
+      'componentStatus': []
+      }
+    heartbeat = Heartbeat(actionQueue, config)
+
+    commandResult = {}
+    hb = heartbeat.build(commandResult, 10)
+    hb['hostname'] = 'hostname'
+    hb['timestamp'] = 'timestamp'
+    hb['fqdn'] = 'fqdn'
+    expected = {'nodeStatus':
+                  {'status': 'HEALTHY',
+                   'cause': 'NONE'},
+                'timestamp': 'timestamp', 'hostname': 'hostname', 'fqdn': 
'fqdn',
+                'responseId': 10, 'reports': []}
+    self.assertEqual.__self__.maxDiff = None
+    self.assertEquals(hb, expected)
+    self.assertEquals(commandResult, {'commandStatus': 'IN_PROGRESS'})
+
+  @patch.object(ActionQueue, "result")
+  def test_build_result3(self, result_mock):
+    config = AgentConfig("", "")
+    config.set('agent', 'prefix', 'tmp')
+    dummy_controller = MagicMock()
+    actionQueue = ActionQueue(config, dummy_controller)
+    result_mock.return_value = {
+      'reports': [{'status': 'COMPLETED',
+                   'stderr': 'Read from /tmp/errors-3.txt',
+                   'stdout': 'Read from /tmp/output-3.txt',
+                   'clusterName': u'cc',
+                   'roleCommand': u'INSTALL',
+                   'serviceName': u'HDFS',
+                   'role': u'DATANODE',
+                   'actionId': '1-1',
+                   'taskId': 3,
+                   'exitcode': 777,
+                   'reportResult' : False}
+      ],
+      'componentStatus': []
+    }
+    heartbeat = Heartbeat(actionQueue, config)
+
+    commandResult = {}
+    hb = heartbeat.build(commandResult, 10)
+    hb['hostname'] = 'hostname'
+    hb['timestamp'] = 'timestamp'
+    hb['fqdn'] = 'fqdn'
+    expected = {'nodeStatus':
+                  {'status': 'HEALTHY',
+                   'cause': 'NONE'},
+                'timestamp': 'timestamp', 'hostname': 'hostname', 'fqdn': 
'fqdn',
+                'responseId': 10, 'reports': []}
+    self.assertEqual.__self__.maxDiff = None
+    self.assertEquals(hb, expected)
+    self.assertEquals(commandResult, {'commandStatus': 'COMPLETED'})
+
+
 
 if __name__ == "__main__":
   logging.basicConfig(format='%(asctime)s %(message)s', level=logging.DEBUG)

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/47822dff/slider-agent/src/test/python/agent/TestRegistration.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/test/python/agent/TestRegistration.py 
b/slider-agent/src/test/python/agent/TestRegistration.py
index 5245af9..f91fe29 100644
--- a/slider-agent/src/test/python/agent/TestRegistration.py
+++ b/slider-agent/src/test/python/agent/TestRegistration.py
@@ -26,6 +26,7 @@ import tempfile
 from mock.mock import patch
 from mock.mock import MagicMock
 from Register import Register
+from Controller import State
 from AgentConfig import AgentConfig
 
 class TestRegistration(TestCase):
@@ -47,14 +48,17 @@ class TestRegistration(TestCase):
       text_file.write("1.3.0")
 
     register = Register(config)
-    data = register.build(1)
+    data = register.build(State.INIT, State.INIT, {}, 1)
     #print ("Register: " + pprint.pformat(data))
     self.assertEquals(data['hostname'] != "", True, "hostname should not be 
empty")
     self.assertEquals(data['publicHostname'] != "", True, "publicHostname 
should not be empty")
     self.assertEquals(data['responseId'], 1)
     self.assertEquals(data['timestamp'] > 1353678475465L, True, "timestamp 
should not be empty")
     self.assertEquals(data['agentVersion'], '1.3.0', "agentVersion should not 
be empty")
-    self.assertEquals(len(data), 5)
+    self.assertEquals(data['actualState'], State.INIT, "actualState should not 
be empty")
+    self.assertEquals(data['expectedState'], State.INIT, "expectedState should 
not be empty")
+    self.assertEquals(data['allocatedPorts'], {}, "allocatedPorts should be 
empty")
+    self.assertEquals(len(data), 8)
 
     self.assertEquals(os.path.join(tmpdir, "app/definition"), 
config.getResolvedPath("app_pkg_dir"))
     self.assertEquals(os.path.join(tmpdir, "app/install"), 
config.getResolvedPath("app_install_dir"))

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/47822dff/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
----------------------------------------------------------------------
diff --git 
a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
 
b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
index b22ef39..8ce1f18 100644
--- 
a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
+++ 
b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
@@ -398,9 +398,19 @@ public class AgentProviderService extends 
AbstractProviderService implements
     log.info("Handling registration: " + registration);
     RegistrationResponse response = new RegistrationResponse();
     String label = registration.getHostname();
+    State agentState = registration.getActualState();
     if (getComponentStatuses().containsKey(label)) {
       response.setResponseStatus(RegistrationStatus.OK);
-      getComponentStatuses().get(label).heartbeat(System.currentTimeMillis());
+      ComponentInstanceState componentStatus = 
getComponentStatuses().get(label);
+      componentStatus.heartbeat(System.currentTimeMillis());
+      updateComponentStatusWithAgentState(componentStatus, agentState);
+
+      Map<String, String> ports = registration.getAllocatedPorts();
+      if (ports != null && !ports.isEmpty()) {
+        String roleName = getRoleName(label);
+        String containerId = getContainerId(label);
+        processAllocatedPorts(registration.getPublicHostname(), roleName, 
containerId, ports);
+      }
     } else {
       response.setResponseStatus(RegistrationStatus.FAILED);
       response.setLog("Label not recognized.");
@@ -424,9 +434,8 @@ public class AgentProviderService extends 
AbstractProviderService implements
 
     String label = heartBeat.getHostname();
     String roleName = getRoleName(label);
-    State agentState = heartBeat.getAgentState();
-
     String containerId = getContainerId(label);
+
     StateAccessForProviders accessor = getAmState();
     String scriptPath = getScriptPathFromMetainfo(roleName);
 
@@ -441,13 +450,22 @@ public class AgentProviderService extends 
AbstractProviderService implements
 
     Boolean isMaster = isMaster(roleName);
     ComponentInstanceState componentStatus = getComponentStatuses().get(label);
-    updateComponentStatusWithAgentState(componentStatus, agentState);
     componentStatus.heartbeat(System.currentTimeMillis());
     // If no Master can explicitly publish then publish if its a master
     // Otherwise, wait till the master that can publish is ready
     if (isMaster &&
         (canAnyMasterPublishConfig() == false || canPublishConfig(roleName))) {
       publishConfigAndExportGroups(heartBeat, componentStatus);
+    } else {
+      // Ack that config has been reported
+      List<ComponentStatus> statuses = heartBeat.getComponentStatus();
+      if (statuses != null && !statuses.isEmpty()) {
+        ComponentStatus status = statuses.get(0);
+        if(status.getConfigs().size() > 0) {
+          log.info("Config got reported by {} but discarded as component {} 
cannot publish.", label, roleName);
+          componentStatus.setConfigReported(true);
+        }
+      }
     }
 
     List<CommandReport> reports = heartBeat.getReports();
@@ -455,14 +473,7 @@ public class AgentProviderService extends 
AbstractProviderService implements
       CommandReport report = reports.get(0);
       Map<String, String> ports = report.getAllocatedPorts();
       if (ports != null && !ports.isEmpty()) {
-        for (Map.Entry<String, String> port : ports.entrySet()) {
-          log.info("Recording allocated port for {} as {}", port.getKey(), 
port.getValue());
-          this.getAllocatedPorts().put(port.getKey(), port.getValue());
-          this.getAllocatedPorts(containerId).put(port.getKey(), 
port.getValue());
-        }
-
-        // component specific publishes
-        processAndPublishComponentSpecificData(ports, containerId, 
heartBeat.getFqdn(), roleName);
+        processAllocatedPorts(heartBeat.getFqdn(), roleName, containerId, 
ports);
       }
       CommandResult result = 
CommandResult.getCommandResult(report.getStatus());
       Command command = Command.getCommand(report.getRoleCommand());
@@ -506,6 +517,7 @@ public class AgentProviderService extends 
AbstractProviderService implements
       if (isMaster && componentStatus.getState() == State.STARTED
           && command == Command.NOP) {
         if (!componentStatus.getConfigReported()) {
+          log.info("Requesting applied config for {} on {}.", roleName, 
containerId);
           addGetConfigCommand(roleName, containerId, response);
         }
       }
@@ -525,6 +537,20 @@ public class AgentProviderService extends 
AbstractProviderService implements
     return response;
   }
 
+  protected void processAllocatedPorts(String fqdn,
+                                     String roleName,
+                                     String containerId,
+                                     Map<String, String> ports) {
+    for (Map.Entry<String, String> port : ports.entrySet()) {
+      log.info("Recording allocated port for {} as {}", port.getKey(), 
port.getValue());
+      this.getAllocatedPorts().put(port.getKey(), port.getValue());
+      this.getAllocatedPorts(containerId).put(port.getKey(), port.getValue());
+    }
+
+    // component specific publishes
+    processAndPublishComponentSpecificData(ports, containerId, fqdn, roleName);
+  }
+
   private void updateComponentStatusWithAgentState(
       ComponentInstanceState componentStatus, State agentState) {
     if (agentState != null) {
@@ -803,6 +829,7 @@ public class AgentProviderService extends 
AbstractProviderService implements
               }
             }
           }
+          log.info("Received and stored config for {}", 
heartBeat.getHostname());
           componentStatus.setConfigReported(true);
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/47822dff/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/HeartBeat.java
----------------------------------------------------------------------
diff --git 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/HeartBeat.java
 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/HeartBeat.java
index 62df18d..a08d46d 100644
--- 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/HeartBeat.java
+++ 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/HeartBeat.java
@@ -45,7 +45,6 @@ public class HeartBeat {
   HostStatus nodeStatus;
   private AgentEnv agentEnv = null;
   private String fqdn;
-  private State agentState;
 
   public long getResponseId() {
     return responseId;
@@ -125,14 +124,6 @@ public class HeartBeat {
     this.mounts = mounts;
   }
 
-  public State getAgentState() {
-    return agentState;
-  }
-
-  public void setAgentState(State agentState) {
-    this.agentState = agentState;
-  }
-
   @Override
   public String toString() {
     return "HeartBeat{" +
@@ -142,7 +133,6 @@ public class HeartBeat {
            ", reports=" + reports +
            ", componentStatus=" + componentStatus +
            ", nodeStatus=" + nodeStatus +
-           ", agentState=" + agentState +
            '}';
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/47822dff/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/Register.java
----------------------------------------------------------------------
diff --git 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/Register.java
 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/Register.java
index 9299a16..a44c3a4 100644
--- 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/Register.java
+++ 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/Register.java
@@ -16,15 +16,14 @@
  */
 package org.apache.slider.server.appmaster.web.rest.agent;
 
+import org.apache.slider.providers.agent.State;
 import org.codehaus.jackson.annotate.JsonIgnoreProperties;
 import org.codehaus.jackson.annotate.JsonProperty;
 import org.codehaus.jackson.map.annotate.JsonSerialize;
 
-/**
- *
- * Data model for agent to send heartbeat to ambari and/or app master.
- *
- */
+import java.util.Map;
+
+/** Data model for agent to send heartbeat to ambari and/or app master. */
 @JsonIgnoreProperties(ignoreUnknown = true)
 @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
 public class Register {
@@ -36,6 +35,9 @@ public class Register {
   private String publicHostname;
   private AgentEnv agentEnv;
   private String agentVersion;
+  private State actualState;
+  private State expectedState;
+  private Map<String, String> allocatedPorts;
 
   @JsonProperty("responseId")
   public int getResponseId() {
@@ -44,13 +46,17 @@ public class Register {
 
   @JsonProperty("responseId")
   public void setResponseId(int responseId) {
-    this.responseId=responseId;
+    this.responseId = responseId;
   }
 
   public long getTimestamp() {
     return timestamp;
   }
 
+  public void setTimestamp(long timestamp) {
+    this.timestamp = timestamp;
+  }
+
   public String getHostname() {
     return hostname;
   }
@@ -67,10 +73,6 @@ public class Register {
     this.hardwareProfile = hardwareProfile;
   }
 
-  public void setTimestamp(long timestamp) {
-    this.timestamp = timestamp;
-  }
-
   public String getPublicHostname() {
     return publicHostname;
   }
@@ -103,15 +105,45 @@ public class Register {
     this.currentPingPort = currentPingPort;
   }
 
+  public State getActualState() {
+    return actualState;
+  }
+
+  public void setActualState(State actualState) {
+    this.actualState = actualState;
+  }
+
+  public State getExpectedState() {
+    return expectedState;
+  }
+
+  public void setExpectedState(State expectedState) {
+    this.expectedState = expectedState;
+  }
+
+  /** @return the allocated ports, or <code>null</code> if none are present */
+  @JsonProperty("allocatedPorts")
+  public Map<String, String> getAllocatedPorts() {
+    return allocatedPorts;
+  }
+
+  /** @param ports allocated ports */
+  @JsonProperty("allocatedPorts")
+  public void setAllocatedPorts(Map<String, String> ports) {
+    this.allocatedPorts = ports;
+  }
+
   @Override
   public String toString() {
     String ret = "responseId=" + responseId + "\n" +
                  "timestamp=" + timestamp + "\n" +
-                 "hostname="  + hostname + "\n" +
-                 "currentPingPort=" + currentPingPort + "\n";
+                 "hostname=" + hostname + "\n" +
+                 "expectedState=" + expectedState + "\n" +
+                 "actualState=" + actualState + "\n";
 
-    if (hardwareProfile != null)
+    if (hardwareProfile != null) {
       ret = ret + "hardwareprofile=" + this.hardwareProfile.toString();
+    }
     return ret;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/47822dff/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java
----------------------------------------------------------------------
diff --git 
a/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java
 
b/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java
index 1aeedbc..20c16ff 100644
--- 
a/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java
+++ 
b/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java
@@ -87,6 +87,7 @@ import static org.easymock.EasyMock.replay;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyCollection;
+import static org.mockito.Matchers.anyMap;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
@@ -271,6 +272,12 @@ public class TestAgentProviderService {
     } catch (SliderException e) {
     }
 
+    doNothing().when(mockAps).processAllocatedPorts(
+        anyString(),
+        anyString(),
+        anyString(),
+        anyMap()
+    );
     expect(access.isApplicationLive()).andReturn(true).anyTimes();
     ClusterDescription desc = new ClusterDescription();
     desc.setOption(OptionKeys.ZOOKEEPER_QUORUM, "host1:2181");
@@ -305,10 +312,20 @@ public class TestAgentProviderService {
     Register reg = new Register();
     reg.setResponseId(0);
     reg.setHostname("mockcontainer_1___HBASE_MASTER");
+    Map<String,String> ports = new HashMap();
+    ports.put("a","100");
+    reg.setAllocatedPorts(ports);
     RegistrationResponse resp = mockAps.handleRegistration(reg);
     Assert.assertEquals(0, resp.getResponseId());
     Assert.assertEquals(RegistrationStatus.OK, resp.getResponseStatus());
 
+    Mockito.verify(mockAps, Mockito.times(1)).processAllocatedPorts(
+        anyString(),
+        anyString(),
+        anyString(),
+        anyMap()
+    );
+
     HeartBeat hb = new HeartBeat();
     hb.setResponseId(1);
     hb.setHostname("mockcontainer_1___HBASE_MASTER");

Reply via email to