Author: rwesten
Date: Sat Jun 30 11:55:46 2012
New Revision: 1355713
URL: http://svn.apache.org/viewvc?rev=1355713&view=rev
Log:
Small improvements made while working on STANBOL-669:
* A dedicated thread now monitors active EnhancementJobs and logs information
about them.
* Invalid ExecutionPlans are now detected; EnhancementJobs using such plans are
marked as failed and closed.
Modified:
incubator/stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EnhancementJob.java
incubator/stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EnhancementJobHandler.java
Modified:
incubator/stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EnhancementJob.java
URL:
http://svn.apache.org/viewvc/incubator/stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EnhancementJob.java?rev=1355713&r1=1355712&r2=1355713&view=diff
==============================================================================
---
incubator/stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EnhancementJob.java
(original)
+++
incubator/stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EnhancementJob.java
Sat Jun 30 11:55:46 2012
@@ -420,7 +420,7 @@ public class EnhancementJob {
+ " | chain.running " + running + ")!");
}
if (running.remove(executionNode)) {
- log.info(
+ log.debug(
"Execution of '{}' for ContentItem {} completed "
+ "(chain: {}, node: {}, optional {})",
new Object[] {engine, contentItem.getUri().getUnicodeString(),
@@ -490,7 +490,7 @@ public class EnhancementJob {
chain, executionNode, optional});
return;
} else { //added an engine to running
- log.info("Started Execution of '{}' for ContentItem {} "
+ log.debug("Started Execution of '{}' for ContentItem {} "
+ "(chain: {}, node: {}, optional {})",
new Object[] {engine,
contentItem.getUri().getUnicodeString(), chain,
executionNode, optional});
Modified:
incubator/stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EnhancementJobHandler.java
URL:
http://svn.apache.org/viewvc/incubator/stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EnhancementJobHandler.java?rev=1355713&r1=1355712&r2=1355713&view=diff
==============================================================================
---
incubator/stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EnhancementJobHandler.java
(original)
+++
incubator/stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EnhancementJobHandler.java
Sat Jun 30 11:55:46 2012
@@ -23,10 +23,16 @@ import static org.apache.stanbol.enhance
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.Dictionary;
import java.util.HashMap;
import java.util.Hashtable;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -64,8 +70,9 @@ public class EnhancementJobHandler imple
* requesting thread as soon as the enhancement process has finished.
*/
private Map<EnhancementJob,Object> processingJobs;
- private final ReadWriteLock processingLock = new ReentrantReadWriteLock();
-
+ private final ReadWriteLock processingLock = new ReentrantReadWriteLock();
+ private Thread observerDaemon;
+
public EnhancementJobHandler(EventAdmin eventAdmin,
EnhancementEngineManager engineManager) {
if(eventAdmin == null){
@@ -78,10 +85,15 @@ public class EnhancementJobHandler imple
this.engineManager = engineManager;
processingLock.writeLock().lock();
try {
- processingJobs = new HashMap<EnhancementJob,Object>();
+ processingJobs = new LinkedHashMap<EnhancementJob,Object>();
} finally{
processingLock.writeLock().unlock();
}
+ observerDaemon = new Thread(new EnhancementJobObserver());
+ observerDaemon.setName("Event Job Manager Observer Daemon");
+ observerDaemon.setDaemon(true);
+ observerDaemon.start();
+
}
/**
* Closes this Handler and notifies all components that wait for still
@@ -100,6 +112,7 @@ public class EnhancementJobHandler imple
} finally {
processingLock.writeLock().unlock();
}
+ observerDaemon = null;
}
/**
@@ -132,6 +145,7 @@ public class EnhancementJobHandler imple
o = processingJobs.get(enhancementJob);
if(o == null){
o = new Object();
+ logJobInfo(enhancementJob, "Add EnhancementJob:");
processingJobs.put(enhancementJob, o);
init = true;
} else {
@@ -181,12 +195,18 @@ public class EnhancementJobHandler imple
if(job.isFinished()){
finish(job);
} else if(!job.isFailed()){
- executeNextNodes(job);
+ if(!executeNextNodes(job) && job.getRunning().isEmpty()){
+ log.warn("Unexpected state in the Execution of ContentItem
{}:"
+ + " Job is not finished AND no executions are running
AND"
+ + " no further execution could be started! ->
finishing"
+ + " this job :(");
+ finish(job);
+ } //else execution started of other jobs are running
} else {
if(log.isInfoEnabled()){
Collection<String> running = new ArrayList<String>(3);
for(NonLiteral runningNode : job.getRunning()){
- running.add(getEngine(job.getExecutionPlan(),
runningNode));
+ running.add(getEngine(job.getExecutionPlan(),
job.getExecutionNode(runningNode)));
}
log.info("Job {} failed, but {} still running!",
job.getContentItem().getUri(),running);
@@ -271,6 +291,7 @@ public class EnhancementJobHandler imple
}
if(o != null) {
synchronized (o) {
+ logJobInfo(job, "Finished EnhancementJob:");
log.debug("++ n: finished processing ContentItem {} with Chain
{}",
job.getContentItem().getUri(),job.getChainName());
o.notifyAll();
@@ -286,9 +307,11 @@ public class EnhancementJobHandler imple
* {@link EnhancementJob#isFinished()} notifies the one who registered
* the {@link EnhancementJob} with this component.
* @param job the enhancement job to process
+ * @return if an Execution event was sent
*/
- protected void executeNextNodes(EnhancementJob job) {
+ protected boolean executeNextNodes(EnhancementJob job) {
//getExecutable returns an snapshot so we do not need to lock
+ boolean startedExecution = false;
for(NonLiteral executable : job.getExecutable()){
if(log.isDebugEnabled()){
log.debug("PREPARE execution of Engine {}",
@@ -303,7 +326,82 @@ public class EnhancementJobHandler imple
getEngine(job.getExecutionPlan(),
job.getExecutionNode(executable)));
}
eventAdmin.postEvent(new Event(TOPIC_JOB_MANAGER,properties));
+ startedExecution = true;
}
- }
+ return startedExecution;
+ }
+
+ /**
+  * Logs a summary of the parsed job: the state, chain name and content-item
+  * URI are logged as INFO; the completed and running executions are logged
+  * as DEBUG (and only collected if DEBUG is enabled).
+  * @param job the job to log the information for
+  * @param header an optional line logged as INFO before the job details;
+  * skipped if <code>null</code>
+  */
+ protected void logJobInfo(EnhancementJob job, String header) {
+     if(header != null){
+         log.info(header);
+     }
+     log.info(" state: {}",
+         job.isFinished() ? "finished" : job.isFailed() ? "failed" : "processing");
+     log.info(" chain: {}",job.getChainName());
+     log.info(" content-item: {}", job.getContentItem().getUri());
+     log.debug(" executions:");
+     //resolving the engine names is only needed for the DEBUG output
+     if(log.isDebugEnabled()){
+         for(NonLiteral completedExec : job.getCompleted()){
+             log.debug(" - {} completed",
+                 getEngine(job.getExecutionMetadata(),
+                     job.getExecutionNode(completedExec)));
+         }
+         for(NonLiteral runningExec : job.getRunning()){
+             log.debug(" - {} running",
+                 getEngine(job.getExecutionMetadata(),
+                     job.getExecutionNode(runningExec)));
+         }
+     }
+ }
+ /**
+  * Observer that wakes up every 10 seconds and logs the number of currently
+  * registered EnhancementJobs (INFO). If DEBUG is enabled the details of
+  * every registered job are logged while holding the read lock of that job.
+  * The loop ends as soon as <code>processingJobs</code> is <code>null</code>
+  * or the observing thread gets interrupted.
+  * @author Rupert Westenthaler
+  */
+ private class EnhancementJobObserver implements Runnable {
+
+     @Override
+     public void run() {
+         log.debug(" ... init EnhancementJobObserver");
+         while(processingJobs != null){
+             try {
+                 Thread.sleep(10000);
+             } catch (InterruptedException e) {
+                 //do not swallow the interruption: restore the interrupt
+                 //status and stop observing
+                 Thread.currentThread().interrupt();
+                 break;
+             }
+             //take a snapshot under the read lock so that logging does not
+             //block components that register/unregister jobs
+             Collection<EnhancementJob> jobs;
+             Lock readLock = processingLock.readLock();
+             readLock.lock();
+             try {
+                 if(processingJobs != null){
+                     jobs = new ArrayList<EnhancementJob>(processingJobs.keySet());
+                 } else {
+                     jobs = Collections.emptyList();
+                 }
+             } finally {
+                 readLock.unlock();
+             }
+             if(!jobs.isEmpty()){
+                 log.info(" -- {} active Enhancement Jobs",jobs.size());
+                 if(log.isDebugEnabled()){
+                     for(EnhancementJob job : jobs){
+                         Lock jobLock = job.getLock().readLock();
+                         jobLock.lock();
+                         try {
+                             logJobInfo(job,null);
+                         } finally {
+                             jobLock.unlock();
+                         }
+                     }
+                 }
+             } else {
+                 log.debug(" -- No active Enhancement Jobs");
+             }
+         }
+     }
+
+ }
+
}
\ No newline at end of file