Hi, I am reading records from a CSV file and doing a batch insert into DB. Here's the sample code:
public class CliaRouteBuilder extends RouteBuilder { @Override public void configure() throws Exception { final DataFormat bindyObj = new BindyCsvDataFormat(Clia.class); final String datasource_name = "clia"; onException(CannotGetJdbcConnectionException.class) .maximumRedeliveries(3) .redeliveryDelay(2000) .useExponentialBackOff(); from("file:camel/input/"+datasource_name+"/?noop=true") .routeId("fileMessageFrom"+datasource_name+"Folder") .split(body().tokenize("\n")) .streaming() .choice() .when(body().contains("CITY_NAME")) .log("Ignoring message because a header row is detected - "+body()) .to("direct:trash") .otherwise() .to("direct:individual"+datasource_name+"Record"); from("direct:individual"+datasource_name+"Record") .routeId("individual"+datasource_name+"RowRecord") .process(new UniqueHashGenerator()) .idempotentConsumer(header("msgHash"), getIdempotentRepository(datasource_name)) .unmarshal(bindyObj) .aggregate(constant(true), new CliaAggregator()) .completionSize(50) .completionTimeout(2000) .aggregationRepository(getAggregationRepository()) .to("sql:insert into clia(prvdr_ctgry_sbtyp_cd, prvdr_ctgry_cd) values (:#category_subtype_code, :#category_code)?batch=true") .end(); } private static AggregationRepository getAggregationRepository() { SingleConnectionDataSource ds = new SingleConnectionDataSource(DB_URL, DB_USER, DB_PASS, true); DataSourceTransactionManager txManager = new DataSourceTransactionManager(ds); JdbcAggregationRepository repo = new JdbcAggregationRepository(txManager, "aggregation", ds); repo.setUseRecovery(true); repo.setMaximumRedeliveries(3); repo.setDeadLetterUri("direct:trash"); repo.setRecoveryInterval(2000); return (AggregationRepository) repo; } private static IdempotentRepository getIdempotentRepository(String name) { Map repoMap = new HashMap<String, Object>(); File fileStore = new File("camel/idempotent-repos/" + name + ".log"); return new FileIdempotentRepository(fileStore, repoMap); } } When I am aggregating multiple rows from a CSV file, it is possible that for a few records I might get DataIntegrityViolationException because of some DB constraints. There is no use in retrying those records since it is a data issue and not network / connection failure. Since the exception is being set at Exchange level, is there a way to identify the erroneous record from the aggregated message and process the remaining rows ? Or any other strategy which could be helpful to mitigate this type of issue ? Regards, Bhavesh Furia