Hi,
I am currently playing around with ServiceMix (smix) 3.1.1 and have run into a
strange problem.
I am trying to send an InOut message from one bean component to another. The
target component retrieves some data from the message, does some work with that
data, and then sends an answer back to the caller.
Receiving works fine, but when the answer is sent back I get the following
error message:
ERROR - BeanComponent - Error processing exchange InOut[
id: ID:127.0.0.1-114f3bb1af9-6:1
status: Active
role: consumer
service: {http://www.compart.net/test}ConversionService
endpoint: endpoint
in: null
out: null
]
java.lang.IllegalStateException: Receiving unknown consumer exchange: InOut[
id: ID:127.0.0.1-114f3bb1af9-6:1
status: Active
role: consumer
service: {http://www.compart.net/test}ConversionService
endpoint: endpoint
in: null
out: null
]
at org.apache.servicemix.bean.BeanEndpoint.onConsumerExchange(BeanEndpoint.java:268)
at org.apache.servicemix.bean.BeanEndpoint.process(BeanEndpoint.java:198)
at org.apache.servicemix.common.AsyncBaseLifeCycle.doProcess(AsyncBaseLifeCycle.java:489)
at org.apache.servicemix.common.AsyncBaseLifeCycle.processExchange(AsyncBaseLifeCycle.java:463)
at org.apache.servicemix.common.BaseLifeCycle.onMessageExchange(BaseLifeCycle.java:46)
at org.apache.servicemix.jbi.messaging.DeliveryChannelImpl.processInBound(DeliveryChannelImpl.java:595)
at org.apache.servicemix.jbi.nmr.flow.AbstractFlow.doRouting(AbstractFlow.java:174)
at org.apache.servicemix.jbi.nmr.flow.seda.SedaFlow.doRouting(SedaFlow.java:176)
at org.apache.servicemix.jbi.nmr.flow.seda.SedaQueue$1.run(SedaQueue.java:134)
at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:665)
at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:690)
at java.lang.Thread.run(Thread.java:595)
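To make clear which message flow I expect, here is how I currently understand the InOut sequence, written as a small stand-alone model. This is only a sketch of my understanding and does not use the JBI API: the classes Exchange and InOutSketch, the method names and the "converted:" payload are all made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified stand-alone model of an InOut exchange; NOT the javax.jbi API. */
public class InOutSketch {
    enum Status { ACTIVE, DONE, ERROR }

    /** Hypothetical stand-in for a JBI message exchange. */
    static class Exchange {
        Status status = Status.ACTIVE;
        String in;
        String out;
    }

    /** Records each hop so the order of the sends can be inspected. */
    static final List<String> trace = new ArrayList<>();

    // Step 1: the consumer creates the exchange and sets the "in" message.
    static Exchange consumerSend(String payload) {
        Exchange ex = new Exchange();
        ex.in = payload;
        trace.add("consumer -> provider: in");
        return ex;
    }

    // Step 2: the provider reads "in", does its work and sets "out".
    // The status stays ACTIVE; the provider does not finish the exchange.
    static void providerHandle(Exchange ex) {
        ex.out = "converted:" + ex.in;
        trace.add("provider -> consumer: out");
    }

    // Step 3: the consumer reads "out" and closes the exchange with DONE.
    static void consumerHandle(Exchange ex) {
        if (ex.status == Status.ACTIVE && ex.out != null) {
            ex.status = Status.DONE;
            trace.add("consumer -> provider: done");
        }
    }

    public static void main(String[] args) {
        Exchange ex = consumerSend("report.doc");
        providerHandle(ex);
        consumerHandle(ex);
        System.out.println(trace);
        System.out.println(ex.status + " " + ex.out);
    }
}
```

If this three-step picture (in, out, done) is already wrong, that may be part of my problem.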
To help with the analysis, here is the logic that builds and sends the answer:
public void onMessageExchange(MessageExchange exchange) throws MessagingException
{
    // The component acts as a provider, which means that another component
    // has requested our service.
    // As this exchange is active, this is either an in or a fault
    // (out messages are sent by this component).
    if (exchange.getRole() == MessageExchange.Role.PROVIDER)
    {
        // Check here if the MEP is supported by this component
        if (!(exchange instanceof InOut))
        {
            throw new UnsupportedOperationException("Unsupported MEP: " + exchange.getPattern());
        }
        // Exchange is finished
        if (exchange.getStatus() == ExchangeStatus.DONE)
        {
            return;
        }
        // Exchange has been aborted with an exception
        else if (exchange.getStatus() == ExchangeStatus.ERROR)
        {
            return;
        }
        // Exchange is active
        else
        {
            // In message
            if (exchange.getMessage("in") != null)
            {
                // Check if this is an InOut from the router
                if (exchange instanceof InOut)
                {
                    NormalizedMessage msg = exchange.getMessage("in");
                    String action = msg.getProperty(PROPERTY_ACTION).toString();
                    if (action.equalsIgnoreCase(CONVERSION_TAG))
                    {
                        ...
                        boolean converted = convertDocument(fileToUse, newFileName, targetFormat);
                        if (converted)
                        {
                            // If all went well, delete the source file and quit
                            File sourceFile = new File(fileToUse);
                            sourceFile.delete();
                            NormalizedMessage response = exchange.createMessage();
                            // Save the new filename to the header properties
                            response.setProperty(PROPERTY_TMP_FILE_PATH, newFileName);
                            response.setProperty(PROPERTY_SMX_FILE_NAME, originalName);
                            response.setProperty(PROPERTY_SMX_FILE_PATH, originalPath);
                            response.setProperty(PROPERTY_TARGET_FORMAT, targetFormat);
                            exchange.setMessage(response, "out");
                            exchange.setStatus(ExchangeStatus.ACTIVE);
                            channel.send(exchange);
                            System.out.println("Answer sent...");
                        }
                        else
                        {
                            // If an error occurred, pass it on to the caller
                            exchange.setStatus(ExchangeStatus.ERROR);
                            exchange.setError(new IllegalArgumentException("The conversion failed!"));
                            channel.send(exchange);
                        }
                    }
                    else
                    {
                        exchange.setStatus(ExchangeStatus.ERROR);
                        exchange.setError(new UnsupportedOperationException(
                            "The operation [" + action + "] is not supported!"));
                        channel.send(exchange);
                    }
                }
                else
                {
                    // Response handling
                    System.out.println(exchange.toString());
                    throw new UnsupportedOperationException("Unsupported MEP: " + exchange.getPattern());
                }
            }
            // Fault message
            else if (exchange.getFault() != null)
            {
                exchange.setStatus(ExchangeStatus.DONE);
                channel.send(exchange);
            }
            // This is not compliant with the default MEPs
            else
            {
                throw new IllegalStateException("Provider exchange is ACTIVE, but no in or fault is provided");
            }
        }
    }
    // The component acts as a consumer, which means this exchange is received
    // because we sent it to another component. As it is active, this is either
    // an out or a fault.
    // If this component does not create / send exchanges, you may just throw an
    // UnsupportedOperationException.
    else if (exchange.getRole() == MessageExchange.Role.CONSUMER)
    {
        // Exchange is finished
        if (exchange.getStatus() == ExchangeStatus.DONE)
        {
            return;
        }
        // Exchange has been aborted with an exception
        else if (exchange.getStatus() == ExchangeStatus.ERROR)
        {
            return;
        }
        // Exchange is active
        else
        {
            // Out message
            if (exchange.getMessage("out") != null)
            {
                // Response handling
                System.out.println(exchange.toString());
                throw new UnsupportedOperationException("Received out message but this is not supported");
            }
            // Fault message
            else if (exchange.getFault() != null)
            {
                // TODO ... handle the fault
                exchange.setStatus(ExchangeStatus.DONE);
                channel.send(exchange);
            }
            // This is not compliant with the default MEPs
            else
            {
                throw new IllegalStateException("Consumer exchange is ACTIVE, but no out or fault is provided");
            }
        }
    }
    // Unknown role
    else
    {
        throw new IllegalStateException("Unknown role: " + exchange.getRole());
    }
}
This code is only for testing purposes, so please don't mind how ugly it is :)
My question is: what is going wrong here? Maybe I have simply not fully
understood how to handle InOut messages.
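In case it helps to pin down where my understanding is off, the branching of the handler above can be condensed into the following stand-alone sketch. It is only a model: DispatchSketch, decide and the Action names are invented for this mail, the MEP check is left out, and nothing here calls the real JBI classes.

```java
/** Condensed model of the handler's branching; hypothetical names, no JBI API. */
public class DispatchSketch {
    enum Role { PROVIDER, CONSUMER }
    enum Status { ACTIVE, DONE, ERROR }
    enum Action { IGNORE, PROCESS_IN, SEND_DONE, UNSUPPORTED_OUT, ILLEGAL_STATE }

    /** Returns the branch the handler would take for the given exchange state. */
    static Action decide(Role role, Status status,
                         boolean hasIn, boolean hasOut, boolean hasFault) {
        // DONE or ERROR: the handler just returns in both roles.
        if (status != Status.ACTIVE) {
            return Action.IGNORE;
        }
        if (role == Role.PROVIDER) {
            if (hasIn) {
                return Action.PROCESS_IN;      // convert, then send out or error
            }
            if (hasFault) {
                return Action.SEND_DONE;
            }
            return Action.ILLEGAL_STATE;       // active, but neither in nor fault
        }
        // Consumer role
        if (hasOut) {
            return Action.UNSUPPORTED_OUT;     // out messages are not handled yet
        }
        if (hasFault) {
            return Action.SEND_DONE;
        }
        return Action.ILLEGAL_STATE;           // active, but neither out nor fault
    }

    public static void main(String[] args) {
        // State of the exchange shown in the log: consumer, Active, in and out null.
        System.out.println(decide(Role.CONSUMER, Status.ACTIVE, false, false, false));
    }
}
```

For the exchange shown in the log (role consumer, status Active, no in and no out) this model ends in the ILLEGAL_STATE branch. However, the exception in the trace is thrown in BeanEndpoint.onConsumerExchange and its message does not appear in my code, so it looks like my handler is never reached for that exchange.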
Thanks for any help,
Lars