Hello all, I have to ensure that the thread that runs a route (and the error handler) is guaranteed to see an exception set on the exchange instance if that instance is shared by many threads and the exception is set by one of those threads.
I have a route step that massages the data in an input stream (jetty endpoint) before proxying to the actual web server. For the stream data manipulation I use stream pipelines (PipedInputStream/PipedOutputStream) which need to run in their own thread per pipeline element. Each pipeline element holds a reference to the exchange and if an error is encountered the exception is set on the exchange. This appears to work alright. But I don't think it is guaranteed to work as "exception" is not a volatile member of DefaultExchange. Thus the main thread is not guaranteed to see an exception set on the exchange by a worker thread. The worker threads themselves all use a ReentrantLock I provide as an exchange property (the property map is a ConcurrentHashMap) to synchronize their access to the exchange. So data visibility between the worker threads should not be an issue. But how to do it for the main thread? I had a look at the Splitter implementation and how it deals with parallel execution and aggregation. As far as I understand it the Splitter enforces visibility by returning an AtomicReference of an exchange that the main thread has never seen before. The exchange referenced by the atomic reference is treated as read-only after it is set on the atomic reference. So a get() on the atomic reference in the main thread is guaranteed to see the latest version of the referenced exchange instance. I cannot apply this approach as it requires the main thread to wait for the workers to finish processing. If I block the main thread my workers would be blocked as well as the main thread is responsible for consuming the modified input stream contents from the last PipedInputStream element and forwarding it to the web server. I also could not find a factory mechanism that would allow me to tell Camel to instantiate my own implementation of the Exchange interface (with a volatile member "exception"). (DefaultException was also not written with sub-classing in mind, it appears. E.g. DefaultException::isFailed() uses the private member "exception" instead of DefaultException::getException() for the answer. But that's a separate issue.) Does anyone have any other ideas? Thanks! Ralf -- View this message in context: http://camel.465427.n5.nabble.com/Concurrent-modification-of-DefaultExchange-tp5793968.html Sent from the Camel - Users mailing list archive at Nabble.com.