Author: bfoster
Date: Wed Jan 19 02:22:25 2011
New Revision: 1060652

URL: http://svn.apache.org/viewvc?rev=1060652&view=rev
Log:

- ability to map certain task types to certain runners . . . allows conditions 
and simpleTasks and the such to say be run locally instead of being sent to 
resource manager

Added:
    
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/MappedMultiRunner.java
   (with props)
    
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/MappedMultiRunnerFactory.java
   (with props)
Modified:
    
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/queue/QueueManager.java
    
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/EngineRunner.java
    
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/LocalEngineRunner.java
    
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/LocalEngineRunnerFactory.java
    
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/ResourceRunner.java
    
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/instance/TaskInstance.java
    
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/metadata/WorkflowMetKeys.java
    
oodt/branches/wengine-branch/wengine/src/main/resources/policy/engine-beans.xml
    
oodt/branches/wengine-branch/wengine/src/main/resources/policy/workflows/WorkflowProcessorMapping.xml

Modified: 
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/queue/QueueManager.java
URL: 
http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/queue/QueueManager.java?rev=1060652&r1=1060651&r2=1060652&view=diff
==============================================================================
--- 
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/queue/QueueManager.java
 (original)
+++ 
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/queue/QueueManager.java
 Wed Jan 19 02:22:25 2011
