Author: challngr
Date: Thu May  1 20:43:26 2014
New Revision: 1591769

URL: http://svn.apache.org/r1591769
Log:
UIMA-3615 Initial vary-on vary-off support.

Added:
    uima/sandbox/uima-ducc/trunk/src/main/admin/vary_off   (with props)
    uima/sandbox/uima-ducc/trunk/src/main/admin/vary_on   (with props)
    
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/admin/event/RmAdminReply.java
    
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/admin/event/RmAdminVaryOff.java
    
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/admin/event/RmAdminVaryOn.java
    
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/main/DuccRmAdmin.java
Modified:
    
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/NodeStability.java
    
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/ResourceManagerComponent.java
    
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ISchedulerMain.java
    
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodePool.java
    
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodepoolScheduler.java
    
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java

Added: uima/sandbox/uima-ducc/trunk/src/main/admin/vary_off
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/src/main/admin/vary_off?rev=1591769&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/src/main/admin/vary_off (added)
+++ uima/sandbox/uima-ducc/trunk/src/main/admin/vary_off Thu May  1 20:43:26 
2014
@@ -0,0 +1,60 @@
+#!/usr/bin/env python
+# -----------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# -----------------------------------------------------------------------
+
+
+import os
+import sys
+
+from ducc_util  import DuccUtil
+
+class DuccVaryOff(DuccUtil):
+
+    
+    def usage(self, msg):
+        if ( msg != None ):
+            print msg
+
+        print 'vary_off list-of-nodes'
+        print '    This marks the nodes named in the list offline for 
scheduling purposes.'
+        print '    All jobs, services, and managed reservations are canceled 
on the nodes.'
+        print ''
+        print 'Example:'
+        print '   vary_off node1 node2 node3'
+
+        sys.exit(1)
+    
+    def main(self, nodes):
+
+        if ( len(nodes) == 0 ):
+            self.usage(None)
+
+        print 'Varying off', nodes
+        DUCC_JVM_OPTS = ' -Dducc.deploy.configuration=' + self.DUCC_HOME + 
"/resources/ducc.properties "
+        DUCC_JVM_OPTS = DUCC_JVM_OPTS + ' -DDUCC_HOME=' + self.DUCC_HOME
+        DUCC_JVM_OPTS = DUCC_JVM_OPTS + ' -Dducc.head=' + 
self.ducc_properties.get('ducc.head')
+        self.spawn(self.java(), DUCC_JVM_OPTS, 
'org.apache.uima.ducc.common.main.DuccRmAdmin', '--varyoff', ' '.join(nodes)) 
+        
+        return
+
+if __name__ == "__main__":
+    stopper = DuccVaryOff()
+    stopper.main(sys.argv[1:])
+
+    

Propchange: uima/sandbox/uima-ducc/trunk/src/main/admin/vary_off
------------------------------------------------------------------------------
    svn:executable = *

Added: uima/sandbox/uima-ducc/trunk/src/main/admin/vary_on
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/src/main/admin/vary_on?rev=1591769&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/src/main/admin/vary_on (added)
+++ uima/sandbox/uima-ducc/trunk/src/main/admin/vary_on Thu May  1 20:43:26 2014
@@ -0,0 +1,59 @@
+#!/usr/bin/env python
+# -----------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# -----------------------------------------------------------------------
+
+
+import os
+import sys
+
+from ducc_util  import DuccUtil
+
+class DuccVaryOff(DuccUtil):
+
+    def usage(self, msg):
+        if ( msg != None ):
+            print msg
+
+        print 'vary_on list-of-nodes'
+        print '    This marks the nodes named in the list online for 
scheduling purposes.'
+        print '    The nodes must have been previously online and/or 
configured in a nodellist'
+        print '    at boot time.'
+        print ''
+        print 'Example:'
+        print '   vary_on node1 node2 node3'    
+    
+    def main(self, nodes):
+
+        if ( len(nodes) == 0 ):
+            self.usage(None)
+
+        print 'Varying on', nodes
+        DUCC_JVM_OPTS = ' -Dducc.deploy.configuration=' + self.DUCC_HOME + 
"/resources/ducc.properties "
+        DUCC_JVM_OPTS = DUCC_JVM_OPTS + ' -DDUCC_HOME=' + self.DUCC_HOME
+        DUCC_JVM_OPTS = DUCC_JVM_OPTS + ' -Dducc.head=' + 
self.ducc_properties.get('ducc.head')
+
+        self.spawn(self.java(), DUCC_JVM_OPTS, 
'org.apache.uima.ducc.common.main.DuccRmAdmin', '--varyon', ' '.join(nodes)) 
+        
+        return
+
+if __name__ == "__main__":
+    stopper = DuccVaryOff()
+    stopper.main(sys.argv[1:])
+
+    

Propchange: uima/sandbox/uima-ducc/trunk/src/main/admin/vary_on
------------------------------------------------------------------------------
    svn:executable = *

