Based on the documentation I was reading, ProducerTemplate.request(<URL>, Processor) should return the Exchange from the route that was called. When I run my test, the reply body is returned in exchange.getOut().getBody(), but exchange.getOut().getHeaders() contains the headers that I sent, not the returned headers that I see when I look at the mock endpoint. I was expecting the reply header and body data to be in Exchange.getIn(). Since this test is calling a JMS message queue (JBoss EAP 7.2, Fuse 7.3), I was expecting it to return the same thing that a call from another route would see. Am I understanding this wrong, or am I missing a setting?
// testConnection(): asserts that the Camel context exists and is started, resets
// mock:JobLogResult to expect one message, then calls ProducerTemplate.request() to send an
// InOut exchange to jms:queue:Sahara.Core.JobIdRequest. The request carries the headers
// RequestingSource, AuthorizationKey and RequestType=testConnection and an empty HashMap body.
// The first pair of assertions reads SQLErrorMessage and CamelSqlRowCount from reply.getIn();
// the second pair reads the same headers from the first exchange received by the mock endpoint.
//
// "JobLogRouter" route: consumes jms:queue:Sahara.Core.JobIdRequest (InOut) and dispatches on
// the RequestType header to direct:CompleteJob, direct:LogAudit, direct:LogError, direct:WhoAmI,
// direct:ConnectionTest or direct:InitialJobRequest. In the otherwise() branch it adds whoami,
// SQLErrorMessage="No Choice Provided" and SQLErrorNumber=-1 to the body map and sets the
// RequestType header to "failed". After the choice it sets the BodyType and BodyEncoding
// headers, replaces the body with its JSON string (ObjectMapper.writeValueAsString), logs the
// result and sends it to mock:JobLogResult.
//
// "ConnectionTest" route: runs "select name from sys.sysusers where uid = 0" against
// #Sahara_DS, appends a map holding "whoami" to the result list, and sets the headers
// RequestType=complete and SQLErrorMessage=Success.
//
// NOTE(review): this snippet was pasted with its line breaks removed, so every "//" comment
// below swallows the code that follows it on the same physical line. Restore the original
// line breaks before trying to compile it.
// NOTE(review): the failing assertions inspect reply.getIn(); presumably the JMS reply is
// placed on the exchange's OUT message for an InOut request - confirm against the Camel docs.
// NOTE(review): headers whose names start with "Camel" (e.g. CamelSqlRowCount) are presumably
// dropped by the JMS component's default header filter strategy - verify before asserting on
// them in the reply.
// NOTE(review): both onException(...) clauses are declared at the end of their routes;
// confirm that this placement is what Camel expects for route-scoped exception clauses.
public void testConnection() throws Exception { log.info("----->> Begin testConnection <<-----"); Assert.assertNotNull("Expected QueueRoute-context to exist", camelctx); Assert.assertEquals(ServiceStatus.Started, camelctx.getStatus()); MockEndpoint mock = camelctx.getEndpoint("mock:JobLogResult", MockEndpoint.class); assertNotNull("Expected mock:JobLogResult to exist", mock); mock.reset(); mock.expectedMessageCount(1); ProducerTemplate producerTemplate = camelctx.createProducerTemplate(); Assert.assertNotNull("Expected producerTemplate to exist", producerTemplate); Exchange reply = producerTemplate.request("jms:queue:Sahara.Core.JobIdRequest", new Processor() { public void process(final Exchange exchange) { Map<String, Object> headers = new HashMap<String, Object>(); headers.put("RequestingSource", requestingSource); headers.put("AuthorizationKey", authorizationKey); headers.put("RequestType", "testConnection"); Map<String, Object> payload = new HashMap<String, Object>(); exchange.getIn().setHeaders(headers); exchange.getIn().setBody(payload); exchange.setPattern(ExchangePattern.InOut); } }); // these fail assertThat("Request was un-successful", ( reply.getIn().getHeader("SQLErrorMessage") == null ? "null" :((String)reply.getIn().getHeader("SQLErrorMessage"))).toLowerCase() ,equalTo("success")); assertThat("Didn't Return Data ", (int) reply.getIn().getHeader("CamelSqlRowCount"),equalTo(1)); // these are successfull. List<Exchange> list = mock.getReceivedExchanges(); Exchange reply2 = list.get(0); assertThat("Request was un-successful", ( reply2.getIn().getHeader("SQLErrorMessage") == null ? 
"null" :((String)reply2.getIn().getHeader("SQLErrorMessage"))).toLowerCase() ,equalTo("success")); assertThat("Didn't Return Data ", (int) reply2.getIn().getHeader("CamelSqlRowCount"),equalTo(1)); } Route from("jms:queue:Sahara.Core.JobIdRequest?exchangePattern=InOut") .routeId("JobLogRouter") .log(LoggingLevel.INFO,"*************************************************") .log(LoggingLevel.INFO,"* Beginning JobAuditingRequest") .log(LoggingLevel.INFO,"* Requesting Source: ${header.RequestingSource} " ) .log(LoggingLevel.INFO,"* Request Type: ${header.RequestType} " ) .log(LoggingLevel.INFO,"*************************************************") .choice() .when(simple ("${header.RequestType} == \"complete\"")) .to("direct:CompleteJob") .endChoice() .when().simple("${header.RequestType?.equalsIgnoreCase(\"audit\")}") .to("direct:LogAudit") .endChoice() .when(simple ("${header.RequestType} == \"error\"")) .to("direct:LogError") .endChoice() .when().simple("${header.RequestType?.equalsIgnoreCase(\"whoami\")}") .to("direct:WhoAmI") .endChoice() .when().simple("${header.RequestType?.equalsIgnoreCase(\"testConnection\")}") .to("direct:ConnectionTest") .endChoice() .when().simple("${header.RequestType?.equalsIgnoreCase(\"start\")} || ${header.RequestType?.equalsIgnoreCase(\"open\")}") .to("direct:InitialJobRequest") .endChoice() .otherwise() .process(new Processor() { public void process(Exchange exchange) throws Exception { // Message in = exchange.getIn(); Map<String, Object> payload = new HashMap<String, Object>(); exchange.setOut(exchange.getIn()); payload = (Map<String,Object>) exchange.getIn().getBody(); payload.put("whoami", "I am " + getContext().resolvePropertyPlaceholders("{{whoami}}") ); payload.put("SQLErrorMessage", "No Choice Provided"); payload.put("SQLErrorNumber", -1); exchange.getOut().setBody(payload); exchange.getOut().setHeader("RequestType", "failed"); } }) .endChoice() .end() .process(new Processor() { public void process(Exchange exchange) throws Exception 
{ exchange.setOut(exchange.getIn()); exchange.getOut().setHeader("BodyType",exchange.getIn().getBody().getClass().getName()); exchange.getOut().setHeader("BodyEncoding", "JSON"); String payloadJson = new ObjectMapper().writeValueAsString(exchange.getIn().getBody()); log.debug(payloadJson); exchange.getOut().setBody(payloadJson); } }) .log(LoggingLevel.INFO, "Request results Job ID Request : \r\n Header: ${headers} \r\n Body: ${body} ") .to("mock:JobLogResult") .onException(Exception.class) .setHeader("MESSAGE_INFO", constant("Failure to Process Audit Job")) .setHeader("ROUTE_STOP", constant(Boolean.TRUE)) .log(LoggingLevel.ERROR, "${exception.stackTrace}") .maximumRedeliveries("0") .redeliveryDelay("0") .id("_onExceptionCron") .handled(true) ; ; from("direct:ConnectionTest") .routeId("ConnectionTest") .log(LoggingLevel.INFO, "Beginning Connection Test") .log(LoggingLevel.INFO,"whoami ->> {{whoami}}") .log(LoggingLevel.INFO, "Request: \r\n Header: ${headers} \r\n Body: ${body} ") .to("sql:select name from sys.sysusers where uid = 0?dataSource=#Sahara_DS") .log(LoggingLevel.INFO, "Request Result: \r\n Header: ${headers} \r\n Body: ${body} ") .process(new Processor() { public void process(Exchange exchange) throws Exception { Map<String, Object> payload = new HashMap<String, Object>(); exchange.setOut(exchange.getIn()); ArrayList<Map<String,Object>> body = (ArrayList<Map<String,Object>>) exchange.getIn().getBody(); payload.put("whoami", "I am " + getContext().resolvePropertyPlaceholders("{{whoami}}") ); body.add(payload); exchange.getOut().setBody(body); exchange.getOut().setHeader("RequestType", "complete"); exchange.getOut().setHeader("SQLErrorMessage", "Success"); } }) .onException(SQLException.class) .setHeader("MESSAGE_INFO", constant("Sql Connection Error")) .setHeader("ROUTE_STOP", constant(Boolean.TRUE)) .log(LoggingLevel.ERROR, "${exception.stackTrace}") .maximumRedeliveries("0") .redeliveryDelay("0") .id("_onExceptionConnectionTest") .handled(false) ; 
Regards- Marci Wilken Operations Architect Office of Information Services OHA/DHS/CAF-CW/OR-KIDS 503.378.2405 [Description: Description: Description: cid:image001.png@01CCF13E.2FACEC70][OHAsiglogo] CONFIDENTIALITY NOTICE This email may contain information that is privileged, confidential, or otherwise exempt from disclosure under applicable law. If you are not the addressee or it appears from the context or otherwise that you have received this email in error, please advise me immediately by reply email, keep the contents confidential, and immediately delete the message and any attachments from your system.