Author: bfoster
Date: Wed Jan 19 02:22:25 2011
New Revision: 1060652
URL: http://svn.apache.org/viewvc?rev=1060652&view=rev
Log:
- ability to map certain task types to certain runners . . . allows conditions,
simpleTasks, and the like to be run locally instead of being sent to the
resource manager
Added:
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/MappedMultiRunner.java
(with props)
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/MappedMultiRunnerFactory.java
(with props)
Modified:
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/queue/QueueManager.java
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/EngineRunner.java
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/LocalEngineRunner.java
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/LocalEngineRunnerFactory.java
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/ResourceRunner.java
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/instance/TaskInstance.java
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/metadata/WorkflowMetKeys.java
oodt/branches/wengine-branch/wengine/src/main/resources/policy/engine-beans.xml
oodt/branches/wengine-branch/wengine/src/main/resources/policy/workflows/WorkflowProcessorMapping.xml
Modified:
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/queue/QueueManager.java
URL:
http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/queue/QueueManager.java?rev=1060652&r1=1060651&r2=1060652&view=diff
==============================================================================
---
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/queue/QueueManager.java
(original)
+++
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/queue/QueueManager.java
Wed Jan 19 02:22:25 2011
@@ -215,6 +215,7 @@ public class QueueManager {
ti.setDynamicMetadata(taskProcessor.getDynamicMetadata());
ti.setStaticMetadata(taskProcessor.getStaticMetadata());
ti.setModelId(taskProcessor.getModelId());
+ ti.setExecutionType(taskProcessor.getExecutionType());
if (taskProcessor.getJobId() == null) {
ti.setJobId(UUID.randomUUID().toString());
taskProcessor.setJobId(ti.getJobId());
Modified:
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/EngineRunner.java
URL:
http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/EngineRunner.java?rev=1060652&r1=1060651&r2=1060652&view=diff
==============================================================================
---
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/EngineRunner.java
(original)
+++
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/EngineRunner.java
Wed Jan 19 02:22:25 2011
@@ -29,9 +29,9 @@ import org.apache.oodt.cas.workflow.inst
*/
public abstract class EngineRunner {
- public abstract int getOpenSlots() throws Exception;
+ public abstract int getOpenSlots(TaskInstance workflowInstance) throws
Exception;
- public abstract boolean hasOpenSlots() throws Exception;
+ public abstract boolean hasOpenSlots(TaskInstance workflowInstance)
throws Exception;
public abstract void execute(TaskInstance workflowInstance) throws
Exception;
Modified:
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/LocalEngineRunner.java
URL:
http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/LocalEngineRunner.java?rev=1060652&r1=1060651&r2=1060652&view=diff
==============================================================================
---
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/LocalEngineRunner.java
(original)
+++
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/LocalEngineRunner.java
Wed Jan 19 02:22:25 2011
@@ -30,9 +30,13 @@ import org.apache.oodt.cas.workflow.inst
*/
public class LocalEngineRunner extends EngineRunner {
- private static final int NUM_OF_SLOTS = 6;
+ private int numOfSlots;
private int usedSlots = 0;
+ public LocalEngineRunner(int numOfSlots) {
+ this.numOfSlots = numOfSlots;
+ }
+
public void execute(final TaskInstance workflowInstance) throws
Exception {
incrSlots();
new Thread(new Runnable() {
@@ -47,13 +51,13 @@ public class LocalEngineRunner extends E
}
@Override
- public synchronized int getOpenSlots() throws Exception {
- return NUM_OF_SLOTS - usedSlots;
+ public synchronized int getOpenSlots(TaskInstance workflowInstance)
throws Exception {
+ return numOfSlots - usedSlots;
}
@Override
- public boolean hasOpenSlots() throws Exception {
- return this.getOpenSlots() > 0;
+ public boolean hasOpenSlots(TaskInstance workflowInstance) throws
Exception {
+ return this.getOpenSlots(workflowInstance) > 0;
}
private synchronized void incrSlots() {
Modified:
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/LocalEngineRunnerFactory.java
URL:
http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/LocalEngineRunnerFactory.java?rev=1060652&r1=1060651&r2=1060652&view=diff
==============================================================================
---
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/LocalEngineRunnerFactory.java
(original)
+++
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/LocalEngineRunnerFactory.java
Wed Jan 19 02:22:25 2011
@@ -27,8 +27,14 @@ package org.apache.oodt.cas.workflow.eng
*/
public class LocalEngineRunnerFactory implements EngineRunnerFactory {
+ private int numOfSlots = 6;
+
public LocalEngineRunner createRunner() {
- return new LocalEngineRunner();
+ return new LocalEngineRunner(this.numOfSlots);
+ }
+
+ public void setNumOfSlots(int numOfSlots) {
+ this.numOfSlots = numOfSlots;
}
}
Added:
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/MappedMultiRunner.java
URL:
http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/MappedMultiRunner.java?rev=1060652&view=auto
==============================================================================
---
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/MappedMultiRunner.java
(added)
+++
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/MappedMultiRunner.java
Wed Jan 19 02:22:25 2011
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.workflow.engine.runner;
+
+//OODT imports
+import org.apache.oodt.cas.workflow.instance.TaskInstance;
+
+//JDK imports
+import java.util.Map;
+
+/**
+ *
+ * @author bfoster
+ *
+ */
+public class MappedMultiRunner extends EngineRunner {
+
+ protected static final String DEFAULT_RUNNER = "default";
+
+ private Map<String, EngineRunner> runnerMap;
+ private Map<String, String> executionTypeToRunnerMap;
+
+ public MappedMultiRunner(Map<String, EngineRunner> runnerMap,
Map<String, String> executionTypeToRunnerMap) throws InstantiationException {
+ if (!runnerMap.containsKey(DEFAULT_RUNNER))
+ throw new InstantiationException("Must set default
runner key '" + DEFAULT_RUNNER + "' in runner map");
+ this.runnerMap = runnerMap;
+ this.executionTypeToRunnerMap = executionTypeToRunnerMap;
+ }
+
+ @Override
+ public void execute(TaskInstance workflowInstance) throws Exception {
+ this.getRunner(workflowInstance).execute(workflowInstance);
+ }
+
+ @Override
+ public int getOpenSlots(TaskInstance workflowInstance) throws Exception
{
+ return
this.getRunner(workflowInstance).getOpenSlots(workflowInstance);
+ }
+
+ @Override
+ public boolean hasOpenSlots(TaskInstance workflowInstance) throws
Exception {
+ return
this.getRunner(workflowInstance).hasOpenSlots(workflowInstance);
+ }
+
+ private EngineRunner getRunner(TaskInstance workflowInstance) {
+ String runnerId =
this.executionTypeToRunnerMap.get(workflowInstance.getExecutionType());
+ if (runnerId == null)
+ runnerId = DEFAULT_RUNNER;
+ return runnerMap.get(runnerId);
+ }
+
+}
Propchange:
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/MappedMultiRunner.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added:
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/MappedMultiRunnerFactory.java
URL:
http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/MappedMultiRunnerFactory.java?rev=1060652&view=auto
==============================================================================
---
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/MappedMultiRunnerFactory.java
(added)
+++
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/MappedMultiRunnerFactory.java
Wed Jan 19 02:22:25 2011
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.workflow.engine.runner;
+
+//JDK imports
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ *
+ * @author bfoster
+ *
+ */
+public class MappedMultiRunnerFactory implements EngineRunnerFactory {
+
+ private static final Logger LOG =
Logger.getLogger(MappedMultiRunnerFactory.class.getName());
+
+ private Map<String, EngineRunnerFactory> runnerFactoryMap;
+ private Map<String, String> executionTypeToRunnerMap;
+
+ public EngineRunner createRunner() {
+ try {
+ HashMap<String, EngineRunner> runnerMap = new
HashMap<String, EngineRunner>();
+ for (Entry<String, EngineRunnerFactory> entry :
this.runnerFactoryMap.entrySet())
+ runnerMap.put(entry.getKey(),
entry.getValue().createRunner());
+ return new MappedMultiRunner(runnerMap,
this.executionTypeToRunnerMap);
+ }catch (Exception e) {
+ LOG.log(Level.SEVERE, "Failed to create instance of '"
+ MappedMultiRunner.class.getCanonicalName() + "' : " + e.getMessage(), e);
+ return null;
+ }
+ }
+
+ public void setRunnerFactoryMap(Map<String, EngineRunnerFactory>
runnerFactoryMap) throws Exception {
+ this.runnerFactoryMap = runnerFactoryMap;
+ }
+
+ public void setExecutionTypeToRunnerMap(Map<String, String>
executionTypeToRunnerMap) {
+ this.executionTypeToRunnerMap = executionTypeToRunnerMap;
+ }
+
+}
Propchange:
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/MappedMultiRunnerFactory.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified:
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/ResourceRunner.java
URL:
http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/ResourceRunner.java?rev=1060652&r1=1060651&r2=1060652&view=diff
==============================================================================
---
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/ResourceRunner.java
(original)
+++
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/engine/runner/ResourceRunner.java
Wed Jan 19 02:22:25 2011
@@ -65,13 +65,13 @@ public class ResourceRunner extends Engi
/**
* Additional '-1' is a workaround of a bug in resource manager
*/
- public int getOpenSlots() throws Exception {
+ public int getOpenSlots(TaskInstance workflowInstance) throws Exception
{
return this.rsManagerClient.getJobQueueCapacity() -
this.rsManagerClient.getJobQueueSize() - 1;
}
@Override
- public boolean hasOpenSlots() throws Exception {
- return this.getOpenSlots() > 0;
+ public boolean hasOpenSlots(TaskInstance workflowInstance) throws
Exception {
+ return this.getOpenSlots(workflowInstance) > 0;
}
}
Modified:
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/instance/TaskInstance.java
URL:
http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/instance/TaskInstance.java?rev=1060652&r1=1060651&r2=1060652&view=diff
==============================================================================
---
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/instance/TaskInstance.java
(original)
+++
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/instance/TaskInstance.java
Wed Jan 19 02:22:25 2011
@@ -57,6 +57,7 @@ public abstract class TaskInstance {
private String jobId;
private String instanceId;
private String modelId;
+ private String executionType;
private WorkflowEngineClient weClient;
private Metadata dynamicMetadata;
private Metadata staticMetadata;
@@ -86,6 +87,14 @@ public abstract class TaskInstance {
return this.modelId;
}
+ public void setExecutionType(String executionType) {
+ this.executionType = executionType;
+ }
+
+ public String getExecutionType(){
+ return this.executionType;
+ }
+
public ProcessorInfo getProcessorInfo() throws EngineException {
return this.weClient.getProcessorInfo(this.instanceId,
this.modelId);
}
@@ -153,6 +162,7 @@ public abstract class TaskInstance {
ctrlMetadata.replaceLocalMetadata(WorkflowMetKeys.JOB_ID, this.jobId);
ctrlMetadata.replaceLocalMetadata(WorkflowMetKeys.INSTANCE_ID,
this.instanceId);
ctrlMetadata.replaceLocalMetadata(WorkflowMetKeys.MODEL_ID,
this.modelId);
+ ctrlMetadata.replaceLocalMetadata(WorkflowMetKeys.EXECUTION_TYPE,
this.executionType);
ctrlMetadata.replaceLocalMetadata(WorkflowMetKeys.STATE,
workflowState.getName());
ctrlMetadata.replaceLocalMetadata(WorkflowMetKeys.HOST,
WorkflowUtils.getHostName());
ProcessorInfo processorInfo = this.getProcessorInfo();
Modified:
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/metadata/WorkflowMetKeys.java
URL:
http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/metadata/WorkflowMetKeys.java?rev=1060652&r1=1060651&r2=1060652&view=diff
==============================================================================
---
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/metadata/WorkflowMetKeys.java
(original)
+++
oodt/branches/wengine-branch/wengine/src/main/java/org/apache/oodt/cas/workflow/metadata/WorkflowMetKeys.java
Wed Jan 19 02:22:25 2011
@@ -32,6 +32,8 @@ public interface WorkflowMetKeys {
public static final String MODEL_ID = "ModelId";
+ public static final String EXECUTION_TYPE = "ExecutionType";
+
public static final String STATE = "State";
public static final String PRIORITY = "Priority";
Modified:
oodt/branches/wengine-branch/wengine/src/main/resources/policy/engine-beans.xml
URL:
http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/resources/policy/engine-beans.xml?rev=1060652&r1=1060651&r2=1060652&view=diff
==============================================================================
---
oodt/branches/wengine-branch/wengine/src/main/resources/policy/engine-beans.xml
(original)
+++
oodt/branches/wengine-branch/wengine/src/main/resources/policy/engine-beans.xml
Wed Jan 19 02:22:25 2011
@@ -124,10 +124,26 @@
<bean id="DefaultServerFactory" lazy-init="true"
parent="XmlRpcServerFactory"/>
<!-- Runner Factories -->
- <bean id="LocalRunnerFactory" lazy-init="true"
class="org.apache.oodt.cas.workflow.engine.runner.LocalEngineRunnerFactory"/>
+ <bean id="LocalRunnerFactory" lazy-init="true"
class="org.apache.oodt.cas.workflow.engine.runner.LocalEngineRunnerFactory">
+ <property name="numOfSlots" value="32"/>
+ </bean>
<bean id="ResourceRunnerFactory" lazy-init="true"
class="org.apache.oodt.cas.workflow.engine.runner.ResourceRunnerFactory">
<property name="resourceManagerUrl" value="${resourcemgr.url}"/>
</bean>
+ <!-- Property names must match the setters on MappedMultiRunnerFactory:
+      setRunnerFactoryMap and setExecutionTypeToRunnerMap -->
+ <bean id="MappedMultiRunnerFactory" lazy-init="true"
+       class="org.apache.oodt.cas.workflow.engine.runner.MappedMultiRunnerFactory">
+   <property name="runnerFactoryMap">
+     <map>
+       <entry key="default" value-ref="ResourceRunnerFactory"/>
+       <entry key="local" value-ref="LocalRunnerFactory"/>
+     </map>
+   </property>
+   <property name="executionTypeToRunnerMap">
+     <map>
+       <entry key="condition" value="local"/>
+       <entry key="simpleTask" value="local"/>
+     </map>
+   </property>
+ </bean>
<!-- Event Repositories -->
<bean id="SpringBasedEngineEventRepositoryFactory" lazy-init="true"
class="org.apache.oodt.cas.workflow.event.repo.SpringBasedEngineEventRepositoryFactory">
Modified:
oodt/branches/wengine-branch/wengine/src/main/resources/policy/workflows/WorkflowProcessorMapping.xml
URL:
http://svn.apache.org/viewvc/oodt/branches/wengine-branch/wengine/src/main/resources/policy/workflows/WorkflowProcessorMapping.xml?rev=1060652&r1=1060651&r2=1060652&view=diff
==============================================================================
---
oodt/branches/wengine-branch/wengine/src/main/resources/policy/workflows/WorkflowProcessorMapping.xml
(original)
+++
oodt/branches/wengine-branch/wengine/src/main/resources/policy/workflows/WorkflowProcessorMapping.xml
Wed Jan 19 02:22:25 2011
@@ -16,6 +16,7 @@
<processor id="task"
class="org.apache.oodt.cas.workflow.processor.TaskProcessor"/>
<!-- custom -->
+ <processor id="simpleTask"
class="org.apache.oodt.cas.workflow.processor.TaskProcessor"/>
<processor id="sequential"
class="org.apache.oodt.cas.workflow.processor.SequentialProcessor"/>
<processor id="parallel"
class="org.apache.oodt.cas.workflow.processor.ParallelProcessor"/>