Hello all,

I have to ensure that the thread that runs a route (and the error handler)
is guaranteed to see an exception set on the exchange instance if that
instance is shared by many threads and the exception is set by one of those
threads.

I have a route step that massages the data in an input stream (jetty
endpoint) before proxying to the actual web server. For the stream data
manipulation I use stream pipelines (PipedInputStream/PipedOutputStream)
which need to run in their own thread per pipeline element. Each pipeline
element holds a reference to the exchange and if an error is encountered the
exception is set on the exchange.
This appears to work alright. But I don't think it is guaranteed to work as
"exception" is not a volatile member of DefaultExchange. Thus the main
thread is not guaranteed to see an exception set on the exchange by a worker
thread. The worker threads themselves all use a ReentrantLock I provide as
an exchange property (the property map is a ConcurrentHashMap) to
synchronize their access to the exchange. So data visibility between the
worker threads should not be an issue. 
But how to do it for the main thread?

I had a look at the Splitter implementation and how it deals with parallel
execution and aggregation. As far as I understand it the Splitter enforces
visibility by returning an AtomicReference of an exchange that the main
thread has never seen before. The exchange referenced by the atomic
reference is treated as read-only after it is set on the atomic reference.
So a get() on the atomic reference in the main thread is guaranteed to see
the latest version of the referenced exchange instance.

I cannot apply this approach as it requires the main thread to wait for the
workers to finish processing. If I block the main thread my workers would be
blocked as well as the main thread is responsible for consuming the modified
input stream contents from the last PipedInputStream element and forwarding
it to the web server. 

I also could not find a factory mechanism that would allow me to tell Camel
to instantiate my own implementation of the Exchange interface (with a
volatile member "exception").
(DefaultException was also not written with sub-classing in mind, it
appears. E.g. DefaultException::isFailed() uses the private member
"exception" instead of DefaultException::getException() for the answer. But
that's a separate issue.)

Does anyone have any other ideas?



Thanks!
Ralf



--
View this message in context: 
http://camel.465427.n5.nabble.com/Concurrent-modification-of-DefaultExchange-tp5793968.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Reply via email to