I understand that the new Processor(){} is for the message being sent to the consumers (the from() side of the route), and that part is working fine. What I don't understand is why ProducerTemplate.request() is not returning the exchange as it stands at the end of the route. For example:

    Exchange reply = producerTemplate.request("jms:Sahara", new Processor() {
        public void process(final Exchange exchange) {
            exchange.getIn().setHeader("test", "foo");
        }
    });

Route:

    from("jms:Sahara")
        .log(LoggingLevel.INFO, "* Test ID: ${header.test}")
        .setHeader("test", "bar");

    assertThat("return should be bar", reply.getIn().getHeader("test"), equalTo("bar"));

but it is still "foo". This assertion fails when it should pass.

Maybe I'm reading these wrong.

From Javadoc.io:

    Exchange request(Endpoint endpoint, Processor processor)
    Sends an exchange to an endpoint using a supplied processor.
    Uses an ExchangePattern.InOut message exchange pattern.

From the Apache Camel website:

    request*() methods
    The send*() methods use the default Message Exchange Pattern (InOnly, InOut etc) as the endpoint. If you want to explicitly perform a request/response (InOut) you can use the request*() methods instead of the send*() methods.
    E.g. let's invoke an endpoint and get the response:

Jan 15, 2020; 12:34am
Claus Ibsen-2
Re: ProducerTemplate.request() not returning complete Exchange.

Hi

The new Processor(){ } in the request method of the producer template is not for the reply message, but for preparing the message to send (e.g. the input message).

On Tue, Jan 14, 2020 at 9:16 PM Wilken Marci J <[hidden email]> wrote:
> Based on the documentation I was reading, ProducerTemplate.request(<URL>, Processor) should return the Exchange information from the route that was called. When I run my test, it returns the body information in exchange.getOut().getBody(), but exchange.getOut().getHeaders() has the headers that I sent and not the returned headers that I see if I look at the mock endpoint. I am expecting it to return the header and body data in Exchange.getIn(). Since this test is calling a JMS message queue (JBoss EAP 7.2, Fuse 7.3), I am expecting it to return the same thing that a call from another route would see.
> Am I understanding this wrong or am I missing a setting?
>
> public void testConnection() throws Exception {
>     log.info("----->> Begin testConnection <<-----");
>     Assert.assertNotNull("Expected QueueRoute-context to exist", camelctx);
>     Assert.assertEquals(ServiceStatus.Started, camelctx.getStatus());
>
>     MockEndpoint mock = camelctx.getEndpoint("mock:JobLogResult", MockEndpoint.class);
>     assertNotNull("Expected mock:JobLogResult to exist", mock);
>     mock.reset();
>     mock.expectedMessageCount(1);
>
>     ProducerTemplate producerTemplate = camelctx.createProducerTemplate();
>     Assert.assertNotNull("Expected producerTemplate to exist", producerTemplate);
>
>     Exchange reply = producerTemplate.request("jms:queue:Sahara.Core.JobIdRequest", new Processor() {
>
>         public void process(final Exchange exchange) {
>
>             Map<String, Object> headers = new HashMap<String, Object>();
>             headers.put("RequestingSource", requestingSource);
>             headers.put("AuthorizationKey", authorizationKey);
>             headers.put("RequestType", "testConnection");
>
>             Map<String, Object> payload = new HashMap<String, Object>();
>
>             exchange.getIn().setHeaders(headers);
>             exchange.getIn().setBody(payload);
>             exchange.setPattern(ExchangePattern.InOut);
>
>         }
>     });
>     // these fail
>     assertThat("Request was un-successful",
>         (reply.getIn().getHeader("SQLErrorMessage") == null
>             ? "null"
>             : ((String) reply.getIn().getHeader("SQLErrorMessage"))).toLowerCase(),
>         equalTo("success"));
>     assertThat("Didn't Return Data ", (int) reply.getIn().getHeader("CamelSqlRowCount"), equalTo(1));
>
>     // these are successful.
>     List<Exchange> list = mock.getReceivedExchanges();
>     Exchange reply2 = list.get(0);
>     assertThat("Request was un-successful",
>         (reply2.getIn().getHeader("SQLErrorMessage") == null
>             ? "null"
>             : ((String) reply2.getIn().getHeader("SQLErrorMessage"))).toLowerCase(),
>         equalTo("success"));
>     assertThat("Didn't Return Data ", (int) reply2.getIn().getHeader("CamelSqlRowCount"), equalTo(1));
> }
>
> Route
>
> from("jms:queue:Sahara.Core.JobIdRequest?exchangePattern=InOut")
>     .routeId("JobLogRouter")
>     .log(LoggingLevel.INFO, "*************************************************")
>     .log(LoggingLevel.INFO, "* Beginning JobAuditingRequest")
>     .log(LoggingLevel.INFO, "* Requesting Source: ${header.RequestingSource} ")
>     .log(LoggingLevel.INFO, "* Request Type: ${header.RequestType} ")
>     .log(LoggingLevel.INFO, "*************************************************")
>     .choice()
>         .when(simple("${header.RequestType} == \"complete\""))
>             .to("direct:CompleteJob")
>         .endChoice()
>         .when().simple("${header.RequestType?.equalsIgnoreCase(\"audit\")}")
>             .to("direct:LogAudit")
>         .endChoice()
>         .when(simple("${header.RequestType} == \"error\""))
>             .to("direct:LogError")
>         .endChoice()
>         .when().simple("${header.RequestType?.equalsIgnoreCase(\"whoami\")}")
>             .to("direct:WhoAmI")
>         .endChoice()
>         .when().simple("${header.RequestType?.equalsIgnoreCase(\"testConnection\")}")
>             .to("direct:ConnectionTest")
>         .endChoice()
>         .when().simple("${header.RequestType?.equalsIgnoreCase(\"start\")} || ${header.RequestType?.equalsIgnoreCase(\"open\")}")
>             .to("direct:InitialJobRequest")
>         .endChoice()
>         .otherwise()
>             .process(new Processor() {
>                 public void process(Exchange exchange) throws Exception {
>                     // Message in = exchange.getIn();
>                     Map<String, Object> payload = new HashMap<String, Object>();
>                     exchange.setOut(exchange.getIn());
>                     payload = (Map<String,Object>) exchange.getIn().getBody();
>                     payload.put("whoami", "I am " + getContext().resolvePropertyPlaceholders("{{whoami}}"));
>                     payload.put("SQLErrorMessage", "No Choice Provided");
>                     payload.put("SQLErrorNumber", -1);
>                     exchange.getOut().setBody(payload);
>                     exchange.getOut().setHeader("RequestType", "failed");
>                 }
>             })
>         .endChoice()
>     .end()
>     .process(new Processor() {
>         public void process(Exchange exchange) throws Exception {
>             exchange.setOut(exchange.getIn());
>             exchange.getOut().setHeader("BodyType", exchange.getIn().getBody().getClass().getName());
>             exchange.getOut().setHeader("BodyEncoding", "JSON");
>             String payloadJson = new ObjectMapper().writeValueAsString(exchange.getIn().getBody());
>             log.debug(payloadJson);
>             exchange.getOut().setBody(payloadJson);
>         }
>     })
>     .log(LoggingLevel.INFO, "Request results Job ID Request : \r\n Header: ${headers} \r\n Body: ${body} ")
>
>     .to("mock:JobLogResult")
>     .onException(Exception.class)
>         .setHeader("MESSAGE_INFO", constant("Failure to Process Audit Job"))
>         .setHeader("ROUTE_STOP", constant(Boolean.TRUE))
>         .log(LoggingLevel.ERROR, "${exception.stackTrace}")
>         .maximumRedeliveries("0")
>         .redeliveryDelay("0")
>         .id("_onExceptionCron")
>         .handled(true)
>     ;
>     ;
>
> from("direct:ConnectionTest")
>     .routeId("ConnectionTest")
>     .log(LoggingLevel.INFO, "Beginning Connection Test")
>     .log(LoggingLevel.INFO, "whoami ->> {{whoami}}")
>     .log(LoggingLevel.INFO, "Request: \r\n Header: ${headers} \r\n Body: ${body} ")
>     .to("sql:select name from sys.sysusers where uid = 0?dataSource=#Sahara_DS")
>     .log(LoggingLevel.INFO, "Request Result: \r\n Header: ${headers} \r\n Body: ${body} ")
>
>     .process(new Processor() {
>         public void process(Exchange exchange) throws Exception {
>             Map<String, Object> payload = new HashMap<String, Object>();
>             exchange.setOut(exchange.getIn());
>             ArrayList<Map<String,Object>> body = (ArrayList<Map<String,Object>>) exchange.getIn().getBody();
>             payload.put("whoami", "I am " + getContext().resolvePropertyPlaceholders("{{whoami}}"));
>             body.add(payload);
>             exchange.getOut().setBody(body);
>             exchange.getOut().setHeader("RequestType", "complete");
>             exchange.getOut().setHeader("SQLErrorMessage", "Success");
>         }
>     })
>     .onException(SQLException.class)
>         .setHeader("MESSAGE_INFO", constant("Sql Connection Error"))
>         .setHeader("ROUTE_STOP", constant(Boolean.TRUE))
>         .log(LoggingLevel.ERROR, "${exception.stackTrace}")
>         .maximumRedeliveries("0")
>         .redeliveryDelay("0")
>         .id("_onExceptionConnectionTest")
>         .handled(false)
>     ;
>
> Regards-
> Marci Wilken
> Operations Architect
> Office of Information Services
> OHA/DHS/CAF-CW/OR-KIDS
> 503.378.2405

--
Claus Ibsen
-----------------
http://davsclaus.com @davsclaus
Camel in Action 2: https://www.manning.com/ibsen2
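
For anyone reading this thread later, here is a rough sketch of the point about the Processor only preparing the request. It is a toy model in plain Java, not Camel code: ToyMessage, ToyExchange, request() and result() are hypothetical stand-ins invented for illustration. It assumes the reply is stored as a separate "out" message while the "in" message keeps what the sender put there, which would match the report above that the reply body shows up under getOut(). It says nothing about how JMS carries headers back on the reply.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Toy stand-in for request/reply. These are NOT Camel classes.
final class RequestReplySketch {

    // Hypothetical message: headers plus a body.
    static final class ToyMessage {
        final Map<String, Object> headers = new HashMap<>();
        Object body;
    }

    // Hypothetical exchange: "in" is the request, "out" is the reply (if any).
    static final class ToyExchange {
        final ToyMessage in = new ToyMessage();
        ToyMessage out; // stays null until the route produces a reply

        // Assumed convention: prefer the reply when present, else fall back to the request.
        ToyMessage result() {
            return out != null ? out : in;
        }
    }

    // The "prepare" callback runs before the route and only shapes the request.
    static ToyExchange request(Consumer<ToyExchange> prepare, Consumer<ToyExchange> route) {
        ToyExchange exchange = new ToyExchange();
        prepare.accept(exchange); // sender side: build the input message
        route.accept(exchange);   // consumer side: the route does its work
        return exchange;
    }

    public static void main(String[] args) {
        ToyExchange reply = request(
            ex -> ex.in.headers.put("test", "foo"),
            ex -> {
                // The route answers on a separate out message.
                ex.out = new ToyMessage();
                ex.out.headers.putAll(ex.in.headers);
                ex.out.headers.put("test", "bar");
            });

        System.out.println("in:     " + reply.in.headers.get("test"));       // prints "in:     foo"
        System.out.println("result: " + reply.result().headers.get("test")); // prints "result: bar"
    }
}
```

Under that assumption, an assertion against the request side keeps seeing "foo", while reading the reply side sees "bar". In Camel 2.x test code the analogous check would be reply.hasOut() ? reply.getOut() : reply.getIn() rather than reply.getIn() alone.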