@@ -215,6 +215,7 @@ public class QueueManager {
                ti.setDynamicMetadata(taskProcessor.getDynamicMetadata());
                ti.setStaticMetadata(taskProcessor.getStaticMetadata());
                ti.setModelId(taskProcessor.getModelId());
+               ti.setExecutionType(taskProcessor.getExecutionType());
                if (taskProcessor.getJobId() == null) {
                        ti.setJobId(UUID.randomUUID().toString());
                        taskProcessor.setJobId(ti.getJobId());

Modified: 
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/EngineRunner.java
URL: 
http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/EngineRunner.java?rev=1060652&r1=1060651&r2=1060652&view=diff
==============================================================================
--- 
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/EngineRunner.java
 (original)
+++ 
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/EngineRunner.java
 Wed Jan 19 02:22:25 2011
@@ -29,9 +29,9 @@ import org.apache.oodt.cas.workflow.inst
  */
 public abstract class EngineRunner {
        
-       public abstract int getOpenSlots() throws Exception;
+       public abstract int getOpenSlots(TaskInstance workflowInstance) throws 
Exception;
        
-       public abstract boolean hasOpenSlots() throws Exception;
+       public abstract boolean hasOpenSlots(TaskInstance workflowInstance) 
throws Exception;
        
        public abstract void execute(TaskInstance workflowInstance) throws 
Exception;
        

Modified: 
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/LocalEngineRunner.java
URL: 
http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/LocalEngineRunner.java?rev=1060652&r1=1060651&r2=1060652&view=diff
==============================================================================
--- 
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/LocalEngineRunner.java
 (original)
+++ 
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/LocalEngineRunner.java
 Wed Jan 19 02:22:25 2011
@@ -30,9 +30,13 @@ import org.apache.oodt.cas.workflow.inst
  */
 public class LocalEngineRunner extends EngineRunner {
        
-       private static final int NUM_OF_SLOTS = 6;
+       private int numOfSlots;
        private int usedSlots = 0;
        
+       public LocalEngineRunner(int numOfSlots) {
+               this.numOfSlots = numOfSlots;
+       }
+       
        public void execute(final TaskInstance workflowInstance) throws 
Exception {
                incrSlots();
                new Thread(new Runnable() {
@@ -47,13 +51,13 @@ public class LocalEngineRunner extends E
        }
 
        @Override
-       public synchronized int getOpenSlots() throws Exception {
-               return NUM_OF_SLOTS - usedSlots;
+       public synchronized int getOpenSlots(TaskInstance workflowInstance) 
throws Exception {
+               return numOfSlots - usedSlots;
        }
 
        @Override
-       public boolean hasOpenSlots() throws Exception {
-               return this.getOpenSlots() > 0;
+       public boolean hasOpenSlots(TaskInstance workflowInstance) throws 
Exception {
+               return this.getOpenSlots(workflowInstance) > 0;
        }
        
        private synchronized void incrSlots() {

Modified: 
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/LocalEngineRunnerFactory.java
URL: 
http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/LocalEngineRunnerFactory.java?rev=1060652&r1=1060651&r2=1060652&view=diff
==============================================================================
--- 
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/LocalEngineRunnerFactory.java
 (original)
+++ 
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/LocalEngineRunnerFactory.java
 Wed Jan 19 02:22:25 2011
@@ -27,8 +27,14 @@ package org.apache.oodt.cas.workflow.eng
  */
 public class LocalEngineRunnerFactory implements EngineRunnerFactory {
 
+       private int numOfSlots = 6;
+       
        public LocalEngineRunner createRunner() {
-               return new LocalEngineRunner();
+               return new LocalEngineRunner(this.numOfSlots);
+       }
+       
+       public void setNumOfSlots(int numOfSlots) {
+               this.numOfSlots = numOfSlots;
        }
 
 }

Added: 
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/MappedMultiRunner.java
URL: 
http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/MappedMultiRunner.java?rev=1060652&view=auto
==============================================================================
--- 
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/MappedMultiRunner.java
 (added)
+++ 
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/MappedMultiRunner.java
 Wed Jan 19 02:22:25 2011
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.workflow.engine.runner;
+
+//OODT imports
+import org.apache.oodt.cas.workflow.instance.TaskInstance;
+
+//JDK imports
+import java.util.Map;
+
+/**
+ * 
+ * @author bfoster
+ *
+ */
+public class MappedMultiRunner extends EngineRunner {
+
+       protected static final String DEFAULT_RUNNER = "default";
+       
+       private Map<String, EngineRunner> runnerMap;
+       private Map<String, String> executionTypeToRunnerMap;
+       
+       public MappedMultiRunner(Map<String, EngineRunner> runnerMap, 
Map<String, String> executionTypeToRunnerMap) throws InstantiationException {
+               if (!runnerMap.containsKey(DEFAULT_RUNNER))
+                       throw new InstantiationException("Must set default 
runner key '" + DEFAULT_RUNNER + "' in runner map");
+               this.runnerMap = runnerMap;
+               this.executionTypeToRunnerMap = executionTypeToRunnerMap;
+       }
+       
+       @Override
+       public void execute(TaskInstance workflowInstance) throws Exception {
+               this.getRunner(workflowInstance).execute(workflowInstance);
+       }
+
+       @Override
+       public int getOpenSlots(TaskInstance workflowInstance) throws Exception 
{
+               return 
this.getRunner(workflowInstance).getOpenSlots(workflowInstance);
+       }
+
+       @Override
+       public boolean hasOpenSlots(TaskInstance workflowInstance) throws 
Exception {
+               return 
this.getRunner(workflowInstance).hasOpenSlots(workflowInstance);
+       }
+       
+       private EngineRunner getRunner(TaskInstance workflowInstance) {
+               String runnerId = 
this.executionTypeToRunnerMap.get(workflowInstance.getExecutionType());
+               if (runnerId == null)
+                       runnerId = DEFAULT_RUNNER;
+               return runnerMap.get(runnerId);
+       }
+       
+}

Propchange: 
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/MappedMultiRunner.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: 
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/MappedMultiRunnerFactory.java
URL: 
http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/MappedMultiRunnerFactory.java?rev=1060652&view=auto
==============================================================================
--- 
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/MappedMultiRunnerFactory.java
 (added)
+++ 
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/MappedMultiRunnerFactory.java
 Wed Jan 19 02:22:25 2011
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.workflow.engine.runner;
+
+//JDK imports
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * 
+ * @author bfoster
+ *
+ */
+public class MappedMultiRunnerFactory implements EngineRunnerFactory {
+
+       private static final Logger LOG = 
Logger.getLogger(MappedMultiRunnerFactory.class.getName());
+       
+       private Map<String, EngineRunnerFactory> runnerFactoryMap;
+       private Map<String, String> executionTypeToRunnerMap;
+       
+       public EngineRunner createRunner() {
+               try {
+                       HashMap<String, EngineRunner> runnerMap = new 
HashMap<String, EngineRunner>();
+                       for (Entry<String, EngineRunnerFactory> entry : 
this.runnerFactoryMap.entrySet())
+                               runnerMap.put(entry.getKey(), 
entry.getValue().createRunner());
+                       return new MappedMultiRunner(runnerMap, 
this.executionTypeToRunnerMap);
+               }catch (Exception e) {
+                       LOG.log(Level.SEVERE, "Failed to create instance of '" 
+ MappedMultiRunner.class.getCanonicalName() + "' : " + e.getMessage(), e);
+                       return null;
+               }
+       }
+
+       public void setRunnerFactoryMap(Map<String, EngineRunnerFactory> 
runnerFactoryMap) throws Exception {
+               this.runnerFactoryMap = runnerFactoryMap;
+       }
+       
+       public void setExecutionTypeToRunnerMap(Map<String, String> 
executionTypeToRunnerMap) {
+               this.executionTypeToRunnerMap = executionTypeToRunnerMap;
+       }
+
+}

Propchange: 
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/MappedMultiRunnerFactory.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: 
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/ResourceRunner.java
URL: 
http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/ResourceRunner.java?rev=1060652&r1=1060651&r2=1060652&view=diff
==============================================================================
--- 
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/ResourceRunner.java
 (original)
+++ 
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/ResourceRunner.java
 Wed Jan 19 02:22:25 2011
@@ -65,13 +65,13 @@ public class ResourceRunner extends Engi
        /**
         * Additional '-1' is a workaround of a bug in resource manager
         */
-       public int getOpenSlots() throws Exception {
+       public int getOpenSlots(TaskInstance workflowInstance) throws Exception 
{
                return this.rsManagerClient.getJobQueueCapacity() - 
this.rsManagerClient.getJobQueueSize() - 1;
        }
 
        @Override
-       public boolean hasOpenSlots() throws Exception {
-               return this.getOpenSlots() > 0;
+       public boolean hasOpenSlots(TaskInstance workflowInstance) throws 
Exception {
+               return this.getOpenSlots(workflowInstance) > 0;
        }
 
 }

Modified: 
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/instance/TaskInstance.java
URL: 
http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/instance/TaskInstance.java?rev=1060652&r1=1060651&r2=1060652&view=diff
==============================================================================
--- 
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/instance/TaskInstance.java
 (original)
+++ 
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/instance/TaskInstance.java
 Wed Jan 19 02:22:25 2011
@@ -57,6 +57,7 @@ public abstract class TaskInstance {
        private String jobId;
        private String instanceId;
        private String modelId;
+       private String executionType;
        private WorkflowEngineClient weClient;
        private Metadata dynamicMetadata;
        private Metadata staticMetadata;
@@ -86,6 +87,14 @@ public abstract class TaskInstance {
                return this.modelId;
        }
        
+       public void setExecutionType(String executionType) {
+               this.executionType = executionType;
+       }
+       
+       public String getExecutionType(){
+               return this.executionType;
+       }
+       
        public ProcessorInfo getProcessorInfo() throws EngineException {
                return this.weClient.getProcessorInfo(this.instanceId, 
this.modelId);
        }
@@ -153,6 +162,7 @@ public abstract class TaskInstance {
        ctrlMetadata.replaceLocalMetadata(WorkflowMetKeys.JOB_ID, this.jobId);
        ctrlMetadata.replaceLocalMetadata(WorkflowMetKeys.INSTANCE_ID, 
this.instanceId);
        ctrlMetadata.replaceLocalMetadata(WorkflowMetKeys.MODEL_ID, 
this.modelId);
+       ctrlMetadata.replaceLocalMetadata(WorkflowMetKeys.EXECUTION_TYPE, 
this.executionType);
        ctrlMetadata.replaceLocalMetadata(WorkflowMetKeys.STATE, 
workflowState.getName());
        ctrlMetadata.replaceLocalMetadata(WorkflowMetKeys.HOST, 
WorkflowUtils.getHostName());
        ProcessorInfo processorInfo = this.getProcessorInfo();

Modified: 
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/metadata/WorkflowMetKeys.java
URL: 
http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/metadata/WorkflowMetKeys.java?rev=1060652&r1=1060651&r2=1060652&view=diff
==============================================================================
--- 
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/metadata/WorkflowMetKeys.java
 (original)
+++ 
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/metadata/WorkflowMetKeys.java
 Wed Jan 19 02:22:25 2011
@@ -32,6 +32,8 @@ public interface WorkflowMetKeys {
        
        public static final String MODEL_ID = "ModelId";
 
+       public static final String EXECUTION_TYPE = "ExecutionType";
+
        public static final String STATE = "State";
 
        public static final String PRIORITY = "Priority";

Modified: 
oodt/branches/wengine-branch/wengine/src/main/resources/policy/engine-beans.xml
URL: 
http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/resources/policy/engine-beans.xml?rev=1060652&r1=1060651&r2=1060652&view=diff
==============================================================================
--- 
oodt/branches/wengine-branch/wengine/src/main/resources/policy/engine-beans.xml 
(original)
+++ 
oodt/branches/wengine-branch/wengine/src/main/resources/policy/engine-beans.xml 
Wed Jan 19 02:22:25 2011
@@ -124,10 +124,26 @@
     <bean id="DefaultServerFactory" lazy-init="true" 
parent="XmlRpcServerFactory"/>
     
     <!-- Runner Factories -->
-    <bean id="LocalRunnerFactory" lazy-init="true" 
class="org.apache.oodt.cas.workflow.engine.runner.LocalEngineRunnerFactory"/>
+    <bean id="LocalRunnerFactory" lazy-init="true" 
class="org.apache.oodt.cas.workflow.engine.runner.LocalEngineRunnerFactory">
+        <property name="numOfSlots" value="32"/>        
+    </bean>
     <bean id="ResourceRunnerFactory" lazy-init="true" 
class="org.apache.oodt.cas.workflow.engine.runner.ResourceRunnerFactory">
         <property name="resourceManagerUrl" value="${resourcemgr.url}"/>
     </bean>
+    <bean id="MappedMultiRunnerFactory" lazy-init="true" 
class="org.apache.oodt.cas.workflow.engine.runner.MappedMultiRunnerFactory">
+        <property name="runnerMap">
+            <map>
+                <entry key="default" value-ref="ResourceRunnerFactory"/>
+                <entry key="local" value-ref="LocalRunnerFactory"/>
+            </map>
+        </property>
+        <property name="executionTypeToRunnerMap">
+            <map>
+                <entry key="condition" value="local"/>
+                <entry key="simpleTask" value="local"/>
+            </map>
+        </property>            
+    </bean>
     
     <!-- Event Repositories -->
     <bean id="SpringBasedEngineEventRepositoryFactory" lazy-init="true" 
class="org.apache.oodt.cas.workflow.event.repo.SpringBasedEngineEventRepositoryFactory">

Modified: 
oodt/branches/wengine-branch/wengine/src/main/resources/policy/workflows/WorkflowProcessorMapping.xml
URL: 
http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/resources/policy/workflows/WorkflowProcessorMapping.xml?rev=1060652&r1=1060651&r2=1060652&view=diff
==============================================================================
--- 
oodt/branches/wengine-branch/wengine/src/main/resources/policy/workflows/WorkflowProcessorMapping.xml
 (original)
+++ 
oodt/branches/wengine-branch/wengine/src/main/resources/policy/workflows/WorkflowProcessorMapping.xml
 Wed Jan 19 02:22:25 2011
@@ -16,6 +16,7 @@
        <processor id="task" 
class="org.apache.oodt.cas.workflow.processor.TaskProcessor"/>     
        
        <!-- custom --> 
+       <processor id="simpleTask" 
class="org.apache.oodt.cas.workflow.processor.TaskProcessor"/>       
        <processor id="sequential" 
class="org.apache.oodt.cas.workflow.processor.SequentialProcessor"/>
        <processor id="parallel" 
class="org.apache.oodt.cas.workflow.processor.ParallelProcessor"/>
 


Reply via email to