Added: 
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/admin/event/RmAdminReply.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/admin/event/RmAdminReply.java?rev=1591769&view=auto
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/admin/event/RmAdminReply.java
 (added)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/admin/event/RmAdminReply.java
 Thu May  1 20:43:26 2014
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.common.admin.event;
+
+/**
+ * Admin event that wraps a single response string.
+ */
+public class RmAdminReply extends DuccAdminEvent
+{
+    private static final long serialVersionUID = -8101741014979144426L;
+
+    /** Text supplied at construction time and returned by {@link #getResponse()}. */
+    String response;
+
+    /**
+     * @param response the text this reply carries
+     */
+    public RmAdminReply(String response) { this.response = response; }
+
+    /**
+     * @return the text passed to the constructor
+     */
+    public String getResponse() { return response; }
+}

Added: 
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/admin/event/RmAdminVaryOff.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/admin/event/RmAdminVaryOff.java?rev=1591769&view=auto
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/admin/event/RmAdminVaryOff.java
 (added)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/admin/event/RmAdminVaryOff.java
 Thu May  1 20:43:26 2014
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.common.admin.event;
+
+/**
+ * Admin event that carries the names of the nodes for a vary-off request.
+ */
+public class RmAdminVaryOff extends DuccAdminEvent
+{
+    private static final long serialVersionUID = -8101741014979144426L;
+
+    /** Node names exactly as handed to the constructor (the array is not copied). */
+    String[] nodes;
+
+    /**
+     * @param nodes names of the nodes this request applies to
+     */
+    public RmAdminVaryOff(String[] nodes)
+    {
+        this.nodes = nodes;
+    }
+
+    /**
+     * @return the node-name array passed to the constructor
+     */
+    public String[] getNodes()
+    {
+        return nodes;
+    }
+}

Added: 
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/admin/event/RmAdminVaryOn.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/admin/event/RmAdminVaryOn.java?rev=1591769&view=auto
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/admin/event/RmAdminVaryOn.java
 (added)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/admin/event/RmAdminVaryOn.java
 Thu May  1 20:43:26 2014
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.common.admin.event;
+
+/**
+ * Admin event that carries the names of the nodes for a vary-on request.
+ */
+public class RmAdminVaryOn extends DuccAdminEvent
+{
+    private static final long serialVersionUID = -8101741014979144426L;
+
+    /** Node names exactly as handed to the constructor (the array is not copied). */
+    String[] nodes;
+
+    /**
+     * @param nodes names of the nodes this request applies to
+     */
+    public RmAdminVaryOn(String[] nodes)
+    {
+        this.nodes = nodes;
+    }
+
+    /**
+     * @return the node-name array passed to the constructor
+     */
+    public String[] getNodes()
+    {
+        return nodes;
+    }
+}

