Author: challngr Date: Thu May 1 20:43:26 2014 New Revision: 1591769 URL: http://svn.apache.org/r1591769 Log: UIMA-3615 Initial vary-on vary-off support.
Added: uima/sandbox/uima-ducc/trunk/src/main/admin/vary_off (with props) uima/sandbox/uima-ducc/trunk/src/main/admin/vary_on (with props) uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/admin/event/RmAdminReply.java uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/admin/event/RmAdminVaryOff.java uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/admin/event/RmAdminVaryOn.java uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/main/DuccRmAdmin.java Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/NodeStability.java uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/ResourceManagerComponent.java uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ISchedulerMain.java uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodePool.java uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodepoolScheduler.java uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java Added: uima/sandbox/uima-ducc/trunk/src/main/admin/vary_off URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/src/main/admin/vary_off?rev=1591769&view=auto ============================================================================== --- uima/sandbox/uima-ducc/trunk/src/main/admin/vary_off (added) +++ uima/sandbox/uima-ducc/trunk/src/main/admin/vary_off Thu May 1 20:43:26 2014 @@ -0,0 +1,60 @@ +#!/usr/bin/env python +# ----------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# ----------------------------------------------------------------------- + + +import os +import sys + +from ducc_util import DuccUtil + +class DuccVaryOff(DuccUtil): + + + def usage(self, msg): + if ( msg != None ): + print msg + + print 'vary_off list-of-nodes' + print ' This marks the nodes named in the list offline for scheduling purposes.' + print ' All jobs, services, and managed reservations are canceled on the nodes.' 
+ print '' + print 'Example:' + print ' vary_off node1 node2 node3' + + sys.exit(1) + + def main(self, nodes): + + if ( len(nodes) == 0 ): + self.usage(None) + + print 'Varying off', nodes + DUCC_JVM_OPTS = ' -Dducc.deploy.configuration=' + self.DUCC_HOME + "/resources/ducc.properties " + DUCC_JVM_OPTS = DUCC_JVM_OPTS + ' -DDUCC_HOME=' + self.DUCC_HOME + DUCC_JVM_OPTS = DUCC_JVM_OPTS + ' -Dducc.head=' + self.ducc_properties.get('ducc.head') + self.spawn(self.java(), DUCC_JVM_OPTS, 'org.apache.uima.ducc.common.main.DuccRmAdmin', '--varyoff', ' '.join(nodes)) + + return + +if __name__ == "__main__": + stopper = DuccVaryOff() + stopper.main(sys.argv[1:]) + + Propchange: uima/sandbox/uima-ducc/trunk/src/main/admin/vary_off ------------------------------------------------------------------------------ svn:executable = * Added: uima/sandbox/uima-ducc/trunk/src/main/admin/vary_on URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/src/main/admin/vary_on?rev=1591769&view=auto ============================================================================== --- uima/sandbox/uima-ducc/trunk/src/main/admin/vary_on (added) +++ uima/sandbox/uima-ducc/trunk/src/main/admin/vary_on Thu May 1 20:43:26 2014 @@ -0,0 +1,59 @@ +#!/usr/bin/env python +# ----------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# ----------------------------------------------------------------------- + + +import os +import sys + +from ducc_util import DuccUtil + +class DuccVaryOff(DuccUtil): + + def usage(self, msg): + if ( msg != None ): + print msg + + print 'vary_on list-of-nodes' + print ' This marks the nodes named in the list online for scheduling purposes.' + print ' The nodes must have been previously online and/or configured in a nodellist' + print ' at boot time.' + print '' + print 'Example:' + print ' vary_on node1 node2 node3' + + def main(self, nodes): + + if ( len(nodes) == 0 ): + self.usage(None) + + print 'Varying on', nodes + DUCC_JVM_OPTS = ' -Dducc.deploy.configuration=' + self.DUCC_HOME + "/resources/ducc.properties " + DUCC_JVM_OPTS = DUCC_JVM_OPTS + ' -DDUCC_HOME=' + self.DUCC_HOME + DUCC_JVM_OPTS = DUCC_JVM_OPTS + ' -Dducc.head=' + self.ducc_properties.get('ducc.head') + + self.spawn(self.java(), DUCC_JVM_OPTS, 'org.apache.uima.ducc.common.main.DuccRmAdmin', '--varyon', ' '.join(nodes)) + + return + +if __name__ == "__main__": + stopper = DuccVaryOff() + stopper.main(sys.argv[1:]) + + Propchange: uima/sandbox/uima-ducc/trunk/src/main/admin/vary_on ------------------------------------------------------------------------------ svn:executable = * Added: uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/admin/event/RmAdminReply.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/admin/event/RmAdminReply.java?rev=1591769&view=auto 
============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/admin/event/RmAdminReply.java (added) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/admin/event/RmAdminReply.java Thu May 1 20:43:26 2014 @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+*/ +package org.apache.uima.ducc.common.admin.event; + +public class RmAdminReply + extends DuccAdminEvent +{ + private static final long serialVersionUID = -8101741014979144426L; + String response; + + public RmAdminReply(String response) + { + this.response = response; + } + + public String getResponse() + { + return response; + } +} Added: uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/admin/event/RmAdminVaryOff.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/admin/event/RmAdminVaryOff.java?rev=1591769&view=auto ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/admin/event/RmAdminVaryOff.java (added) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/admin/event/RmAdminVaryOff.java Thu May 1 20:43:26 2014 @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+*/ +package org.apache.uima.ducc.common.admin.event; + +public class RmAdminVaryOff + extends DuccAdminEvent +{ + private static final long serialVersionUID = -8101741014979144426L; + String[] nodes; + + public RmAdminVaryOff(String[] nodes) + { + this.nodes = nodes; + } + + public String[] getNodes() { return nodes;} +} Added: uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/admin/event/RmAdminVaryOn.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/admin/event/RmAdminVaryOn.java?rev=1591769&view=auto ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/admin/event/RmAdminVaryOn.java (added) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/admin/event/RmAdminVaryOn.java Thu May 1 20:43:26 2014 @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+*/ +package org.apache.uima.ducc.common.admin.event; + +public class RmAdminVaryOn + extends DuccAdminEvent +{ + private static final long serialVersionUID = -8101741014979144426L; + String[] nodes; + + public RmAdminVaryOn(String[] nodes) + { + this.nodes = nodes; + } + + public String[] getNodes() { return nodes;} +} Added: uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/main/DuccRmAdmin.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/main/DuccRmAdmin.java?rev=1591769&view=auto ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/main/DuccRmAdmin.java (added) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/main/DuccRmAdmin.java Thu May 1 20:43:26 2014 @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+*/ +package org.apache.uima.ducc.common.main; + +import java.io.FileNotFoundException; + +import org.apache.activemq.camel.component.ActiveMQComponent; +import org.apache.camel.CamelContext; +import org.apache.camel.ExchangePattern; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.RuntimeExchangeException; +import org.apache.camel.dataformat.xstream.XStreamDataFormat; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.impl.DefaultClassResolver; +import org.apache.uima.ducc.common.admin.event.DuccAdminEvent; +import org.apache.uima.ducc.common.admin.event.RmAdminReply; +import org.apache.uima.ducc.common.admin.event.RmAdminVaryOff; +import org.apache.uima.ducc.common.admin.event.RmAdminVaryOn; +import org.apache.uima.ducc.common.authentication.BrokerCredentials; +import org.apache.uima.ducc.common.authentication.BrokerCredentials.Credentials; +import org.apache.uima.ducc.common.component.AbstractDuccComponent; +import org.apache.uima.ducc.common.exception.DuccRuntimeException; +import org.apache.uima.ducc.common.utils.Utils; + +import com.thoughtworks.xstream.XStream; +import com.thoughtworks.xstream.io.xml.DomDriver; + +public class DuccRmAdmin + extends AbstractDuccComponent +{ + public static final String FileSeparator = System + .getProperty("file.separator"); + + private String brokerUrl; + private ProducerTemplate pt; + private String targetEndpoint; + + public DuccRmAdmin(CamelContext context, String epname) + { + super("DuccServiceReaper", context); + try { + + // Load ducc properties file and enrich System properties. It supports + // overrides for entries in ducc properties file. 
Any key in the ducc + // property file can be overriden with -D<key>=<value> + loadProperties(DuccService.DUCC_PROPERTY_FILE); + + // fetch the broker URL from ducc.properties + this.brokerUrl = System.getProperty("ducc.broker.url"); + try { + String brokerCredentialsFile = System.getProperty("ducc.broker.credentials.file"); + // fetch the admin endpoint from the ducc.properties where + // the admin events will be sent by the DuccServiceReaper + targetEndpoint = System.getProperty(epname); + if ( targetEndpoint == null ) { + throw new IllegalArgumentException("Cannot find endpoint for RM admin. Is 'ducc.rm.admin.endpoint' configured n ducc.properties?"); + } + + // System.out.println("+++ Activating JMS Component for Endpoint:" + targetEndpoint + " Broker:" + brokerUrl); + + ActiveMQComponent duccAMQComponent = new ActiveMQComponent(context); + duccAMQComponent.setBrokerURL(brokerUrl); + + // context.addComponent("activemq", + // ActiveMQComponent.activeMQComponent(brokerUrl)); + + if ( brokerCredentialsFile != null && brokerCredentialsFile.length() > 0 ) { + String path =""; + try { + Utils.findDuccHome(); // add DUCC_HOME to System.properties + path = Utils.resolvePlaceholderIfExists(brokerCredentialsFile, System.getProperties()); + Credentials credentials = BrokerCredentials.get(path); + if ( credentials.getUsername() != null && credentials.getPassword() != null ) { + duccAMQComponent.setUserName(credentials.getUsername()); + duccAMQComponent.setPassword(credentials.getPassword()); + } + } catch(FileNotFoundException e) { + System.out.println("DuccRmAdmin Failed - File Not Found:"+path+" Broker Credentials File:"+brokerCredentialsFile); + System.exit(-1); + } + } + context.addComponent("activemq",duccAMQComponent); + this.pt = context.createProducerTemplate(); + } catch( Throwable exx) { + System.out.println("DuccRmAdmin Failed:"+exx); + System.exit(-1); + } + + } catch (Exception e) { + e.printStackTrace(); + System.exit(-1); + } + } + + private String 
marshallEvent(DuccAdminEvent duccEvent) + throws Exception + { + XStreamDataFormat xStreamDataFormat = new XStreamDataFormat(); + XStream xStream = xStreamDataFormat.getXStream(new DefaultClassResolver()); + return xStream.toXML(duccEvent); + } + + private RmAdminReply unmarshallEvent(Object targetToUnmarshall) + throws Exception + { + XStream xStream = new XStream(new DomDriver()); + String claz = targetToUnmarshall.getClass().getName(); + + if (targetToUnmarshall instanceof byte[]) { + Object reply = xStream.fromXML(new String((byte[]) targetToUnmarshall)); + if (reply instanceof RmAdminReply) { + return (RmAdminReply) reply; + } else { + claz = (reply == null) ? "NULL" : reply.getClass().getName(); + } + } + throw new Exception( "Unexpected Reply type received from Ducc Component. Expected DuccEvent, instead received: " + claz); + } + + public RmAdminReply dispatchAndWaitForReply(DuccAdminEvent duccEvent) + throws Exception + { + int maxRetryCount = 20; + int i = 0; + Object reply = null; + RuntimeExchangeException ree = null; + + // retry up to 20 times. This is an attempt to handle an error thrown + // by Camel: Failed to resolve replyTo destination on the exchange + // Camel waits at most 10000ms( 10secs) for AMQ to create a temp queue. + // After 10secs Camel times out and throws an Exception. 
+ for (; i < maxRetryCount; i++) { + try { + reply = pt.sendBody(targetEndpoint, ExchangePattern.InOut, marshallEvent(duccEvent)); + ree = null; // all is well - got a reply + break; // done here + + } catch (RuntimeExchangeException e) { + String msg = e.getMessage(); + // Only retry if AMQ failed to create a temp queue + if (msg != null && msg.startsWith("Failed to resolve replyTo destination on the exchange")) { + ree = e; + } else { + throw new DuccRuntimeException("Ducc JMS Dispatcher is unable to deliver a request.", e); + } + } + } + // when retries hit the threshold, just throw an exception + if (i == maxRetryCount) { + throw new DuccRuntimeException("ActiveMQ failed to create temp reply queue. After 20 attempts to deliver request to the OR, Ducc JMS Dispatcher is giving up.", + ree); + } + if ( reply instanceof RmAdminReply ) { + return (RmAdminReply) reply; + } else { + throw new DuccRuntimeException("Received unexpected object as response: " + reply.getClass().getName()); + } + } + + /** + * Interprets and executes Admin command + * + * @throws Exception + */ + public void varyoff(String[] args) + throws Exception + { + String[] nodes = new String[args.length - 1]; + for ( int i = 1; i < args.length; i++) nodes[i-1] = args[i]; // take a slice of the array + + RmAdminVaryOff vo = new RmAdminVaryOff(nodes); + RmAdminReply reply = dispatchAndWaitForReply(vo); + System.out.println(reply.getResponse()); + } + + /** + * Interprets and executes Admin command + * + * @throws Exception + */ + public void varyon(String[] args) + throws Exception + { + String[] nodes = new String[args.length - 1]; + for ( int i = 1; i < args.length; i++) nodes[i-1] = args[i]; // take a slice of the array + + RmAdminVaryOn vo = new RmAdminVaryOn(nodes); + RmAdminReply reply = dispatchAndWaitForReply(vo); + System.out.println(reply.getResponse()); + } + + + public void run(String[] args) + throws Exception + { + if ( args[0].equals("--varyoff")) { + if ( args.length < 2 ) 
usage("Missing node list"); + varyoff(args); + return; + } + + if ( args[0].equals("--varyon")) { + if ( args.length < 2 ) usage("Missing node list"); + varyon(args); + return; + } + } + + public static void usage(String msg) + { + if ( msg != null ) System.out.println(msg); + + System.out.println("Usage:\n"); + System.out.println("DuccRmAdmin verb options"); + System.out.println("Where verbs are:"); + System.out.println(" --varyoff string-delimeted-nodes"); + System.out.println(" --varyon string-delimeted-nodes"); + + System.exit(1); + } + + public static void main(String[] args) + { + try { + DuccRmAdmin admin = new DuccRmAdmin(new DefaultCamelContext(), "ducc.rm.admin.endpoint"); + admin.run(args); + } catch (Throwable e) { + e.printStackTrace(); + } finally { + System.exit(-1); + } + } + +} Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/NodeStability.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/NodeStability.java?rev=1591769&r1=1591768&r2=1591769&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/NodeStability.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/NodeStability.java Thu May 1 20:43:26 2014 @@ -65,9 +65,12 @@ public class NodeStability logger.warn(methodName, null, "Ignoring node update, scheduler is still booting."); return; } else { - // logger.info(methodName, null, n.getNodeIdentity().getName()); - scheduler.nodeArrives(n); // tell RM - super.nodeArrives(n); // tell heartbeat monitor + try { + scheduler.nodeArrives(n); // tell RM + super.nodeArrives(n); // tell heartbeat monitor + } catch ( Throwable t ) { + logger.error(methodName, null, t); + } } } } Modified: 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/ResourceManagerComponent.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/ResourceManagerComponent.java?rev=1591769&r1=1591768&r2=1591769&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/ResourceManagerComponent.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/ResourceManagerComponent.java Thu May 1 20:43:26 2014 @@ -22,6 +22,13 @@ import java.util.Timer; import java.util.TimerTask; import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.uima.ducc.common.admin.event.DuccAdminEvent; +import org.apache.uima.ducc.common.admin.event.RmAdminReply; +import org.apache.uima.ducc.common.admin.event.RmAdminVaryOff; +import org.apache.uima.ducc.common.admin.event.RmAdminVaryOn; import org.apache.uima.ducc.common.boot.DuccDaemonRuntimeProperties; import org.apache.uima.ducc.common.boot.DuccDaemonRuntimeProperties.DaemonName; import org.apache.uima.ducc.common.component.AbstractDuccComponent; @@ -83,9 +90,69 @@ public class ResourceManagerComponent public DuccLogger getLogger() { return logger; } + + + /** + * Creates Camel Router for Ducc RM admin events. 
+ * + * @param endpoint + * - ducc admin endpoint + * @param delegate + * - who to call when admin event arrives + * @throws Exception + */ + private void startRmAdminChannel(final String endpoint, final AbstractDuccComponent delegate) + throws Exception + { + getContext().addRoutes(new RouteBuilder() { + public void configure() { + System.out.println("Configuring RM Admin Channel on Endpoint:" + endpoint); + onException(Exception.class).handled(true).process(new ErrorProcessor()); + + from(endpoint).routeId("RMAdminRoute").unmarshal().xstream() + .process(new RmAdminEventProcessor(delegate)); + } + }); + + if (logger != null) { + logger.info("startRMAdminChannel", null, "Admin Channel Activated on endpoint:" + endpoint); + } + } + + class RmAdminEventProcessor implements Processor + { + final AbstractDuccComponent delegate; + + public RmAdminEventProcessor(final AbstractDuccComponent delegate) + { + this.delegate = delegate; + } + + public void process(final Exchange exchange) + throws Exception + { + String methodName = "RmAdminEventProcessor.process"; + Object body = exchange.getIn().getBody(); + logger.info(methodName, null, "Received Admin Message of Type:", body.getClass().getName()); + + DuccAdminEvent reply = null; + if (body instanceof RmAdminVaryOff) { + RmAdminVaryOff vo = (RmAdminVaryOff) body; + reply = new RmAdminReply(scheduler.varyoff(vo.getNodes())); + } else + if (body instanceof RmAdminVaryOn) { + RmAdminVaryOn vo = (RmAdminVaryOn) body; + reply = new RmAdminReply(scheduler.varyon(vo.getNodes())); + } + + exchange.getIn().setBody(reply); + } + } + public void start(DuccService service, String[] args) throws Exception { + String methodName = "start"; converter = new JobManagerConverter(scheduler, stabilityManager); super.start(service, args); @@ -96,7 +163,13 @@ public class ResourceManagerComponent nodeMetricsUpdateRate = SystemPropertyResolver.getIntProperty("ducc.agent.node.metrics.publish.rate", DEFAULT_NODE_METRICS_RATE); schedulingRatio = 
SystemPropertyResolver.getIntProperty("ducc.rm.state.publish.ratio", DEFAULT_SCHEDULING_RATIO); schedulingEpoch = SystemPropertyResolver.getIntProperty("ducc.rm.state.publish.rate", DEFAULT_SCHEDULING_RATE); - + + String adminEndpoint = System.getProperty("ducc.rm.admin.endpoint"); + if ( adminEndpoint == null ) { + logger.warn(methodName, null, "No admin endpoint configured. Not starting admin channel."); + } else { + startRmAdminChannel(adminEndpoint, this); + } scheduler.init(); @@ -235,5 +308,4 @@ public class ResourceManagerComponent } } - } Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ISchedulerMain.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ISchedulerMain.java?rev=1591769&r1=1591768&r2=1591769&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ISchedulerMain.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ISchedulerMain.java Thu May 1 20:43:26 2014 @@ -70,4 +70,7 @@ public interface ISchedulerMain // once both initialized() and ready() occur, the RM scaffolding will enable scheduling by calling start void start(); + + String varyoff(String[] nodes); + String varyon(String[] nodes); } Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodePool.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodePool.java?rev=1591769&r1=1591768&r2=1591769&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodePool.java (original) +++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodePool.java Thu May 1 20:43:26 2014 @@ -51,6 +51,7 @@ class NodePool HashMap<Node, Machine> allMachines = new HashMap<Node, Machine>(); // all active machines in the system HashMap<Node, Machine> unresponsiveMachines = new HashMap<Node, Machine>(); // machines with excessive missed heartbeats + HashMap<Node, Machine> offlineMachines = new HashMap<Node, Machine>(); HashMap<Integer, HashMap<Node, Machine>> machinesByOrder = new HashMap<Integer, HashMap<Node, Machine>>(); // Schedulable, not necessarily fee HashMap<String, Machine> machinesByName = new HashMap<String, Machine>(); // by name, for nodepool support HashMap<String, Machine> machinesByIp = new HashMap<String, Machine>(); // by IP, for nodepool support @@ -696,6 +697,12 @@ class NodePool return m; } + if ( offlineMachines.containsKey(node) ) { // if it's offline it can't be restored like this. + Machine m = offlineMachines.get(node); + logger.trace(methodName, null, "Node ", m.getId(), " is offline, not activating."); + return m; + } + if ( unresponsiveMachines.containsKey(node) ) { // reactive the node Machine m = unresponsiveMachines.remove(node); @@ -740,7 +747,7 @@ class NodePool return machine; } - void nodeLeaves(Machine m) + void disable(Machine m, HashMap<Node, Machine> disableMap) { String methodName = "nodeLeaves"; @@ -758,18 +765,18 @@ class NodePool if ( j.getDuccType() == DuccType.Reservation ) { // UIMA-3614. 
Only actual reservation is left intact - logger.info(methodName, j.getId(), "Not purging job on dead node, job type:", j.getDuccType()); + logger.info(methodName, j.getId(), "Not purging job on dead/offline node, job type:", j.getDuccType()); break; } - logger.info(methodName, j.getId(), "Purging job on dead node, job type:", j.getDuccType()); + logger.info(methodName, j.getId(), "Purging job on dead/offline node, job type:", j.getDuccType()); j.shrinkByOne(s); nPendingByOrder[order]++; s.purge(); // This bet tells OR not to wait for confirmation from the agent } allMachines.remove(m.key()); - unresponsiveMachines.put(m.key(), m); + disableMap.put(m.key(), m); HashMap<Node, Machine> machs = machinesByOrder.get(order); machs.remove(m.key()); @@ -785,6 +792,72 @@ class NodePool } } + void nodeLeaves(Machine m) + { + disable(m, unresponsiveMachines); + } + + String varyoff(String node) + { + Machine m = machinesByName.get(node); + if ( m == null ) { + // ok, maybe it's already offline or maybe dead + // relatively rare, cleaner to search than to make yet another index + + for ( Machine mm : offlineMachines.values() ) { + if ( mm.getId().equals(node) ) { + return "VaryOff: Nodepool " + id + " - Already offline: " + node; + } + } + + Iterator<Machine> iter = unresponsiveMachines.values().iterator(); + while ( iter.hasNext() ) { + Machine mm = iter.next(); + if ( mm.getId().equals(node) ) { + Node key = mm.key(); + iter.remove(); + offlineMachines.put(key, mm); + return "VaryOff: Nodepool " + id + " - Unresponsive machine, marked offline: " + node; + } + } + + return "VaryOff: Nodepool " + id + " - Cannot find machine: " + node; + } + + disable(m, offlineMachines); + return "VaryOff: " + node + " - OK."; + } + + /** + * We're going to just take it off the offline list and if it happens to come back, fine, it will get picked up + * in nodeArrives as a new machine. 
+ */ + String varyon(String node) + { + if ( machinesByName.containsKey(node) ) { + return "VaryOn: Nodepool " + id + " - Already online: " + node; + } + + Iterator<Machine> iter = offlineMachines.values().iterator(); + while ( iter.hasNext() ) { + Machine mm = iter.next(); + if ( mm.getId().equals(node) ) { + iter.remove(); + return "VaryOn: Nodepool " + id + " - Machine marked online: " + node; + } + } + + iter = unresponsiveMachines.values().iterator(); + while ( iter.hasNext() ) { + Machine mm = iter.next(); + if ( mm.getId().equals(node) ) { + return "VaryOn: Nodepool " + id + " - Machine is online but not responsive: " + node; + } + } + + return "VaryOn: Nodepool " + id + " - Cannot find machine: " + node; + } + /** * ------------------------------------------------------------------------------------------ * Routines used during the counting phase @@ -792,7 +865,7 @@ class NodePool */ /** - * Adjust counts for something that takes full machiens, like a reservation. + * Adjust counts for something that takes full machines, like a reservation. * If "enforce" is set the machine order must match, otherwise we just do best effort to match. * * This is intended for use by reservations only; as such it does NOT recurse into child nodepools. @@ -931,7 +1004,7 @@ class NodePool } /** - * We need to make enough space for 'cnt' full machines. If enforce is true the machiens need + * We need to make enough space for 'cnt' full machines. If enforce is true the machines need * to be of the indicated order; otherwise we just nuke any old thing. * * Returns number of machines that are freeable, up to 'needed', or 0, if we can't get enough. 
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodepoolScheduler.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodepoolScheduler.java?rev=1591769&r1=1591768&r2=1591769&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodepoolScheduler.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodepoolScheduler.java Thu May 1 20:43:26 2014 @@ -1523,12 +1523,16 @@ public class NodepoolScheduler for ( ResourceClass cl : resourceClasses.values() ) { if ( cl.getNodepoolName().equals(nodepool.getId()) && (cl.getAllJobs().size() > 0) ) { HashMap<IRmJob, IRmJob> jobs = cl.getAllJobs(); + String npn = cl.getNodepoolName(); + logger.info(methodName, null, String.format("%12s %7s %7s %6s %5s", npn, "Counted", "Current", "Needed", "Order")); + for ( IRmJob j : jobs.values() ) { int counted = j.countNSharesGiven(); // allotment from the counter int current = j.countNShares(); // currently allocated, plus pending, less those removed by earlier preemption int needed = (counted - current); int order = j.getShareOrder(); + logger.info(methodName, j.getId(), String.format("%12s %7d %7d %6d %5d", npn, counted, current, needed, order)); needed = Math.abs(needed); //needed = Math.max(0, needed); neededByOrder[order] += needed; Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java?rev=1591769&r1=1591768&r2=1591769&view=diff ============================================================================== --- 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java Thu May 1 20:43:26 2014 @@ -80,6 +80,7 @@ public class Scheduler Map<Node, Node> deadNodes = new HashMap<Node, Node>(); // missed too many heartbeats // HashMap<Node, Node> allNodes = new HashMap<Node, Node>(); // the guys we know Map<String, NodePool> nodepoolsByNode = new HashMap<String, NodePool>(); // all nodes, and their associated pool + Map<String, String> shortToLongNode = new HashMap<String, String>(); // Map<String, User> users = new HashMap<String, User>(); // Active users - has a job in the system //HashMap<DuccId, IRmJob> runningJobs = new HashMap<DuccId, IRmJob>(); @@ -451,7 +452,7 @@ public class Scheduler if ( nodes == null ) return; for ( String s : nodes.keySet() ) { - nodepoolsByNode.put(s, pool); + updateNodepoolsByNode(s, pool); // maps from both the fully-qualified name and the short name } } @@ -956,6 +957,27 @@ public class Scheduler // } // } + + /** + * maps from both the fully-qualified name and the short name + */ + void updateNodepoolsByNode(String longname, NodePool np) + { + String methodName = "updateNodepoolsByNode"; + String shortname = longname; + int ndx = longname.indexOf("."); + + logger.info(methodName, null, "Map", longname, "to", np.getId()); + nodepoolsByNode.put(longname, np); + + if ( ndx >=0 ) { + shortname = longname.substring(0, ndx); + nodepoolsByNode.put(shortname, np); + shortToLongNode.put(shortname, longname); + logger.info(methodName, null, "Map", shortname, "to", np.getId()); + } + } + // // Return a nodepool by Node. If the node can't be associated with a nodepool, return the // default nodepool, which is always the first one defined in the config file.
@@ -968,39 +990,38 @@ public class Scheduler } if ( np == null ) { np = nodepools[0]; - nodepoolsByNode.put( ni.getName(), np); // assign this guy to the default np + updateNodepoolsByNode(ni.getName(), np); // assign this guy to the default np + // nodepoolsByNode.put( ni.getName(), np); // assign this guy to the default np } return np; } private int total_arrivals = 0; - public void nodeArrives(Node node) + public synchronized void nodeArrives(Node node) { // String methodName = "nodeArrives"; // The first block insures the node is in the scheduler's records as soon as possible total_arrivals++; // report these in the main schedule loop - synchronized(this) { - // the amount of memory available for shares, adjusted with configured overhead - - NodePool np = getNodepoolByName(node.getNodeIdentity()); - Machine m = np.getMachine(node); - int share_order = 0; - - if ( m == null ) { - // allNodes.put(node, node); - long allocatable_mem = node.getNodeMetrics().getNodeMemory().getMemTotal() - share_free_dram; - if ( dramOverride > 0 ) { - allocatable_mem = dramOverride; - } - share_order = (int) (allocatable_mem / share_quantum); // conservative - rounds down (this will always cast ok) - } else { - share_order = m.getShareOrder(); - } - - max_order = Math.max(share_order, max_order); - m = np.nodeArrives(node, share_order); // announce to the nodepools + // the amount of memory available for shares, adjusted with configured overhead + + NodePool np = getNodepoolByName(node.getNodeIdentity()); + Machine m = np.getMachine(node); + int share_order = 0; + + if ( m == null ) { + // allNodes.put(node, node); + long allocatable_mem = node.getNodeMetrics().getNodeMemory().getMemTotal() - share_free_dram; + if ( dramOverride > 0 ) { + allocatable_mem = dramOverride; + } + share_order = (int) (allocatable_mem / share_quantum); // conservative - rounds down (this will always cast ok) + } else { + share_order = m.getShareOrder(); } + + max_order = Math.max(share_order, max_order); 
+ m = np.nodeArrives(node, share_order); // announce to the nodepools } public void nodeDeath(Map<Node, Node> nodes) @@ -1010,6 +1031,50 @@ public class Scheduler } } + public synchronized String varyon(String[] nodes) + { + StringBuffer reply = new StringBuffer(); + for (String n : nodes ) { + + if ( shortToLongNode.containsKey(n) ) { // internally everything is by 'long' + n = shortToLongNode.get(n); + } + + NodePool np = nodepoolsByNode.get(n); + if ( np == null ) { + reply.append("No nodepool found for node "); + reply.append(n); + reply.append("\n"); + } else { + reply.append(np.varyon(n)); + reply.append("\n"); + } + } + return reply.toString(); + } + + public synchronized String varyoff(String[] nodes) + { + StringBuffer reply = new StringBuffer(); + for (String n : nodes ) { + + if ( shortToLongNode.containsKey(n) ) { // internally everything is by 'long' + n = shortToLongNode.get(n); + } + + NodePool np = nodepoolsByNode.get(n); + if ( np == null ) { + reply.append("No nodepool found for node "); + reply.append(n); + reply.append("\n"); + } else { + reply.append(np.varyoff(n)); + reply.append("\n"); + } + } + return reply.toString(); + } + /** * Callback from job manager, need shares for a new fair-share job. */