Author: cwiklik
Date: Mon Aug  3 17:29:10 2015
New Revision: 1693942

URL: http://svn.apache.org/r1693942
Log:
UIMA-4540 Moved inventory publication beyond logging

Modified:
    
uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/DefaultNodeInventoryProcessor.java

Modified: 
uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/DefaultNodeInventoryProcessor.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/DefaultNodeInventoryProcessor.java?rev=1693942&r1=1693941&r2=1693942&view=diff
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/DefaultNodeInventoryProcessor.java
 (original)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-agent/src/main/java/org/apache/uima/ducc/agent/processors/DefaultNodeInventoryProcessor.java
 Mon Aug  3 17:29:10 2015
@@ -15,7 +15,7 @@
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
-*/
+ */
 package org.apache.uima.ducc.agent.processors;
 
 import java.util.ArrayList;
@@ -38,178 +38,217 @@ import org.apache.uima.ducc.transport.ev
  */
 public class DefaultNodeInventoryProcessor implements NodeInventoryProcessor {
        DuccLogger logger = new DuccLogger(this.getClass(), "AGENT");
-       boolean inventoryChanged=true;
+       boolean inventoryChanged = true;
        private NodeAgent agent;
-       private HashMap<DuccId, IDuccProcess> previousInventory;        
-       private int forceInventoryUpdateMaxThreshold=0;
-       private long counter=0;
-       
-       public  DefaultNodeInventoryProcessor(NodeAgent agent,String 
inventoryPublishRateSkipCount) {
+       private HashMap<DuccId, IDuccProcess> previousInventory;
+       private int forceInventoryUpdateMaxThreshold = 0;
+       private long counter = 0;
+
+       public DefaultNodeInventoryProcessor(NodeAgent agent,
+                       String inventoryPublishRateSkipCount) {
                this.agent = agent;
                try {
-                       forceInventoryUpdateMaxThreshold = 
Integer.parseInt(inventoryPublishRateSkipCount);
-               } catch( Exception e) {
+                       forceInventoryUpdateMaxThreshold = Integer
+                                       
.parseInt(inventoryPublishRateSkipCount);
+               } catch (Exception e) {
                }
-               //  Dont allow 0
-               if ( forceInventoryUpdateMaxThreshold == 0 ) {
-                 forceInventoryUpdateMaxThreshold = 1;
+               // Dont allow 0
+               if (forceInventoryUpdateMaxThreshold == 0) {
+                       forceInventoryUpdateMaxThreshold = 1;
                }
        }
+
        /**
         * Get a copy of agent {@code Process} inventory
         */
        public HashMap<DuccId, IDuccProcess> getInventory() {
                return agent.getInventoryCopy();
        }
+
        /**
         * 
         */
        public void process(Exchange outgoingMessage) throws Exception {
-               String methodName="process";
-               //      Get a deep copy of agent's inventory
+               String methodName = "process";
+               // Get a deep copy of agent's inventory
                HashMap<DuccId, IDuccProcess> inventory = getInventory();
                // Determine if the inventory changed since the last publishing 
was done
-               // First check if the inventory expanded or shrunk. If the same 
in size, 
+               // First check if the inventory expanded or shrunk. If the same 
in size,
                // compare process states and PID. If either of the two changed 
for any
-               // of the processes trigger immediate publish. If no changes 
found, publish
-               // according to skip counter 
(ducc.agent.node.inventory.publish.rate.skip)
+               // of the processes trigger immediate publish. If no changes 
found,
+               // publish
+               // according to skip counter
+               // (ducc.agent.node.inventory.publish.rate.skip)
                // configured in ducc.properties.
-               if ( previousInventory != null ) {
+               if (previousInventory != null) {
                        if (inventory.size() != previousInventory.size()) {
                                inventoryChanged = true;
                        } else {
-                               // Inventory maps are equal in size, check if 
all processes in the current
-                               // inventory exist in the previous inventory 
snapshot. If not, it means that
-                               // that perhaps a new process was added and one 
was removed. In this case,
+                               // Inventory maps are equal in size, check if 
all processes in
+                               // the current
+                               // inventory exist in the previous inventory 
snapshot. If not,
+                               // it means that
+                               // that perhaps a new process was added and one 
was removed. In
+                               // this case,
                                // force the publish, since there was a change.
-                               for( Map.Entry<DuccId, IDuccProcess> 
currentProcess: inventory.entrySet()) {
-                                       //      Check if a process in the 
current inventory exists in a previous 
-                                       //  inventory snapshot
-                                       if ( 
previousInventory.containsKey(currentProcess.getKey())) {
-                                               IDuccProcess previousProcess = 
-                                                               
previousInventory.get(currentProcess.getKey());
-                                               //      check if either PID or 
process state has changed
-                                               if ( 
currentProcess.getValue().getPID() != null && 
-                                                               
previousProcess.getPID() == null) {
+                               for (Map.Entry<DuccId, IDuccProcess> 
currentProcess : inventory
+                                               .entrySet()) {
+                                       // Check if a process in the current 
inventory exists in a
+                                       // previous
+                                       // inventory snapshot
+                                       if 
(previousInventory.containsKey(currentProcess.getKey())) {
+                                               IDuccProcess previousProcess = 
previousInventory
+                                                               
.get(currentProcess.getKey());
+                                               // check if either PID or 
process state has changed
+                                               if 
(currentProcess.getValue().getPID() != null
+                                                               && 
previousProcess.getPID() == null) {
                                                        inventoryChanged = true;
                                                        break;
-                                               } else if ( 
!currentProcess.getValue().getProcessState().equals(previousProcess.getProcessState()))
 {
+                                               } else if 
(!currentProcess.getValue().getProcessState()
+                                                               
.equals(previousProcess.getProcessState())) {
                                                        inventoryChanged = true;
                                                        break;
                                                } else {
-                                                       
List<IUimaPipelineAEComponent> breakdown = 
-                                                                       
currentProcess.getValue().getUimaPipelineComponents();
-                                                       if ( breakdown != null 
&& breakdown.size() > 0 ) { 
-                                                               
List<IUimaPipelineAEComponent> previousBreakdown = 
-                                                                               
previousProcess.getUimaPipelineComponents();
-                                                               if ( 
previousBreakdown == null || previousBreakdown.size() == 0 || 
-                                                                               
breakdown.size() != previousBreakdown.size()) {
+                                                       
List<IUimaPipelineAEComponent> breakdown = currentProcess
+                                                                       
.getValue().getUimaPipelineComponents();
+                                                       if (breakdown != null 
&& breakdown.size() > 0) {
+                                                               
List<IUimaPipelineAEComponent> previousBreakdown = previousProcess
+                                                                               
.getUimaPipelineComponents();
+                                                               if 
(previousBreakdown == null
+                                                                               
|| previousBreakdown.size() == 0
+                                                                               
|| breakdown.size() != previousBreakdown
+                                                                               
                .size()) {
                                                                        
inventoryChanged = true;
                                                                } else {
-                                                                       for 
(IUimaPipelineAEComponent uimaAeState : breakdown ) {
-                                      boolean found = false;
-                                                                         for 
(IUimaPipelineAEComponent previousUimaAeState : previousBreakdown ) {
-                                                                           if 
( uimaAeState.getAeName().equals(previousUimaAeState.getAeName()) ) {
-                                                                               
  found = true;
-                                                                               
  if ( !uimaAeState.getAeState().equals(previousUimaAeState.getAeState()) ||
-                                                                               
           uimaAeState.getInitializationTime() != 
previousUimaAeState.getInitializationTime() ) {
-                                                                               
    inventoryChanged = true;
+                                                                       for 
(IUimaPipelineAEComponent uimaAeState : breakdown) {
+                                                                               
boolean found = false;
+                                                                               
for (IUimaPipelineAEComponent previousUimaAeState : previousBreakdown) {
+                                                                               
        if (uimaAeState.getAeName().equals(
+                                                                               
                        previousUimaAeState
+                                                                               
                                        .getAeName())) {
+                                                                               
                found = true;
+                                                                               
                if (!uimaAeState
+                                                                               
                                .getAeState()
+                                                                               
                                .equals(previousUimaAeState
+                                                                               
                                                .getAeState())
+                                                                               
                                || uimaAeState
+                                                                               
                                                .getInitializationTime() != 
previousUimaAeState
+                                                                               
                                                .getInitializationTime()) {
+                                                                               
                        inventoryChanged = true;
+                                                                               
                        break;
+                                                                               
                }
+                                                                               
        }
+                                                                               
}
+                                                                               
if (!found) {
+                                                                               
        inventoryChanged = true;
+                                                                               
}
+
+                                                                               
if (inventoryChanged) {
                                                                                
        break;
-                                                                               
  }
-                                                                           }
-                                                                         }
-                                                                         if ( 
!found ) {
-                                                                               
inventoryChanged = true;
-                                                                         }
-                                                                               
  
-                                                                         if ( 
inventoryChanged ) {
-                                                                               
break;
-                                                                         }
+                                                                               
}
 
-                                                                  }
+                                                                       }
                                                                }
-                                                               
+
                                                        }
                                                }
                                        } else {
-                                               // New inventory contains a 
process not in the previous snapshot
+                                               // New inventory contains a 
process not in the previous
+                                               // snapshot
                                                inventoryChanged = true;
                                                break;
                                        }
                                }
                        }
                }
-               
-               //      Get this inventory snapshot
+
+               // Get this inventory snapshot
                previousInventory = inventory;
-               //      Broadcast inventory if there is a change or configured 
number of epochs
-               //  passed since the last broadcast. This is configured in 
ducc.properties with
-               //  property ducc.agent.node.inventory.publish.rate.skip
+               // Broadcast inventory if there is a change or configured 
number of
+               // epochs
+               // passed since the last broadcast. This is configured in
+               // ducc.properties with
+               // property ducc.agent.node.inventory.publish.rate.skip
                try {
-                       if ( inventory.size() > 0 && 
-                                       ( inventoryChanged || // if there is 
inventory change, publish
-                                         forceInventoryUpdateMaxThreshold == 0 
||  // skip rate in ducc.properties is zero, publish            
-                                         ( counter > 0 && (counter % 
forceInventoryUpdateMaxThreshold ) == 0)) )  { // if reached skip rate, publish
-                               
-                               outgoingMessage.getIn().setBody(new 
NodeInventoryUpdateDuccEvent(inventory));
-                               StringBuffer sb = new StringBuffer("Node 
Inventory ("+inventory.size()+")");
-                               for( Map.Entry<DuccId, IDuccProcess> p : 
inventory.entrySet()) {
+                       if (inventory.size() > 0 && (inventoryChanged || // if 
there is
+                                                                               
                                                // inventory
+                                                                               
                                                // change,
+                                                                               
                                                // publish
+                                       forceInventoryUpdateMaxThreshold == 0 
|| // skip rate in
+                                                                               
                                                // ducc.properties
+                                                                               
                                                // is zero,
+                                                                               
                                                // publish
+                                       (counter > 0 && (counter % 
forceInventoryUpdateMaxThreshold) == 0))) { // if
+                                                                               
                                                                                
                        // reached
+                                                                               
                                                                                
                        // skip
+                                                                               
                                                                                
                        // rate,
+                                                                               
                                                                                
                        // publish
+
+                               StringBuffer sb = new StringBuffer("Node 
Inventory ("
+                                               + inventory.size() + ")");
+                               for (Map.Entry<DuccId, IDuccProcess> p : 
inventory.entrySet()) {
                                        /*
-                                       long endInitLong = 0;
-                                       String endInit = "";
-                                       ITimeWindow wInit = 
p.getValue().getTimeWindowInit();
-                                       if(wInit != null) {
-                                               endInit = wInit.getEnd();
-                                               endInitLong = 
wInit.getEndLong();
-                                       }
-                                       long startRunLong = 0;
-                                       String startRun = "";
-                                       ITimeWindow wRun = 
p.getValue().getTimeWindowRun();
-                                       if(wRun != null) {
-                                               startRun = wRun.getStart();
-                                               startRunLong = 
wRun.getStartLong();
+                                        * long endInitLong = 0; String endInit 
= ""; ITimeWindow
+                                        * wInit = 
p.getValue().getTimeWindowInit(); if(wInit !=
+                                        * null) { endInit = wInit.getEnd(); 
endInitLong =
+                                        * wInit.getEndLong(); } long 
startRunLong = 0; String
+                                        * startRun = ""; ITimeWindow wRun =
+                                        * p.getValue().getTimeWindowRun(); 
if(wRun != null) {
+                                        * startRun = wRun.getStart(); 
startRunLong =
+                                        * wRun.getStartLong(); } 
if(endInitLong > startRunLong) {
+                                        * logger.warn(methodName, null,
+                                        * "endInit:"+endInitLong+" 
"+"startRun:"+startRunLong); }
+                                        */
+                                       int pipelineInitStats = (p.getValue()
+                                                       
.getUimaPipelineComponents() == null) ? 0 : p
+                                                       
.getValue().getUimaPipelineComponents().size();
+
+                                       if 
(p.getValue().getUimaPipelineComponents() == null) {
+                                               
p.getValue().setUimaPipelineComponents(
+                                                               new 
ArrayList<IUimaPipelineAEComponent>());
                                        }
-                                       if(endInitLong > startRunLong) {
-                                               logger.warn(methodName, null, 
"endInit:"+endInitLong+" "+"startRun:"+startRunLong);
+                                       sb.append("\n\t[Process Type=")
+                                                       
.append(p.getValue().getProcessType())
+                                                       .append(" DUCC ID=")
+                                                       
.append(p.getValue().getDuccId())
+                                                       .append(" PID=")
+                                                       
.append(p.getValue().getPID())
+                                                       .append(" State=")
+                                                       
.append(p.getValue().getProcessState())
+                                                       .append(" Resident 
Memory=")
+                                                       
.append(p.getValue().getResidentMemory())
+                                                       .append(" Init Stats 
List Size:"
+                                                                       + 
pipelineInitStats).
+                                                       // append(" end 
init:"+endInit).
+                                                       // append(" start 
run:"+startRun).
+                                                       append("] ");
+                                       if (p.getValue().getProcessState()
+                                                       
.equals(ProcessState.Stopped)
+                                                       || 
p.getValue().getProcessState()
+                                                                       
.equals(ProcessState.Failed)
+                                                       || 
p.getValue().getProcessState()
+                                                                       
.equals(ProcessState.Killed)) {
+                                               sb.append(" Reason:"
+                                                               + 
p.getValue().getReasonForStoppingProcess());
                                        }
-                                       */
-               int pipelineInitStats = 
(p.getValue().getUimaPipelineComponents() == null ) ? 0 : 
-                 p.getValue().getUimaPipelineComponents().size();
-
-               if ( p.getValue().getUimaPipelineComponents() == null) {
-                       p.getValue().setUimaPipelineComponents(new 
ArrayList<IUimaPipelineAEComponent>());
-               }
-                       sb.append("\n\t[Process Type=").
-                                          
append(p.getValue().getProcessType()).
-                                          append(" DUCC ID=").
-                                          append(p.getValue().getDuccId()).
-                                          append(" PID=").
-                                          append(p.getValue().getPID()).
-                                          append(" 
State=").append(p.getValue().getProcessState()).
-                                          append(" Resident 
Memory=").append(p.getValue().getResidentMemory()).
-                                          append(" Init Stats List 
Size:"+pipelineInitStats).
-                                          //append(" end init:"+endInit).
-                                          //append(" start run:"+startRun).
-                                          append("] ");
-          if ( p.getValue().getProcessState().equals(ProcessState.Stopped) ||
-                  p.getValue().getProcessState().equals(ProcessState.Failed) ||
-                  p.getValue().getProcessState().equals(ProcessState.Killed)) {
-              sb.append(" Reason:"+p.getValue().getReasonForStoppingProcess());
-             }
-          sb.append(" Exit Code="+p.getValue().getProcessExitCode());
+                                       sb.append(" Exit Code=" + 
p.getValue().getProcessExitCode());
                                }
-                               logger.info(methodName, null, "Agent 
"+agent.getIdentity().getName()+" Posting Inventory:"+sb.toString());
+                               logger.info(methodName, null, "Agent "
+                                               + agent.getIdentity().getName() 
+ " Posting Inventory:"
+                                               + sb.toString());
+                               outgoingMessage.getIn().setBody(new 
NodeInventoryUpdateDuccEvent(inventory));
+
                        } else {
-                         // Add null to the body of the message. A filter 
-                         // defined in the Camel route 
(AgentConfiguration.java)
-                         // has a predicate to check for null body and throws
-                         // away such a message.
-              outgoingMessage.getIn().setBody(null);
+                               // Add null to the body of the message. A filter
+                               // defined in the Camel route 
(AgentConfiguration.java)
+                               // has a predicate to check for null body and 
throws
+                               // away such a message.
+                               outgoingMessage.getIn().setBody(null);
                        }
-               } catch( Exception e) {
+               } catch (Exception e) {
                        logger.error(methodName, null, e);
                } finally {
-                       if ( inventoryChanged ) {
+                       if (inventoryChanged) {
                                counter = 0;
                        } else {
                                counter++;


Reply via email to