Luis Miguel created CAMEL-12620:
-----------------------------------

             Summary: CompletionAwareAggregationStrategy onCompletition method 
null exchange.
                 Key: CAMEL-12620
                 URL: https://issues.apache.org/jira/browse/CAMEL-12620
             Project: Camel
          Issue Type: Bug
            Reporter: Luis Miguel


Hello.
 
We have a Camel project that processes CSV files. We created a class 
implementing "CompletionAwareAggregationStrategy" to aggregate each processed 
row, and we override its "aggregate" and "onCompletion" methods.
The file is processed in parallel: the split uses "parallelProcessing" with an 
"executorService" that runs 8 concurrent threads.
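
The executor definition itself is not included in this report. As a rough sketch only, assuming it is a plain fixed-size pool (the class name, method name and constant below are hypothetical; in the application it would be exposed as the Spring bean "executorServicePublicationItemCsv"), it could look like this:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical stand-in for the executor bean; not taken from the project.
class CsvExecutorSketch {

    // Assumption: the "8 threads" mentioned above means a fixed pool of 8 workers.
    static final int CSV_THREADS = 8;

    static ExecutorService newCsvExecutor() {
        return Executors.newFixedThreadPool(CSV_THREADS);
    }

    public static void main(String[] args) {
        ExecutorService executor = newCsvExecutor();
        try {
            // newFixedThreadPool returns a ThreadPoolExecutor, so the size can be inspected.
            int core = ((ThreadPoolExecutor) executor).getCorePoolSize();
            System.out.println("core threads: " + core);
        } finally {
            executor.shutdown();
        }
    }
}
```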
 
We are seeing an intermittent issue: in the middle of processing a file, the 
"onCompletion" method is called (even though the file has not been fully 
processed yet) with a null "Exchange" argument, which breaks the rest of the 
Camel route.


As noted, this happens rarely; when we reprocess the same file, the error does 
not recur.
 
 
 
Here is the main route that processes the CSV file. Note that the split step 
creates a new instance of "CsvAggregationStrategy".
 
{code:java}
@Qualifier("executorServicePublicationItemCsv")
@Autowired
private ExecutorService executorService;

@Override
public void configure() throws Exception {
String[] header = Arrays.stream(PublicationItemCsvFields.values())
.map(PublicationItemCsvFields::getText).toArray(String[]::new);

Map<String, String> csvFieldsByEntityAttribute = new HashMap<>();

mapFields(csvFieldsByEntityAttribute);

//@formatter:off
from("file:publicationItemData?delete={{routes.push-csv-to-service.delete-source-file}}")
.streamCaching()
.routeId("push-publication-item-csv-to-service")
.onException(Exception.class)
.handled(true)
.log(LoggingLevel.ERROR, "Error in publication item route, sending an email: ${exception.message} ${exception.stacktrace}")
.to("direct:sendImportErrorReport")
.end()
.log(LoggingLevel.INFO, "Beginning to import publication item CSV: ${file:onlyname}")
.unmarshal(new CsvDataFormat()
.setSkipHeaderRecord(true)
.setNullString(EMPTY)
.setLazyLoad(true))
.split(body(), new CsvAggregationStrategy())
.streaming()
.parallelProcessing().executorService(executorService)
.to("direct:publication-item-splitter")
.end()
.choice()
.when(simple("${exchangeProperty.aggregationError} != null"))
.log("An error occurred when aggregating exchanges, sending an email with the error.")
.setProperty("original_body", body())
.to("direct:sendAggregationErrorEmail")
.setBody(exchangeProperty("original_body")) 
.end()
.choice()
.when(simple("${exchangeProperty.badCsvData.size()} > 0"))
.setBody(simple("${exchangeProperty.badCsvData}"))
.marshal(new CsvDataFormat().setHeader(header))
.setProperty("badRowsBody").simple("${body}")
.end()
.choice()
.when(simple("${exchangeProperty.successfulRecords.size()} > 0"))
.setBody(simple("${exchangeProperty.successfulRecords}"))
.marshal(new CsvDataFormat().setHeader(header))
.setProperty("successfulRowsBody").simple("${body}")
.end() 
.to("direct:sendImportReport").end()
.log("Completed import for publication item CSV: '${file:onlyname}'");


from("direct:publication-item-splitter")
.streamCaching()
.routeId("push-publication-item-splitter")
.onException(PublicationItemImportException.class)
.handled(true)
.log(LoggingLevel.ERROR, "Error importing publication item data: ${exception.message} ${exception.stacktrace}")
.end()
.onException(HttpHostConnectException.class)
.handled(true)
.log(LoggingLevel.ERROR, "Error connecting to publication item service host: ${exception.host}. Request body: ${body}")
.end()
.onException(HttpOperationFailedException.class)
.handled(true)
.log(LoggingLevel.ERROR, "Error received from publication item service: HTTP ${exception.statusCode}. Response body: ${exception.responseBody}. Request body: ${body}")
.end()
.onException(Exception.class)
.handled(true)
.log(LoggingLevel.ERROR, "Error: ${exception.message} ${exception.stacktrace}")
.end()
.setProperty("csvRowData").simple("${body}", List.class)
.setProperty("csvFieldsByEntityAttribute").constant(csvFieldsByEntityAttribute)
.bean(publicationItemCSVDataHandler)
.marshal().json(JsonLibrary.Jackson)
.setHeader(HttpHeaders.AUTHORIZATION, simple("Basic " + propertyServiceAuthorization))
.log("Item ID: ${property.itemId}")
.choice()
.when().simple("${property.itemId} != null")
.setHeader(Exchange.HTTP_PATH, simple("${property.itemId}"))
.to("rest:PUT:items?host={{backend.event-service.host}}")
.otherwise()
.to("rest:POST:items?host={{backend.event-service.host}}")
.end()
.setProperty("responseId").jsonpath("$.id", true)
.setProperty("idColumnPosition").constant(PublicationItemCsvFields.ID.getNumber())
.choice()
.when(exchangeProperty("responseId").isNull())
.throwException(PublicationItemImportException.class, "Unexpected rest "
+ "response (no id returned)")
.otherwise()
.end();


//@formatter:on

}

{code}
 
And this is the CsvAggregationStrategy. We get a NullPointerException in the 
onCompletion method because the exchange passed to it is null.
 
{code:java}
@Slf4j
public class CsvAggregationStrategy implements 
CompletionAwareAggregationStrategy {

@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
try {
if (oldExchange == null) {
oldExchange = newExchange;
oldExchange.setProperty("badCsvData", new TreeMap<>()); // TreeMap ensures a sorted order
oldExchange.setProperty("successfulRecords", new TreeMap<>());
}

if (newExchange.getProperty(Exchange.SPLIT_COMPLETE, boolean.class)) {
oldExchange.setProperty("numberOfCSVRows", 
newExchange.getProperty(Exchange.SPLIT_SIZE));
}

Exception exception = newExchange.getProperty(Exchange.EXCEPTION_CAUGHT, 
Exception.class);
if (exception != null) {
@SuppressWarnings("unchecked")
Map<Integer, Object> badCsvData = oldExchange.getProperty("badCsvData", 
Map.class);

@SuppressWarnings("unchecked")
List<String> csvRowData = newExchange.getProperty("csvRowData", List.class);

if (exception instanceof HttpOperationFailedException) {
Map<String, String> csvFieldsByEntityAttribute =
newExchange.getProperty("csvFieldsByEntityAttribute", Map.class);
String responseBody = ((HttpOperationFailedException) 
exception).getResponseBody();

String errorMessage = getErrorMessage(responseBody, csvFieldsByEntityAttribute,
((HttpOperationFailedException) exception).getStatusCode());

csvRowData.add(errorMessage);
} else {
csvRowData.add(exception.getMessage());
}

badCsvData.put(newExchange.getProperty(Exchange.SPLIT_INDEX, Integer.class), 
csvRowData);
oldExchange.setProperty("badCsvData", badCsvData);
}else {
@SuppressWarnings("unchecked")
Map<Integer, Object> sucessRecords = 
oldExchange.getProperty("successfulRecords", Map.class);

@SuppressWarnings("unchecked")
List<String> csvRowData = newExchange.getProperty("csvRowData", List.class);

Integer idPosition = (Integer) newExchange.getProperty("idColumnPosition");
if(idPosition != null) {
csvRowData.set(idPosition, (String)newExchange.getProperty("responseId"));
}else {
csvRowData.add((String)newExchange.getProperty("responseId"));
}
sucessRecords.put(newExchange.getProperty(Exchange.SPLIT_INDEX, Integer.class), 
csvRowData);
oldExchange.setProperty("successfulRecords", sucessRecords);
}
} catch(Exception e) {
log.error("Error when trying to aggregate exchanges: " + e.getMessage(), e);
if(oldExchange != null) {
oldExchange.setProperty("aggregationError", ExceptionUtils.getStackTrace(e));
}
}

return oldExchange;
}

@Override
public void onCompletion(Exchange exchange) {
@SuppressWarnings("unchecked")
Map<Integer, List<String>> badCsvData = exchange.getProperty("badCsvData", 
Map.class);
exchange.setProperty("badCsvData", new ArrayList<>(badCsvData.values()));

@SuppressWarnings("unchecked")
Map<Integer, List<String>> succesfulCsvData = 
exchange.getProperty("successfulRecords", Map.class);
exchange.setProperty("successfulRecords", new 
ArrayList<>(succesfulCsvData.values()));

/* Removing Exception/Failure properties if any occurred while processing the 
CSV rows. */
exchange.removeProperties("CamelFailure*");
exchange.removeProperties("CamelException*");
exchange.removeProperties("CamelError*");

}
}
{code}
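
As a stopgap only (it does not explain why the exchange is null), the completion step could be made defensive. The sketch below uses plain maps instead of a Camel Exchange so that it runs without Camel on the classpath; the class name and the helper "valuesOrEmpty" are hypothetical and not part of our project. In the real onCompletion we would additionally return early when the exchange argument itself is null, then use a helper like this for the "badCsvData" and "successfulRecords" properties.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper illustrating a null-safe version of the map-to-list
// conversion done in onCompletion. Not a fix for the underlying problem.
class CompletionGuard {

    // Returns the map's values as a new list (in key order for a TreeMap),
    // or an empty list when the map is missing.
    static <T> List<T> valuesOrEmpty(Map<Integer, T> rows) {
        if (rows == null) {
            return Collections.emptyList();
        }
        return new ArrayList<>(rows.values());
    }

    public static void main(String[] args) {
        // Keys play the role of the split index; insertion order is deliberately shuffled.
        Map<Integer, List<String>> badCsvData = new TreeMap<>();
        badCsvData.put(2, Arrays.asList("row-2", "HTTP 400"));
        badCsvData.put(1, Arrays.asList("row-1", "HTTP 500"));

        System.out.println(valuesOrEmpty(badCsvData)); // rows ordered by split index
        System.out.println(valuesOrEmpty(null));       // empty list instead of a NullPointerException
    }
}
```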
 
 
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