Added: 
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/main/DuccRmAdmin.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/main/DuccRmAdmin.java?rev=1591769&view=auto
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/main/DuccRmAdmin.java
 (added)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/main/DuccRmAdmin.java
 Thu May  1 20:43:26 2014
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.common.main;
+
+import java.io.FileNotFoundException;
+
+import org.apache.activemq.camel.component.ActiveMQComponent;
+import org.apache.camel.CamelContext;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.RuntimeExchangeException;
+import org.apache.camel.dataformat.xstream.XStreamDataFormat;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.DefaultClassResolver;
+import org.apache.uima.ducc.common.admin.event.DuccAdminEvent;
+import org.apache.uima.ducc.common.admin.event.RmAdminReply;
+import org.apache.uima.ducc.common.admin.event.RmAdminVaryOff;
+import org.apache.uima.ducc.common.admin.event.RmAdminVaryOn;
+import org.apache.uima.ducc.common.authentication.BrokerCredentials;
+import 
org.apache.uima.ducc.common.authentication.BrokerCredentials.Credentials;
+import org.apache.uima.ducc.common.component.AbstractDuccComponent;
+import org.apache.uima.ducc.common.exception.DuccRuntimeException;
+import org.apache.uima.ducc.common.utils.Utils;
+
+import com.thoughtworks.xstream.XStream;
+import com.thoughtworks.xstream.io.xml.DomDriver;
+
+public class DuccRmAdmin 
+    extends AbstractDuccComponent 
+{
+       public static final String FileSeparator = System
+        .getProperty("file.separator");
+
+       private String brokerUrl;
+       private ProducerTemplate pt;
+       private String targetEndpoint;
+
+       /**
+        * Builds the admin client: loads the ducc properties, reads the broker URL and the
+        * target endpoint from the system properties, and registers an ActiveMQ component
+        * (with broker credentials when a credentials file is configured) on the supplied
+        * Camel context. Any failure is printed and ends the JVM with exit status -1.
+        *
+        * @param context Camel context that receives the "activemq" component
+        * @param epname  name of the system property whose value is the endpoint requests are sent to
+        */
+       public DuccRmAdmin(CamelContext context, String epname)
+    {
+        // NOTE(review): the component name looks copied from another tool; confirm before renaming.
+        super("DuccServiceReaper", context);
+        try {
+            // Load the ducc properties file and enrich the System properties. Any key in
+            // the ducc property file can be overridden with -D<key>=<value>.
+            loadProperties(DuccService.DUCC_PROPERTY_FILE);
+
+            // fetch the broker URL from ducc.properties
+            this.brokerUrl = System.getProperty("ducc.broker.url");
+            try {
+                String brokerCredentialsFile = System.getProperty("ducc.broker.credentials.file");
+
+                // fetch the endpoint the admin requests will be sent to
+                targetEndpoint = System.getProperty(epname);
+                if ( targetEndpoint == null ) {
+                    // Name the property that was actually looked up, not a hard-coded one.
+                    throw new IllegalArgumentException("Cannot find endpoint for RM admin.  Is '" + epname + "' configured in ducc.properties?");
+                }
+
+                ActiveMQComponent duccAMQComponent = new ActiveMQComponent(context);
+                duccAMQComponent.setBrokerURL(brokerUrl);
+
+                if ( brokerCredentialsFile != null && brokerCredentialsFile.length() > 0 ) {
+                    String path = "";
+                    try {
+                        Utils.findDuccHome();  // add DUCC_HOME to System.properties
+                        path = Utils.resolvePlaceholderIfExists(brokerCredentialsFile, System.getProperties());
+                        Credentials credentials = BrokerCredentials.get(path);
+                        // Credentials are applied only when both user name and password are present.
+                        if ( credentials.getUsername() != null && credentials.getPassword() != null ) {
+                            duccAMQComponent.setUserName(credentials.getUsername());
+                            duccAMQComponent.setPassword(credentials.getPassword());
+                        }
+                    } catch(FileNotFoundException e) {
+                        System.out.println("DuccRmAdmin Failed - File Not Found:"+path+" Broker Credentials File:"+brokerCredentialsFile);
+                        System.exit(-1);
+                    }
+                }
+                context.addComponent("activemq",duccAMQComponent);
+                this.pt = context.createProducerTemplate();
+            } catch( Throwable exx) {
+                System.out.println("DuccRmAdmin Failed:"+exx);
+                System.exit(-1);
+            }
+
+        } catch (Exception e) {
+            e.printStackTrace();
+            System.exit(-1);
+        }
+    }
+
+    /**
+     * Serializes an admin event to XML with the XStream instance obtained from a
+     * Camel XStreamDataFormat.
+     *
+     * @param duccEvent the event to serialize
+     * @return the XML form of the event
+     * @throws Exception declared for callers; see XStreamDataFormat / XStream for actual failures
+     */
+    private String marshallEvent(DuccAdminEvent duccEvent)
+        throws Exception
+    {
+        XStream serializer = new XStreamDataFormat().getXStream(new DefaultClassResolver());
+        return serializer.toXML(duccEvent);
+    }
+    
+    /**
+     * Converts a raw reply into an RmAdminReply. Only a byte[] holding XStream XML is
+     * decoded; anything else (including null) is reported as an unexpected reply.
+     *
+     * @param targetToUnmarshall the raw reply object
+     * @return the decoded reply
+     * @throws Exception if the input is not a byte[] or does not decode to an RmAdminReply
+     */
+    private RmAdminReply unmarshallEvent(Object targetToUnmarshall)
+        throws Exception
+    {
+        // Guard against null so the failure is the descriptive exception below, not an NPE.
+        String claz = (targetToUnmarshall == null) ? "NULL" : targetToUnmarshall.getClass().getName();
+
+        if (targetToUnmarshall instanceof byte[]) {
+            XStream xStream = new XStream(new DomDriver());
+            // NOTE(review): the bytes are decoded with the platform default charset.
+            Object reply = xStream.fromXML(new String((byte[]) targetToUnmarshall));
+            if (reply instanceof RmAdminReply) {
+                return (RmAdminReply) reply;
+            }
+            claz = (reply == null) ? "NULL" : reply.getClass().getName();
+        }
+        throw new Exception("Unexpected Reply type received from Ducc Component. Expected RmAdminReply, instead received: " + claz);
+    }
+
+    public RmAdminReply dispatchAndWaitForReply(DuccAdminEvent duccEvent) 
+        throws Exception 
+    {
+        int maxRetryCount = 20;
+        int i = 0;
+        Object reply = null;
+        RuntimeExchangeException ree = null;
+
+        // retry up to 20 times. This is an attempt to handle an error thrown
+        // by Camel: Failed to resolve replyTo destination on the exchange
+        // Camel waits at most 10000ms( 10secs) for AMQ to create a temp queue.
+        // After 10secs Camel times out and throws an Exception.
+        for (; i < maxRetryCount; i++) {
+            try {
+                reply = pt.sendBody(targetEndpoint, ExchangePattern.InOut, 
marshallEvent(duccEvent));
+                ree = null; // all is well - got a reply
+                break; // done here
+
+            } catch (RuntimeExchangeException e) {
+                String msg = e.getMessage();
+                // Only retry if AMQ failed to create a temp queue
+                if (msg != null && msg.startsWith("Failed to resolve replyTo 
destination on the exchange")) {
+                    ree = e;
+                } else {
+                    throw new DuccRuntimeException("Ducc JMS Dispatcher is 
unable to deliver a request.", e);
+                }
+            }
+        }
+        // when retries hit the threshold, just throw an exception
+        if (i == maxRetryCount) {
+            throw new DuccRuntimeException("ActiveMQ failed to create temp 
reply queue. After 20 attempts to deliver request to the OR, Ducc JMS 
Dispatcher is giving up.",
+                                           ree);
+        }
+        if ( reply instanceof RmAdminReply ) {
+            return (RmAdminReply) reply;
+        } else {
+            throw new DuccRuntimeException("Received unexpected object as 
response: " + reply.getClass().getName());
+        }
+    }
+
+       /**
+        * Sends a vary-off request for the nodes named in the arguments and prints the reply text.
+        *
+        * @param args command-line arguments; element 0 is skipped and the rest are used as node names
+        * @throws Exception if the request cannot be dispatched or no usable reply is received
+        */
+       public void varyoff(String[] args)
+               throws Exception
+    {
+        // Everything after the first element is a node name.
+        int nodeCount = args.length - 1;
+        String[] nodes = new String[nodeCount];
+        System.arraycopy(args, 1, nodes, 0, nodeCount);
+
+        RmAdminReply reply = dispatchAndWaitForReply(new RmAdminVaryOff(nodes));
+        System.out.println(reply.getResponse());
+    }
+
+       /**
+        * Sends a vary-on request for the nodes named in the arguments and prints the reply text.
+        *
+        * @param args command-line arguments; element 0 is skipped and the rest are used as node names
+        * @throws Exception if the request cannot be dispatched or no usable reply is received
+        */
+       public void varyon(String[] args)
+               throws Exception
+    {
+        // Everything after the first element is a node name.
+        int nodeCount = args.length - 1;
+        String[] nodes = new String[nodeCount];
+        System.arraycopy(args, 1, nodes, 0, nodeCount);
+
+        RmAdminReply reply = dispatchAndWaitForReply(new RmAdminVaryOn(nodes));
+        System.out.println(reply.getResponse());
+    }
+
+    
+    /**
+     * Dispatches on the verb in args[0]. A missing verb, an unrecognized verb, or a verb
+     * without a node list prints the usage text, which exits the JVM with status 1.
+     *
+     * @param args the verb (--varyoff or --varyon) followed by one or more node names
+     * @throws Exception if the request cannot be dispatched or no usable reply is received
+     */
+    public void run(String[] args)
+       throws Exception
+    {
+        // Guard before touching args[0]; an empty command line used to fail with an
+        // ArrayIndexOutOfBoundsException.
+        if ( args == null || args.length == 0 ) {
+            usage("Missing verb");
+            return;
+        }
+
+        if ( args[0].equals("--varyoff")) {
+            if ( args.length < 2 ) usage("Missing node list");
+            varyoff(args);
+            return;
+        }
+
+        if ( args[0].equals("--varyon")) {
+            if ( args.length < 2 ) usage("Missing node list");
+            varyon(args);
+            return;
+        }
+
+        // An unknown verb used to be ignored silently; report it instead.
+        usage("Unrecognized verb: " + args[0]);
+    }
+
+    /**
+     * Prints an optional message followed by the command summary, then exits the JVM
+     * with status 1.
+     *
+     * @param msg text printed before the summary, or null for none
+     */
+    public static void usage(String msg)
+    {
+        if ( msg != null ) System.out.println(msg);
+
+        System.out.println("Usage:\n");
+        System.out.println("DuccRmAdmin verb options");
+        System.out.println("Where verbs are:");
+        // Node names are read as separate arguments following the verb.
+        System.out.println("   --varyoff space-delimited-nodes");
+        System.out.println("   --varyon  space-delimited-nodes");
+
+        System.exit(1);
+    }
+
+       public static void main(String[] args) 
+    {
+               try {
+                       DuccRmAdmin admin = new DuccRmAdmin(new 
DefaultCamelContext(), "ducc.rm.admin.endpoint");
+            admin.run(args);
+               } catch (Throwable e) {
+                       e.printStackTrace();
+               } finally {
+                       System.exit(-1);
+               }
+       }
+   
+}

Modified: 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/NodeStability.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/NodeStability.java?rev=1591769&r1=1591768&r2=1591769&view=diff
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/NodeStability.java
 (original)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/NodeStability.java
 Thu May  1 20:43:26 2014
@@ -65,9 +65,12 @@ public class NodeStability
             logger.warn(methodName, null, "Ignoring node update, scheduler is 
still booting.");
             return;
         } else {
-            // logger.info(methodName, null, n.getNodeIdentity().getName());
-            scheduler.nodeArrives(n);          // tell RM
-            super.nodeArrives(n);              // tell heartbeat monitor
+            try {
+                scheduler.nodeArrives(n);          // tell RM
+                super.nodeArrives(n);              // tell heartbeat monitor
+            } catch ( Throwable t ) {
+                logger.error(methodName, null, t);
+            }
         }
     }
 }

Modified: 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/ResourceManagerComponent.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/ResourceManagerComponent.java?rev=1591769&r1=1591768&r2=1591769&view=diff
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/ResourceManagerComponent.java
 (original)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/ResourceManagerComponent.java
 Thu May  1 20:43:26 2014
@@ -22,6 +22,13 @@ import java.util.Timer;
 import java.util.TimerTask;
 
 import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.uima.ducc.common.admin.event.DuccAdminEvent;
+import org.apache.uima.ducc.common.admin.event.RmAdminReply;
+import org.apache.uima.ducc.common.admin.event.RmAdminVaryOff;
+import org.apache.uima.ducc.common.admin.event.RmAdminVaryOn;
 import org.apache.uima.ducc.common.boot.DuccDaemonRuntimeProperties;
 import org.apache.uima.ducc.common.boot.DuccDaemonRuntimeProperties.DaemonName;
 import org.apache.uima.ducc.common.component.AbstractDuccComponent;
@@ -83,9 +90,69 @@ public class ResourceManagerComponent 
     public DuccLogger getLogger() {
         return logger; 
     }
+
+
+    /**
+     * Adds the Camel route that receives RM admin events: messages arriving on the
+     * endpoint are unmarshalled with XStream and passed to an RmAdminEventProcessor.
+     * Exceptions raised on the route are marked handled and given to an ErrorProcessor.
+     *
+     * @param endpoint
+     *          - the admin endpoint to listen on
+     * @param delegate
+     *          - component handed to the RmAdminEventProcessor
+     * @throws Exception
+     */
+    private void startRmAdminChannel(final String endpoint, final AbstractDuccComponent delegate)
+        throws Exception
+    {
+        RouteBuilder adminRoutes = new RouteBuilder() {
+            public void configure() {
+                System.out.println("Configuring RM Admin Channel on Endpoint:" + endpoint);
+
+                onException(Exception.class)
+                    .handled(true)
+                    .process(new ErrorProcessor());
+
+                from(endpoint)
+                    .routeId("RMAdminRoute")
+                    .unmarshal().xstream()
+                    .process(new RmAdminEventProcessor(delegate));
+            }
+        };
+        getContext().addRoutes(adminRoutes);
+
+        if (logger == null) {
+            return;
+        }
+        logger.info("startRMAdminChannel", null, "Admin Channel Activated on endpoint:" + endpoint);
+    }
+
+    /**
+     * Camel processor for the RM admin route. It hands vary-off and vary-on requests to
+     * the scheduler and replaces the message body with an RmAdminReply.
+     */
+    class RmAdminEventProcessor implements Processor
+    {
+        final AbstractDuccComponent delegate;
+
+        /**
+         * @param delegate component supplied by the route builder; stored but not used in this class
+         */
+        public RmAdminEventProcessor(final AbstractDuccComponent delegate)
+        {
+            this.delegate = delegate;
+        }
+
+        /**
+         * Handles one admin message. RmAdminVaryOff and RmAdminVaryOn are passed to the
+         * scheduler and its returned string becomes the reply; any other body (including
+         * null) is answered with a reply naming the unsupported type.
+         *
+         * @param exchange the exchange whose in-message carries the request and receives the reply
+         * @throws Exception whatever the scheduler call throws
+         */
+        public void process(final Exchange exchange)
+            throws Exception
+        {
+            String methodName = "RmAdminEventProcessor.process";
+            Object body = exchange.getIn().getBody();
+            // A null body must not break the log call below.
+            String bodyType = (body == null) ? "null" : body.getClass().getName();
+            logger.info(methodName, null, "Received Admin Message of Type:", bodyType);
+
+            DuccAdminEvent reply;
+            if (body instanceof RmAdminVaryOff) {
+                RmAdminVaryOff vo = (RmAdminVaryOff) body;
+                reply = new RmAdminReply(scheduler.varyoff(vo.getNodes()));
+            } else if (body instanceof RmAdminVaryOn) {
+                RmAdminVaryOn vo = (RmAdminVaryOn) body;
+                reply = new RmAdminReply(scheduler.varyon(vo.getNodes()));
+            } else {
+                // Answer explicitly instead of placing a null reply on the exchange.
+                logger.warn(methodName, null, "Ignoring unsupported admin message of type " + bodyType);
+                reply = new RmAdminReply("Unsupported admin request: " + bodyType);
+            }
+
+            exchange.getIn().setBody(reply);
+        }
+    }
+
     public void start(DuccService service, String[] args)
         throws Exception
     {
+       String methodName = "start";
         converter = new JobManagerConverter(scheduler, stabilityManager);
 
         super.start(service, args);
@@ -96,7 +163,13 @@ public class ResourceManagerComponent 
         nodeMetricsUpdateRate = 
SystemPropertyResolver.getIntProperty("ducc.agent.node.metrics.publish.rate", 
DEFAULT_NODE_METRICS_RATE);
         schedulingRatio       = 
SystemPropertyResolver.getIntProperty("ducc.rm.state.publish.ratio", 
DEFAULT_SCHEDULING_RATIO);
         schedulingEpoch       = 
SystemPropertyResolver.getIntProperty("ducc.rm.state.publish.rate", 
DEFAULT_SCHEDULING_RATE);
-
+        
+        String adminEndpoint         = 
System.getProperty("ducc.rm.admin.endpoint");
+        if ( adminEndpoint == null ) {
+            logger.warn(methodName, null, "No admin endpoint configured.  Not 
starting admin channel.");
+        } else {
+            startRmAdminChannel(adminEndpoint, this);
+        }
         
         scheduler.init();
         
@@ -235,5 +308,4 @@ public class ResourceManagerComponent 
         }
     }
 
-
 }

