Author: rwesten
Date: Sat Jun 30 11:55:46 2012
New Revision: 1355713

URL: http://svn.apache.org/viewvc?rev=1355713&view=rev
Log:
Small improvements made while working on STANBOL-669:

* A dedicated thread now monitors active EnhancementJobs and logs information about 
them
* Invalid ExecutionPlans are now detected; EnhancementJobs that use them are 
marked as failed and closed.

Modified:
    
incubator/stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EnhancementJob.java
    
incubator/stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EnhancementJobHandler.java

Modified: 
incubator/stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EnhancementJob.java
URL: 
http://svn.apache.org/viewvc/incubator/stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EnhancementJob.java?rev=1355713&r1=1355712&r2=1355713&view=diff
==============================================================================
--- 
incubator/stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EnhancementJob.java
 (original)
+++ 
incubator/stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EnhancementJob.java
 Sat Jun 30 11:55:46 2012
@@ -420,7 +420,7 @@ public class EnhancementJob {
                     + " | chain.running " + running + ")!");
         }
         if (running.remove(executionNode)) {
-            log.info(
+            log.debug(
                 "Execution of '{}' for ContentItem {} completed "
                 + "(chain: {}, node: {}, optional {})",
                 new Object[] {engine, contentItem.getUri().getUnicodeString(), 
@@ -490,7 +490,7 @@ public class EnhancementJob {
                                        chain, executionNode, optional});
                 return;
             } else { //added an engine to running
-                log.info("Started Execution of '{}' for ContentItem {} "
+                log.debug("Started Execution of '{}' for ContentItem {} "
                          + "(chain: {}, node: {}, optional {})",
                     new Object[] {engine, 
contentItem.getUri().getUnicodeString(), chain,
                                   executionNode, optional});

Modified: 
incubator/stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EnhancementJobHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EnhancementJobHandler.java?rev=1355713&r1=1355712&r2=1355713&view=diff
==============================================================================
--- 
incubator/stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EnhancementJobHandler.java
 (original)
+++ 
incubator/stanbol/trunk/enhancer/jobmanager/event/src/main/java/org/apache/stanbol/enhancer/jobmanager/event/impl/EnhancementJobHandler.java
 Sat Jun 30 11:55:46 2012
@@ -23,10 +23,16 @@ import static org.apache.stanbol.enhance
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Dictionary;
 import java.util.HashMap;
 import java.util.Hashtable;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -64,8 +70,9 @@ public class EnhancementJobHandler imple
      * requesting thread as soon as the enhancement process has finished. 
      */
     private Map<EnhancementJob,Object> processingJobs;
-    private final ReadWriteLock processingLock = new ReentrantReadWriteLock(); 
       
-
+    private final ReadWriteLock processingLock = new ReentrantReadWriteLock();
+    private Thread observerDaemon;
+    
     public EnhancementJobHandler(EventAdmin eventAdmin, 
                                  EnhancementEngineManager engineManager) {
         if(eventAdmin == null){
@@ -78,10 +85,15 @@ public class EnhancementJobHandler imple
         this.engineManager = engineManager;
         processingLock.writeLock().lock();
         try {
-            processingJobs = new HashMap<EnhancementJob,Object>();
+            processingJobs = new LinkedHashMap<EnhancementJob,Object>();
         } finally{
             processingLock.writeLock().unlock();
         }
+        observerDaemon = new Thread(new EnhancementJobObserver());
+        observerDaemon.setName("Event Job Manager Observer Daemon");
+        observerDaemon.setDaemon(true);
+        observerDaemon.start();
+        
     }
     /**
      * Closes this Handler and notifies all components that wait for still
@@ -100,6 +112,7 @@ public class EnhancementJobHandler imple
         } finally {
             processingLock.writeLock().unlock();
         }
+        observerDaemon = null;
     }
     
     /**
@@ -132,6 +145,7 @@ public class EnhancementJobHandler imple
             o = processingJobs.get(enhancementJob);
             if(o == null){
                 o = new Object();
+                logJobInfo(enhancementJob, "Add EnhancementJob:");
                 processingJobs.put(enhancementJob, o);
                 init = true;
             } else {
@@ -181,12 +195,18 @@ public class EnhancementJobHandler imple
             if(job.isFinished()){
                 finish(job);
             } else if(!job.isFailed()){
-                executeNextNodes(job);
+                if(!executeNextNodes(job) && job.getRunning().isEmpty()){
+                    log.warn("Unexpected state in the Execution of ContentItem 
{}:"
+                        + " Job is not finished AND no executions are running 
AND"
+                        + " no further execution could be started! -> 
finishing"
+                        + " this job :(");
+                    finish(job);
+                } //else execution started of other jobs are running
             } else {
                 if(log.isInfoEnabled()){
                     Collection<String> running = new ArrayList<String>(3);
                     for(NonLiteral runningNode : job.getRunning()){
-                        running.add(getEngine(job.getExecutionPlan(), 
runningNode));
+                        running.add(getEngine(job.getExecutionPlan(), 
job.getExecutionNode(runningNode)));
                     }
                     log.info("Job {} failed, but {} still running!",
                         job.getContentItem().getUri(),running);
@@ -271,6 +291,7 @@ public class EnhancementJobHandler imple
         }
         if(o != null) {
             synchronized (o) {
+                logJobInfo(job, "Finished EnhancementJob:");
                 log.debug("++ n: finished processing ContentItem {} with Chain 
{}",
                     job.getContentItem().getUri(),job.getChainName());
                 o.notifyAll();
@@ -286,9 +307,11 @@ public class EnhancementJobHandler imple
      * {@link EnhancementJob#isFinished()} notifies the one who registered 
      * the {@link EnhancementJob} with this component.
      * @param job the enhancement job to process
+     * @return if an Execution event was sent
      */
-    protected void executeNextNodes(EnhancementJob job) {
+    protected boolean executeNextNodes(EnhancementJob job) {
         //getExecutable returns an snapshot so we do not need to lock
+        boolean startedExecution = false;
         for(NonLiteral executable : job.getExecutable()){
             if(log.isDebugEnabled()){
                 log.debug("PREPARE execution of Engine {}",
@@ -303,7 +326,82 @@ public class EnhancementJobHandler imple
                     getEngine(job.getExecutionPlan(), 
job.getExecutionNode(executable)));
             }
             eventAdmin.postEvent(new Event(TOPIC_JOB_MANAGER,properties));
+            startedExecution = true;
         }
-    }    
+        return startedExecution;
+    }
+    
+    /**
+     * Logs basic infos about the Job as INFO and detailed infos as DEBUG
+     * @param job
+     */
+    protected void logJobInfo(EnhancementJob job, String header) {
+        if(header != null){
+            log.info(header);
+        }
+        log.info("   state: 
{}",job.isFinished()?"finished":job.isFailed()?"failed":"processing");
+        log.info("   chain: {}",job.getChainName());
+        log.info("   content-item: {}", job.getContentItem().getUri());
+        log.debug("   executions:");
+        if(log.isDebugEnabled()){
+            for(NonLiteral completedExec : job.getCompleted()){
+                log.info("    - {} 
completed",getEngine(job.getExecutionMetadata(), 
+                    job.getExecutionNode(completedExec)));
+            }
+            for(NonLiteral runningExec : job.getRunning()){
+                log.info("    - {} 
running",getEngine(job.getExecutionMetadata(), 
+                    job.getExecutionNode(runningExec)));
+            }
+        }
+    }
+    /**
+     * Currently only used to debug the number of currently registered
+     * Enhancements Jobs (if there are some)
+     * @author Rupert Westenthaler
+     */
+    private class EnhancementJobObserver implements Runnable {
+
+        @Override
+        public void run() {
+            log.debug(" ... init EnhancementJobObserver");
+            while(processingJobs != null){
+                try {
+                    Thread.sleep(10000);
+                } catch (InterruptedException e) {
+                }
+                Collection<EnhancementJob> jobs;
+                Lock readLock = processingLock.readLock();
+                readLock.lock();
+                try {
+                    if(processingJobs != null){
+                        jobs = new 
ArrayList<EnhancementJob>(processingJobs.keySet());
+                    } else {
+                        jobs = Collections.emptyList();
+                    }
+                } finally {
+                    readLock.unlock();
+                }
+                if(!jobs.isEmpty()){
+                    log.info(" -- {} active Enhancement Jobs",jobs.size());
+                    if(log.isDebugEnabled()){
+                        for(EnhancementJob job : jobs){
+                            Lock jobLock = job.getLock().readLock();
+                            jobLock.lock();
+                            try {
+                                logJobInfo(job,null);
+                            } finally {
+                                jobLock.unlock();
+                            }
+                        }
+                    }
+                } else {
+                    log.debug(" -- No active Enhancement Jobs");
+                }
+            }
+            
+        }
+        
+    }
+    
     
 }
\ No newline at end of file


Reply via email to