[
https://issues.apache.org/jira/browse/OODT-992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16585417#comment-16585417
]
ASF GitHub Bot commented on OODT-992:
-------------------------------------
chrismattmann closed pull request #70: [OODT-992] Modified
AvroRPCWorkflowManager to accept a Map<String, Object> when creating dynamic
workflows
URL: https://github.com/apache/oodt/pull/70
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one from
a fork) once it is merged, the diff is reproduced below for the sake of
provenance:
diff --git a/core/pom.xml b/core/pom.xml
index 49486c76b..69bf75272 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -517,19 +517,16 @@ the License.
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
<version>${jetty.version}</version>
- <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-management</artifactId>
<version>${jetty.version}</version>
- <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-util</artifactId>
<version>${jetty.version}</version>
- <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.python</groupId>
diff --git
a/webapp/fmprod/src/main/java/org/apache/oodt/cas/product/CASProductHandler.java
b/webapp/fmprod/src/main/java/org/apache/oodt/cas/product/CASProductHandler.java
index 00ef50fc6..d930d8f2d 100644
---
a/webapp/fmprod/src/main/java/org/apache/oodt/cas/product/CASProductHandler.java
+++
b/webapp/fmprod/src/main/java/org/apache/oodt/cas/product/CASProductHandler.java
@@ -18,7 +18,6 @@
package org.apache.oodt.cas.product;
-//JDK imports
import org.apache.oodt.cas.filemgr.structs.Product;
import org.apache.oodt.cas.filemgr.structs.Reference;
@@ -49,8 +48,6 @@
import static
org.apache.oodt.cas.product.CASProductHandlerMetKeys.CAS_PROFILE_ID;
import static org.apache.oodt.cas.product.CASProductHandlerMetKeys.FILE_HEADER;
-//OODT imports
-
/**
* @author mattmann
* @version $Revision$
diff --git
a/webapp/fmprod/src/main/java/org/apache/oodt/cas/product/data/DatasetDeliveryServlet.java
b/webapp/fmprod/src/main/java/org/apache/oodt/cas/product/data/DatasetDeliveryServlet.java
index 9685e36d3..d04232c46 100644
---
a/webapp/fmprod/src/main/java/org/apache/oodt/cas/product/data/DatasetDeliveryServlet.java
+++
b/webapp/fmprod/src/main/java/org/apache/oodt/cas/product/data/DatasetDeliveryServlet.java
@@ -51,8 +51,7 @@
* @author mattmann
* @version $Revision$
*/
-public class DatasetDeliveryServlet extends HttpServlet implements
- DataDeliveryKeys {
+public class DatasetDeliveryServlet extends HttpServlet implements
DataDeliveryKeys {
/* our log stream */
private static final Logger LOG = Logger
@@ -266,6 +265,14 @@ public void init(ServletConfig config) throws
ServletException {
}
+ public void destroy() {
+ if (client != null) {
+ try {
+ client.close();
+ } catch (IOException ignored) { }
+ }
+ }
+
private boolean alreadyZipped(Product p, Map hash) {
return hash.containsKey(p.getProductName());
}
diff --git
a/webapp/fmprod/src/main/java/org/apache/oodt/cas/product/rdf/RDFDatasetServlet.java
b/webapp/fmprod/src/main/java/org/apache/oodt/cas/product/rdf/RDFDatasetServlet.java
index e15faf4f8..bf4df9d41 100644
---
a/webapp/fmprod/src/main/java/org/apache/oodt/cas/product/rdf/RDFDatasetServlet.java
+++
b/webapp/fmprod/src/main/java/org/apache/oodt/cas/product/rdf/RDFDatasetServlet.java
@@ -18,7 +18,7 @@
package org.apache.oodt.cas.product.rdf;
-//JDK imports
+
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.WordUtils;
@@ -32,15 +32,6 @@
import org.w3c.dom.Document;
import org.w3c.dom.Element;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.List;
-import java.util.Vector;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
@@ -53,8 +44,14 @@
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
-
-//OODT imports
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.List;
+import java.util.Vector;
+import java.util.logging.Level;
+import java.util.logging.Logger;
/**
*
@@ -266,4 +263,13 @@ public void outputRDF(List<ProductType> productTypes,
String base, HttpServletRe
return types;
}
+ public void destroy() {
+ super.destroy();
+
+ if (fClient != null) {
+ try {
+ fClient.close();
+ } catch (IOException ignored) { }
+ }
+ }
}
diff --git a/workflow/pom.xml b/workflow/pom.xml
index 99f12df19..55259ddac 100644
--- a/workflow/pom.xml
+++ b/workflow/pom.xml
@@ -162,6 +162,18 @@ the License.
<groupId>xmlrpc</groupId>
<artifactId>xmlrpc</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty-management</artifactId>
+ </dependency>
<!-- Logging -->
<dependency>
diff --git a/workflow/src/main/avro/types/AvroWorkflowInstance.avsc
b/workflow/src/main/avro/types/AvroWorkflowInstance.avsc
index 9987cf117..38221c35c 100644
--- a/workflow/src/main/avro/types/AvroWorkflowInstance.avsc
+++ b/workflow/src/main/avro/types/AvroWorkflowInstance.avsc
@@ -13,7 +13,7 @@
{"name":"endDateTimeIsoStr","type":["null","string"]},
{"name":"currentTaskStartDateTimeIsoStr","type":["null","string"]},
{"name":"currentTaskEndDateTimeIsoStr","type":["null","string"]},
- {"name":"sharedContext","type":{"type":"map","values":"string"}},
+
{"name":"sharedContext","type":{"type":"map","values":["string",{"type":"array","items":"string"}]}},
{"name":"priority","type":["null","double"]}
]
}
\ No newline at end of file
diff --git a/workflow/src/main/avro/types/protocol.avdl
b/workflow/src/main/avro/types/protocol.avdl
index 780b6f2f4..536163017 100644
--- a/workflow/src/main/avro/types/protocol.avdl
+++ b/workflow/src/main/avro/types/protocol.avdl
@@ -8,7 +8,7 @@ import schema "AvroWorkflowInstancePage.avsc";
boolean refreshRepository();
- string executeDynamicWorkflow(array<string> taskIds, map<string> metadata);
+ string executeDynamicWorkflow(array<string> taskIds, map<union {string,
array<string>}> metadata);
array<string> getRegisteredEvents();
@@ -26,9 +26,9 @@ import schema "AvroWorkflowInstancePage.avsc";
array<AvroWorkflow> getWorkflowsByEvent(string eventName);
- map<string> getWorkflowInstanceMetadata(string wInstId);
+ map<union {string, array<string>}> getWorkflowInstanceMetadata(string
wInstId);
- boolean handleEvent(string eventName, map<string> metadata);
+ boolean handleEvent(string eventName, map<union {string, array<string>}>
metadata);
AvroWorkflowInstance getWorkflowInstanceById(string wInstId);
@@ -59,7 +59,7 @@ import schema "AvroWorkflowInstancePage.avsc";
AvroWorkflow getWorkflowById(string workflowId);
boolean updateMetadataForWorkflow(
- string workflowInstId, map<string> metadata);
+ string workflowInstId, map<union {string, array<string>}>
metadata);
boolean updateWorkflowInstance(AvroWorkflowInstance instance);
diff --git
a/workflow/src/main/java/org/apache/oodt/cas/workflow/system/AvroRpcWorkflowManager.java
b/workflow/src/main/java/org/apache/oodt/cas/workflow/system/AvroRpcWorkflowManager.java
index 4bfcfcf2e..96a46e6bb 100644
---
a/workflow/src/main/java/org/apache/oodt/cas/workflow/system/AvroRpcWorkflowManager.java
+++
b/workflow/src/main/java/org/apache/oodt/cas/workflow/system/AvroRpcWorkflowManager.java
@@ -19,7 +19,7 @@
import com.google.common.base.Preconditions;
import org.apache.avro.AvroRemoteException;
-import org.apache.avro.ipc.NettyServer;
+import org.apache.avro.ipc.HttpServer;
import org.apache.avro.ipc.Server;
import org.apache.avro.ipc.specific.SpecificResponder;
import org.apache.oodt.cas.metadata.Metadata;
@@ -48,7 +48,6 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetAddress;
-import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.UnknownHostException;
@@ -100,13 +99,25 @@ public AvroRpcWorkflowManager(int port){
engine.setWorkflowManagerUrl(safeGetUrlFromString("http://" +
getHostname() + ":" + port));
repo = getWorkflowRepositoryFromProperty();
- logger.debug("Starting Netty Server");
+ logger.debug("Starting Http Server...");
// start up the server
- server = new NettyServer(new SpecificResponder(
-
org.apache.oodt.cas.workflow.struct.avrotypes.WorkflowManager.class,this),
- new InetSocketAddress(port));
+ try {
+ server = new HttpServer(new SpecificResponder(
+
org.apache.oodt.cas.workflow.struct.avrotypes.WorkflowManager.class,this),
port);
+ } catch (IOException e) {
+ logger.error("Unable to create http server on port: {}", e);
+ throw new IllegalStateException("Unable to start http server on
port: " + port, e);
+ }
+
logger.debug("Server created. Starting ...");
server.start();
+
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ shutdown();
+ }
+ });
logger.info("Workflow Manager started by {} for url: {}",
System.getProperty("user.name", "unknown"),
workflowManagerUrl);
@@ -131,7 +142,7 @@ public boolean refreshRepository() throws
AvroRemoteException {
}
@Override
- public String executeDynamicWorkflow(List<String> taskIds, Map<String,
String> metadata) throws AvroRemoteException {
+ public String executeDynamicWorkflow(List<String> taskIds, Map<String,
Object> metadata) throws AvroRemoteException {
logger.debug("Executing dynamic workflow with task IDs: {}", taskIds);
try {
if (taskIds == null || taskIds.size() == 0){
@@ -155,7 +166,7 @@ public String executeDynamicWorkflow(List<String> taskIds,
Map<String, String> m
Metadata met = new Metadata();
met.addMetadata(AvroTypeFactory.getMetadata(metadata));
- logger.debug("Created dynamic workflow[{}] for task IDs: {}",
dynamicWorkflow.getName(), taskIds);
+ logger.info("Created dynamic workflow[{}] for task IDs: {}",
dynamicWorkflow.getName(), taskIds);
WorkflowInstance inst = this.engine.startWorkflow(dynamicWorkflow,
met);
return inst.getId();
}catch (RepositoryException | EngineException e){
@@ -282,14 +293,14 @@ public AvroWorkflowInstancePage
paginateWorkflowInstances(int pageNum) throws Av
}
@Override
- public Map<String, String> getWorkflowInstanceMetadata(String wInstId)
throws AvroRemoteException {
+ public Map<String, Object> getWorkflowInstanceMetadata(String wInstId)
throws AvroRemoteException {
Metadata met = engine.getWorkflowInstanceMetadata(wInstId);
return AvroTypeFactory.getAvroMetadata(met);
}
@Override
- public boolean handleEvent(String eventName, Map<String, String> metadata)
throws AvroRemoteException {
+ public boolean handleEvent(String eventName, Map<String, Object> metadata)
throws AvroRemoteException {
logger.info("Received event: {}", eventName);
logger.debug("Reveiced meta data for event: {} -> {}", eventName,
metadata);
@@ -549,7 +560,7 @@ public AvroWorkflow getWorkflowById(String workflowId)
throws AvroRemoteExceptio
}
@Override
- public synchronized boolean updateMetadataForWorkflow(String
workflowInstId, Map<String, String> metadata) throws AvroRemoteException {
+ public synchronized boolean updateMetadataForWorkflow(String
workflowInstId, Map<String, Object> metadata) throws AvroRemoteException {
Metadata met = new Metadata();
met.addMetadata(AvroTypeFactory.getMetadata(metadata));
return this.engine.updateMetadata(workflowInstId, met);
diff --git
a/workflow/src/main/java/org/apache/oodt/cas/workflow/system/AvroRpcWorkflowManagerClient.java
b/workflow/src/main/java/org/apache/oodt/cas/workflow/system/AvroRpcWorkflowManagerClient.java
index 01fe1e035..ac765f4ee 100644
---
a/workflow/src/main/java/org/apache/oodt/cas/workflow/system/AvroRpcWorkflowManagerClient.java
+++
b/workflow/src/main/java/org/apache/oodt/cas/workflow/system/AvroRpcWorkflowManagerClient.java
@@ -17,6 +17,7 @@
package org.apache.oodt.cas.workflow.system;
+import org.apache.avro.ipc.HttpTransceiver;
import org.apache.avro.ipc.NettyTransceiver;
import org.apache.avro.ipc.Transceiver;
import org.apache.avro.ipc.specific.SpecificRequestor;
@@ -53,7 +54,7 @@
public AvroRpcWorkflowManagerClient(URL url){
workflowManagerUrl = url;
try {
- client = new NettyTransceiver(new
InetSocketAddress(url.getHost(),url.getPort()));
+ client = new HttpTransceiver(url);
proxy =
SpecificRequestor.getClient(org.apache.oodt.cas.workflow.struct.avrotypes.WorkflowManager.class,
client);
} catch (IOException e) {
logger.error("Error occurred when creating client for: {}", url,
e);
diff --git
a/workflow/src/main/java/org/apache/oodt/cas/workflow/util/AvroTypeFactory.java
b/workflow/src/main/java/org/apache/oodt/cas/workflow/util/AvroTypeFactory.java
index eb264378d..62a553c38 100644
---
a/workflow/src/main/java/org/apache/oodt/cas/workflow/util/AvroTypeFactory.java
+++
b/workflow/src/main/java/org/apache/oodt/cas/workflow/util/AvroTypeFactory.java
@@ -224,11 +224,11 @@ public static AvroWorkflowInstance
getAvroWorkflowInstance(WorkflowInstance work
return avroWorkflowInstance;
}
- public static Map<String,String> getAvroMetadata(Metadata metadata){
- Map<String,String> avroMetadata = new HashMap<String, String>();
- if(metadata.getHashTable().size() > 0)
- for (String key : metadata.getAllKeys()){
- avroMetadata.put(key,metadata.getMetadata(key));
+ public static Map<String, Object> getAvroMetadata(Metadata metadata) {
+ Map<String, Object> avroMetadata = new HashMap<>();
+ if (metadata.getHashTable().size() > 0)
+ for (String key : metadata.getAllKeys()) {
+ avroMetadata.put(key, metadata.getAllMetadata(key));
}
return avroMetadata;
}
@@ -251,11 +251,15 @@ public static WorkflowInstance
getWorkflowInstance(AvroWorkflowInstance avroWork
return workflowInstance;
}
- public static Metadata getMetadata(Map<String,String> avroMetadata){
+ public static Metadata getMetadata(Map<String, Object> avroMetadata) {
Metadata metadata = new Metadata();
if (avroMetadata.size() > 0)
- for (String key : avroMetadata.keySet()){
- metadata.addMetadata(key,avroMetadata.get(key));
+ for (String key : avroMetadata.keySet()) {
+ if (avroMetadata.get(key) instanceof List) {
+ metadata.addMetadata(key, (List) avroMetadata.get(key));
+ } else {
+ metadata.addMetadata(key, (String) avroMetadata.get(key));
+ }
}
return metadata;
}
diff --git a/workflow/src/main/resources/log4j2.xml
b/workflow/src/main/resources/log4j2.xml
index ac00ff149..786880bcc 100644
--- a/workflow/src/main/resources/log4j2.xml
+++ b/workflow/src/main/resources/log4j2.xml
@@ -30,6 +30,7 @@
<Loggers>
<Logger name="org.springframework" level="error"/>
+ <Logger name="org.mortbay" level="error"/>
<Root level="debug">
<AppenderRef ref="Console"/>
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> AvroRPC WorkflowManager executeDynamicWorkflow() accepts a Map<String,String>
> -----------------------------------------------------------------------------
>
> Key: OODT-992
> URL: https://issues.apache.org/jira/browse/OODT-992
> Project: OODT
> Issue Type: Bug
> Components: workflow manager
> Affects Versions: 1.2, 1.9
> Reporter: Imesha Sudasingha
> Assignee: Chris A. Mattmann
> Priority: Major
>
> AvroRPC Workflow manager's *executeDynamicWorkflow()* accepts a Map<String,
> String>, but it should accept a Map<String, Object> whose values can be
> either a String or a List<String>.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)