Modified: 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ISchedulerMain.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ISchedulerMain.java?rev=1591769&r1=1591768&r2=1591769&view=diff
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ISchedulerMain.java
 (original)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ISchedulerMain.java
 Thu May  1 20:43:26 2014
@@ -70,4 +70,7 @@ public interface ISchedulerMain
 
     // once both initialized() and ready() occur, the RM scaffolding will 
enable scheduling by calling start
     void start();
+
+    String varyoff(String[] nodes);
+    String varyon(String[] nodes);
 }

Modified: 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodePool.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodePool.java?rev=1591769&r1=1591768&r2=1591769&view=diff
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodePool.java
 (original)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodePool.java
 Thu May  1 20:43:26 2014
@@ -51,6 +51,7 @@ class NodePool
 
     HashMap<Node, Machine> allMachines                       = new 
HashMap<Node, Machine>();                   // all active machines in the system
     HashMap<Node, Machine> unresponsiveMachines              = new 
HashMap<Node, Machine>();                   // machines with excessive missed 
heartbeats
+    HashMap<Node, Machine> offlineMachines                   = new 
HashMap<Node, Machine>();
     HashMap<Integer, HashMap<Node, Machine>> machinesByOrder = new 
HashMap<Integer, HashMap<Node, Machine>>(); // Schedulable, not necessarily fee
     HashMap<String, Machine>                 machinesByName  = new 
