I'm using the Scala DSL and I'm attempting to split the input from a file,
then aggregate it again once the processing is done.  The route definition I
have is:

val context = new DefaultCamelContext
context.addRoutes(new RouteBuilder {
  "file:perf?noop=true" ==> {
    aggregate(split(_.getIn().getBody(classOf[String]).split("\n")) {
      loadbalance roundrobin {
        to("direct:x").id("x")
        to("direct:y").id("y")
      }
    }, new MyAggregator) {
      to("file:perf_outbox?fileName=perf.csv")
    }
  }

  "direct:x" process (processor)
  "direct:y" process (processor)
})

The processor and the aggregator are:

val processor = (exchange: Exchange) => {
  val line = exchange.getIn.getBody(classOf[String])
  val lineSplit = line.split("\\[\\]\\: ")
  val split = lineSplit(1).split(" ")
  val x = split(0) + "," + split(1) + "," + split(2) + "," + split(3) +
    "," + split(4) + "," + split(5) + "," + split(7) + "," + split(10)
  exchange.getIn().setBody(x)
}
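
To make the transformation concrete, here is a minimal standalone sketch of what the processor does to one line, with Camel taken out of the picture. The sample line is made up purely for illustration (the real input format is not shown in this post), and `LineTransformSketch` and `toCsv` are hypothetical names used only here.

```scala
object LineTransformSketch {
  // Same steps as the processor: take the text after the literal "[]: "
  // marker, split it on single spaces, and keep fields 0-5, 7 and 10.
  def toCsv(line: String): String = {
    val payload = line.split("\\[\\]\\: ")(1)
    val fields = payload.split(" ")
    Seq(0, 1, 2, 3, 4, 5, 7, 10).map(i => fields(i)).mkString(",")
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical input line, not taken from the real file.
    println(toCsv("INFO []: a b c d e f g h i j k"))
  }
}
```

Note that a line without the "[]: " marker, or with fewer than 11 space-separated fields after it, would throw an ArrayIndexOutOfBoundsException in both this sketch and the processor.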

class MyAggregator extends AggregationStrategy {
  def aggregate(oldExchange: Exchange, newExchange: Exchange): Exchange = {
    if (oldExchange == null) {
      return newExchange
    }
    val body = newExchange.getIn().getBody(classOf[String])
    val existing = oldExchange.getIn().getBody(classOf[String])
    oldExchange.getIn().setBody(existing + "\n" + body)
    return oldExchange
  }
}
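
The result I expect from the aggregator can be sketched in plain Scala, again without Camel. This only mirrors the body-joining logic above, with `Option` standing in for the null first exchange; `AggregateSketch`, `combine` and the sample lines are hypothetical and exist only for this illustration.

```scala
object AggregateSketch {
  // Mirrors MyAggregator: the first body is passed through unchanged,
  // every later body is appended after a newline.
  def combine(existing: Option[String], next: String): String =
    existing match {
      case None       => next
      case Some(body) => body + "\n" + next
    }

  // Folds all processed lines into the single body I expect in perf.csv.
  def aggregateAll(lines: Seq[String]): String =
    lines
      .foldLeft(Option.empty[String])((acc, line) => Some(combine(acc, line)))
      .getOrElse("")

  def main(args: Array[String]): Unit = {
    // Made-up processed lines.
    println(aggregateAll(Seq("a,b", "c,d", "e,f")))
  }
}
```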

I can see that my aggregator gets called, but the aggregated result never
seems to be sent on to the file endpoint as I would expect.
