Hi,

I am currently playing around with smix 3.1.1 and have run into a strange problem.

I am trying to send an InOut message from one bean component to another. The
target component retrieves some data from the message, performs a task with
that data, and then sends an answer back to the caller.
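
For reference, this is roughly the InOut flow I am expecting, written as a
small stand-alone sketch. The types below (Exchange, Status, the three helper
methods) are hypothetical stand-ins that I made up for illustration; they are
not the javax.jbi.messaging API and not my real component code.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of an InOut exchange.
// These are illustrative stand-ins, NOT the javax.jbi.messaging types.
public class InOutSketch {
    enum Status { ACTIVE, DONE, ERROR }

    static class Exchange {
        Status status = Status.ACTIVE;
        String in;   // request set by the consumer
        String out;  // answer set by the provider
    }

    // Records the steps in the order they happen.
    static final List<String> steps = new ArrayList<>();

    // Step 1: the consumer creates the exchange and sets the "in" message.
    static Exchange consumerSend(String request) {
        Exchange ex = new Exchange();
        ex.in = request;
        steps.add("consumer -> provider: in");
        return ex;
    }

    // Step 2: the provider reads "in", does its work and sets "out".
    // The exchange stays ACTIVE while it travels back to the consumer.
    static void providerHandle(Exchange ex) {
        if (ex.status != Status.ACTIVE || ex.in == null) {
            throw new IllegalStateException("expected an ACTIVE exchange with an in message");
        }
        ex.out = "converted:" + ex.in;
        steps.add("provider -> consumer: out");
    }

    // Step 3: the consumer reads "out" and closes the exchange with DONE.
    static void consumerHandle(Exchange ex) {
        if (ex.status != Status.ACTIVE || ex.out == null) {
            throw new IllegalStateException("expected an ACTIVE exchange with an out message");
        }
        ex.status = Status.DONE;
        steps.add("consumer -> provider: DONE");
    }

    public static void main(String[] args) {
        Exchange ex = consumerSend("report.doc");
        providerHandle(ex);
        consumerHandle(ex);
        for (String step : steps) {
            System.out.println(step);
        }
        System.out.println("final status: " + ex.status);
    }
}
```

In my real setup the first two steps seem to work; the error below shows up
when the answer from step 2 arrives back on the consumer side.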

Receiving works fine, but when sending back the answer I get the following
error message:

ERROR - BeanComponent                  - Error processing exchange InOut[
  id: ID:127.0.0.1-114f3bb1af9-6:1
  status: Active
  role: consumer
  service: {http://www.compart.net/test}ConversionService
  endpoint: endpoint
  in: null
  out: null
]
java.lang.IllegalStateException: Receiving unknown consumer exchange: InOut[
  id: ID:127.0.0.1-114f3bb1af9-6:1
  status: Active
  role: consumer
  service: {http://www.compart.net/test}ConversionService
  endpoint: endpoint
  in: null
  out: null
]
        at org.apache.servicemix.bean.BeanEndpoint.onConsumerExchange(BeanEndpoint.java:268)
        at org.apache.servicemix.bean.BeanEndpoint.process(BeanEndpoint.java:198)
        at org.apache.servicemix.common.AsyncBaseLifeCycle.doProcess(AsyncBaseLifeCycle.java:489)
        at org.apache.servicemix.common.AsyncBaseLifeCycle.processExchange(AsyncBaseLifeCycle.java:463)
        at org.apache.servicemix.common.BaseLifeCycle.onMessageExchange(BaseLifeCycle.java:46)
        at org.apache.servicemix.jbi.messaging.DeliveryChannelImpl.processInBound(DeliveryChannelImpl.java:595)
        at org.apache.servicemix.jbi.nmr.flow.AbstractFlow.doRouting(AbstractFlow.java:174)
        at org.apache.servicemix.jbi.nmr.flow.seda.SedaFlow.doRouting(SedaFlow.java:176)
        at org.apache.servicemix.jbi.nmr.flow.seda.SedaQueue$1.run(SedaQueue.java:134)
        at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:665)
        at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:690)
        at java.lang.Thread.run(Thread.java:595)
                                                              


To help with the analysis, here is the logic that fills in the answer:


    public void onMessageExchange(MessageExchange exchange) throws MessagingException
        {
        // The component acts as a provider, which means that another component
        // has requested our service.
        // As this exchange is active, this is either an in or a fault
        // (out messages are sent by this component)
        if (exchange.getRole() == MessageExchange.Role.PROVIDER)
            {
            // Check here if the mep is supported by this component
            if (exchange instanceof InOut == false)
                {
                throw new UnsupportedOperationException("Unsupported MEP: " + exchange.getPattern());
                }
            
            // Exchange is finished
            if (exchange.getStatus() == ExchangeStatus.DONE)
                {
                return;
                }
            // Exchange has been aborted with an exception
            else if (exchange.getStatus() == ExchangeStatus.ERROR)
                {
                return;
                }
            // Exchange is active
            else
                {
                // In message
                if (exchange.getMessage("in") != null)
                    {
                    // check if this is a InOut from the router
                    if (exchange instanceof InOut)
                        {
                        NormalizedMessage msg = exchange.getMessage("in");
                        
                        String action = msg.getProperty(PROPERTY_ACTION).toString();
                        
                        if (action.equalsIgnoreCase(CONVERSION_TAG))
                            {

                            ...                            

                            boolean converted = convertDocument(fileToUse, newFileName, targetFormat);
                            
                            if (converted)
                                {
                                // if all went well, delete source file and quit
                                File sourceFile = new File(fileToUse);
                                sourceFile.delete();
                                
                                NormalizedMessage response = exchange.createMessage();
                                
                                // save the new filename to header properties
                                response.setProperty(PROPERTY_TMP_FILE_PATH, newFileName);
                                response.setProperty(PROPERTY_SMX_FILE_NAME, originalName);
                                response.setProperty(PROPERTY_SMX_FILE_PATH, originalPath);
                                response.setProperty(PROPERTY_TARGET_FORMAT, targetFormat);
                                exchange.setMessage(response, "out");
                                exchange.setStatus(ExchangeStatus.ACTIVE);
                                
                                channel.send(exchange);
                                System.out.println("Answer sent...");
                                }
                            else
                                {
                                // if an error occurred, give it to caller
                                exchange.setStatus(ExchangeStatus.ERROR);
                                exchange.setError(new IllegalArgumentException("The conversion failed!"));
                                channel.send(exchange);
                                }
                            }
                        else
                            {
                            exchange.setStatus(ExchangeStatus.ERROR);
                            exchange.setError(new UnsupportedOperationException("The operation [" + action + "] is not supported!"));
                            channel.send(exchange);
                            }
                        }
                    else
                        {
                        // response handling
                        System.out.println(exchange.toString());

                        throw new UnsupportedOperationException("Unsupported MEP: " + exchange.getPattern());
                        }
                    }
                // Fault message
                else if (exchange.getFault() != null)
                    {
                    exchange.setStatus(ExchangeStatus.DONE);
                    channel.send(exchange);
                    }
                // This is not compliant with the default MEPs
                else
                    {
                    throw new IllegalStateException("Provider exchange is ACTIVE, but no in or fault is provided");
                    }
                }
            }
        // The component acts as a consumer, which means this exchange is received
        // because we sent it to another component. As it is active, this is either
        // an out or a fault.
        // If this component does not create / send exchanges, you may just throw an
        // UnsupportedOperationException
        else if (exchange.getRole() == MessageExchange.Role.CONSUMER)
            {
            // Exchange is finished
            if (exchange.getStatus() == ExchangeStatus.DONE)
                {
                return;
                }
            // Exchange has been aborted with an exception
            else if (exchange.getStatus() == ExchangeStatus.ERROR)
                {
                return;
                }
            // Exchange is active
            else
                {
                // Out message
                if (exchange.getMessage("out") != null)
                    {
                    // response handling
                    System.out.println(exchange.toString());

                    throw new UnsupportedOperationException("Received out message but this is not supported");
                    }
                // Fault message
                else if (exchange.getFault() != null)
                    {
                    // TODO ... handle the fault
                    exchange.setStatus(ExchangeStatus.DONE);
                    channel.send(exchange);
                    }
                // This is not compliant with the default MEPs
                else
                    {
                    throw new IllegalStateException("Consumer exchange is ACTIVE, but no out or fault is provided");
                    }
                }
            }
        // Unknown role
        else
            {
            throw new IllegalStateException("Unknown role: " + exchange.getRole());
            }
        }



This code is only for testing purposes, so please don't mind the ugliness :)

The question now is what is going wrong. Maybe I have not fully understood
how to handle InOut messages.

Thanks for any help,
Lars