HashMap<String, Machine>();                 // by name, for nodepool support
     HashMap<String, Machine>                 machinesByIp    = new 
HashMap<String, Machine>();                 // by IP, for nodepool support
@@ -696,6 +697,12 @@ class NodePool
             return m;
         }
 
+        if ( offlineMachines.containsKey(node) ) {               // if it's 
offline it can't be restored like this.
+            Machine m = offlineMachines.get(node);
+            logger.trace(methodName, null, "Node ", m.getId(), " is offline, 
not activating.");
+            return m;
+        }
+
         if ( unresponsiveMachines.containsKey(node) ) {          // reactive 
the node
             Machine m = unresponsiveMachines.remove(node);
 
@@ -740,7 +747,7 @@ class NodePool
         return machine;
     }
 
-    void nodeLeaves(Machine m)
+    void disable(Machine m, HashMap<Node, Machine> disableMap)
     {
         String methodName = "nodeLeaves";
 
@@ -758,18 +765,18 @@ class NodePool
 
                 if ( j.getDuccType() == DuccType.Reservation ) {
                     // UIMA-3614.  Only actual reservation is left intact
-                    logger.info(methodName, j.getId(), "Not purging job on 
dead node, job type:", j.getDuccType());
+                    logger.info(methodName, j.getId(), "Not purging job on 
dead/offline node, job type:", j.getDuccType());
                     break;
                 }
 
-                logger.info(methodName, j.getId(), "Purging job on dead node, 
job type:", j.getDuccType());
+                logger.info(methodName, j.getId(), "Purging job on 
dead/offline node, job type:", j.getDuccType());
                 j.shrinkByOne(s);
                 nPendingByOrder[order]++;
                 s.purge();          // This bet tells OR not to wait for 
confirmation from the agent
             }
 
             allMachines.remove(m.key());
-            unresponsiveMachines.put(m.key(), m);
+            disableMap.put(m.key(), m);
 
             HashMap<Node, Machine> machs = machinesByOrder.get(order);
             machs.remove(m.key());
@@ -785,6 +792,72 @@ class NodePool
         }
     }
 
+    void nodeLeaves(Machine m)
+    {
+        disable(m, unresponsiveMachines);
+    }
+
+    String varyoff(String node)
+    {
+        Machine m = machinesByName.get(node);
+        if ( m == null ) {
+            // ok, maybe it's already offline or maybe dead
+            // relatively rare, cleaner to search than to make yet another 
index
+
+            for ( Machine mm : offlineMachines.values() ) {
+                if ( mm.getId().equals(node) ) {
+                    return "VaryOff: Nodepool " + id + " - Already offline: " 
+ node;
+                }
+            }
+
+            Iterator<Machine> iter = unresponsiveMachines.values().iterator();
+            while ( iter.hasNext() ) {
+                Machine mm = iter.next();                
+                if ( mm.getId().equals(node) ) {
+                    Node key = mm.key();
+                    iter.remove();
+                    offlineMachines.put(key, mm);
+                    return "VaryOff: Nodepool " + id + " - Unresponsive 
machine, marked offline: " + node;
+                }
+            }
+
+            return "VaryOff: Nodepool " + id + " - Cannot find machine: " + 
node;
+        }
+
+        disable(m, offlineMachines);
+        return "VaryOff: " + node + " - OK.";
+    }
+
+    /**
+     * We're going to just take it off the offline list and if it happens to 
come back, fine, it will get picked up
+     * in nodeArrives as a new machine.
+     */
+    String varyon(String node)
+    {        
+        if ( machinesByName.containsKey(node) ) {
+            return "VaryOn: Nodepool " + id + " - Already online: " + node;
+        }
+
+        Iterator<Machine> iter = offlineMachines.values().iterator();
+        while ( iter.hasNext() ) {
+            Machine mm = iter.next();
+            if ( mm.getId().equals(node) ) {
+                iter.remove();
+                return "VaryOn: Nodepool " + id + " - Machine marked online: " 
+ node;
+            }
+        }
+
+        iter = unresponsiveMachines.values().iterator();
+        while ( iter.hasNext() ) {
+            Machine mm = iter.next();
+            if ( mm.getId().equals(node) ) {
+                return "VaryOn: Nodepool " + id + " - Machine is online but 
not responsive: " + node;
+            }
+        }
+        
+        return "VaryOn: Nodepool " + id + " - Cannot find machine: " + node;
+    }
+
     /**
      * 
------------------------------------------------------------------------------------------
      * Routines used during the counting phase
@@ -792,7 +865,7 @@ class NodePool
      */
 
     /**
-     * Adjust counts for something that takes full machiens, like a 
reservation.
+     * Adjust counts for something that takes full machines, like a 
reservation.
      * If "enforce" is set the machine order must match, otherwise we just do 
best effort to match.
      *
      * This is intended for use by reservations only; as such it does NOT 
recurse into child nodepools.
@@ -931,7 +1004,7 @@ class NodePool
     }
 
     /**
-     * We need to make enough space for 'cnt' full machines.  If enforce is 
true the machiens need
+     * We need to make enough space for 'cnt' full machines.  If enforce is 
true the machines need
      * to be of the indicated order; otherwise we just nuke any old thing.
      *
      * Returns number of machines that are freeable, up to 'needed', or 0, if 
we can't get enough.

Modified: 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodepoolScheduler.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodepoolScheduler.java?rev=1591769&r1=1591768&r2=1591769&view=diff
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodepoolScheduler.java
 (original)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodepoolScheduler.java
 Thu May  1 20:43:26 2014
@@ -1523,12 +1523,16 @@ public class NodepoolScheduler
         for ( ResourceClass cl : resourceClasses.values() ) {
             if ( cl.getNodepoolName().equals(nodepool.getId()) && 
(cl.getAllJobs().size() > 0) ) {
                 HashMap<IRmJob, IRmJob> jobs = cl.getAllJobs();
+                String npn = cl.getNodepoolName();
+                logger.info(methodName, null, String.format("%12s %7s %7s %6s 
%5s", npn, "Counted", "Current", "Needed", "Order"));
+
                 for ( IRmJob j : jobs.values() ) {
                     int counted = j.countNSharesGiven();      // allotment 
from the counter
                     int current = j.countNShares();           // currently 
allocated, plus pending, less those removed by earlier preemption
                     int needed = (counted - current);
                     int order = j.getShareOrder();
          
+                    logger.info(methodName, j.getId(), String.format("%12s %7d 
%7d %6d %5d", npn, counted, current, needed, order));
                     needed = Math.abs(needed);
                     //needed = Math.max(0, needed);
                     neededByOrder[order] += needed;

Modified: 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java?rev=1591769&r1=1591768&r2=1591769&view=diff
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java
 (original)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java
 Thu May  1 20:43:26 2014
@@ -80,6 +80,7 @@ public class Scheduler
     Map<Node, Node> deadNodes      = new HashMap<Node, Node>();           // 
missed too many heartbeats
     // HashMap<Node, Node> allNodes       = new HashMap<Node, Node>();         
  // the guys we know
     Map<String, NodePool>    nodepoolsByNode = new HashMap<String, 
NodePool>(); // all nodes, and their associated pool
+    Map<String, String>      shortToLongNode = new HashMap<String, String>();  
 // short node name (text before the first '.') -> full node name
 
     Map<String, User>    users     = new HashMap<String, User>();         // 
Active users - has a job in the system
     //HashMap<DuccId, IRmJob>    runningJobs = new HashMap<DuccId, IRmJob>();
@@ -451,7 +452,7 @@ public class Scheduler
         if ( nodes == null ) return;
 
         for ( String s : nodes.keySet() ) {
-             nodepoolsByNode.put(s, pool);
+            updateNodepoolsByNode(s, pool);        // maps from both the 
fully-qualified name and the short name
         }
     }
 
@@ -956,6 +957,27 @@ public class Scheduler
 //         }
 //     }
 
+
+    /**
+     * maps from both the fully-qualified name and the short name
+     */
+    void updateNodepoolsByNode(String longname, NodePool np)
+    {
+       String methodName = "updateNodepoolsByNode";
+        String shortname = longname;
+        int ndx = longname.indexOf(".");
+
+        logger.info(methodName, null, "Map", longname, "to", np.getId());
+        nodepoolsByNode.put(longname, np);
+
+        if ( ndx >=0 ) {
+            shortname = longname.substring(0, ndx);
+            nodepoolsByNode.put(shortname, np);
+            shortToLongNode.put(shortname, longname);
+            logger.info(methodName, null, "Map", shortname, "to", np.getId());
+        }
+    }
+
     //
     // Return a nodepool by Node.  If the node can't be associated with a 
nodepool, return the
     // default nodepool, which is always the first one defined in the config 
file.
@@ -968,39 +990,38 @@ public class Scheduler
         }
         if ( np == null ) {
             np = nodepools[0];
-            nodepoolsByNode.put( ni.getName(), np);          // assign this 
guy to the default np
+            updateNodepoolsByNode(ni.getName(), np);     // assign this guy to 
the default np
+            // nodepoolsByNode.put( ni.getName(), np);          // assign this 
guy to the default np
         }
         return np;
     }
 
     private int total_arrivals = 0;
-    public void nodeArrives(Node node)
+    public synchronized void nodeArrives(Node node)
     {        
        // String methodName = "nodeArrives";
         // The first block insures the node is in the scheduler's records as 
soon as possible
 
         total_arrivals++;       // report these in the main schedule loop
-        synchronized(this) {
-            // the amount of memory available for shares, adjusted with 
configured overhead
-
-            NodePool np = getNodepoolByName(node.getNodeIdentity());
-            Machine m = np.getMachine(node);
-            int share_order = 0;
-
-            if ( m == null ) {
-                // allNodes.put(node, node);
-                long allocatable_mem =  
node.getNodeMetrics().getNodeMemory().getMemTotal() - share_free_dram;
-                if ( dramOverride > 0 ) {
-                    allocatable_mem = dramOverride;
-                }
-                share_order = (int) (allocatable_mem / share_quantum);         
  // conservative - rounds down (this will always cast ok)                
-            } else {
-                share_order = m.getShareOrder();
-            }
-            
-            max_order = Math.max(share_order, max_order);
-            m = np.nodeArrives(node, share_order);                         // 
announce to the nodepools
+        // the amount of memory available for shares, adjusted with configured 
overhead
+        
+        NodePool np = getNodepoolByName(node.getNodeIdentity());
+        Machine m = np.getMachine(node);
+        int share_order = 0;
+        
+        if ( m == null ) {
+            // allNodes.put(node, node);
+            long allocatable_mem =  
node.getNodeMetrics().getNodeMemory().getMemTotal() - share_free_dram;
+            if ( dramOverride > 0 ) {
+                allocatable_mem = dramOverride;
+            }
+            share_order = (int) (allocatable_mem / share_quantum);           
// conservative - rounds down (this will always cast ok)                
+        } else {
+            share_order = m.getShareOrder();
         }
+        
+        max_order = Math.max(share_order, max_order);
+        m = np.nodeArrives(node, share_order);                         // 
announce to the nodepools
     }
 
     public void nodeDeath(Map<Node, Node> nodes)
@@ -1010,6 +1031,50 @@ public class Scheduler
         }
     }
 
+    public synchronized String varyon(String[] nodes)
+    {
+        StringBuffer reply = new StringBuffer();
+        for (String n : nodes ) {
+
+            if ( shortToLongNode.containsKey(n) ) {         // internally 
everything is by 'long'
+                n = shortToLongNode.get(n);
+            }
+
+            NodePool np = nodepoolsByNode.get(n);
+            if ( np == null ) {
+                reply.append("No nodepool found for node ");
+                reply.append(n);
+                reply.append("\n");
+            } else {                
+                reply.append(np.varyon(n));
+                reply.append("\n");
+            }
+        }
+       return reply.toString();
+    }
+
+    public synchronized String varyoff(String[] nodes)
+    {
+        StringBuffer reply = new StringBuffer();
+        for (String n : nodes ) {
+
+            if ( shortToLongNode.containsKey(n) ) {         // internally 
everything is by 'long'
+                n = shortToLongNode.get(n);
+            }
+
+            NodePool np = nodepoolsByNode.get(n);
+            if ( np == null ) {
+                reply.append("No nodepool found for node ");
+                reply.append(n);
+                reply.append("\n");
+            } else {                
+                reply.append(np.varyoff(n));
+                reply.append("\n");
+            }
+        }
+       return reply.toString();
+    }
+
     /**
      * Callback from job manager, need shares for a new fair-share job.
      */


Reply via email to