Repository: incubator-unomi Updated Branches: refs/heads/master 6203e9947 -> b55a20b72
UNOMI-101 : Java style, improvement of error handler Project: http://git-wip-us.apache.org/repos/asf/incubator-unomi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-unomi/commit/b55a20b7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-unomi/tree/b55a20b7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-unomi/diff/b55a20b7 Branch: refs/heads/master Commit: b55a20b72e2c389da9a0cbdefd36563bc3258539 Parents: 6203e99 Author: Abdelkader Midani <amid...@apache.org> Authored: Tue Jun 20 13:41:05 2017 +0200 Committer: Abdelkader Midani <amid...@apache.org> Committed: Tue Jun 20 13:41:05 2017 +0200 ---------------------------------------------------------------------- .../core/context/ProfileImportCamelContext.java | 1 - .../BadProfileDataFormatException.java | 2 +- .../core/processor/ConfigUpdateProcessor.java | 2 +- .../ImportConfigByFileNameProcessor.java | 2 +- .../core/processor/LineSplitFailureHandler.java | 4 +-- .../core/processor/LineSplitProcessor.java | 24 +++++++++++------- .../processor/RouteCompletionProcessor.java | 26 ++++++++++---------- .../ProfileImportAbstractRouteBuilder.java | 8 +++--- .../ProfileImportFromSourceRouteBuilder.java | 3 +-- .../route/ProfileImportOneShotRouteBuilder.java | 18 +++++++++++--- .../route/ProfileImportToUnomiRouteBuilder.java | 1 - 11 files changed, 52 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/b55a20b7/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/ProfileImportCamelContext.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/ProfileImportCamelContext.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/ProfileImportCamelContext.java index f54beb1..9bb50f8 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/ProfileImportCamelContext.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/ProfileImportCamelContext.java @@ -36,7 +36,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Arrays; -import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/b55a20b7/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/exception/BadProfileDataFormatException.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/exception/BadProfileDataFormatException.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/exception/BadProfileDataFormatException.java index 6c947ff..50acbe4 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/exception/BadProfileDataFormatException.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/exception/BadProfileDataFormatException.java @@ -19,7 +19,7 @@ package org.apache.unomi.router.core.exception; /** * Created by amidani on 13/06/2017. */ -public class BadProfileDataFormatException extends Exception{ +public class BadProfileDataFormatException extends Exception { public BadProfileDataFormatException() { super(); http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/b55a20b7/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ConfigUpdateProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ConfigUpdateProcessor.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ConfigUpdateProcessor.java index e4eaa19..d8e4b9e 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ConfigUpdateProcessor.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ConfigUpdateProcessor.java @@ -25,7 +25,7 @@ import org.apache.unomi.router.core.context.ProfileImportCamelContext; /** * Created by amidani on 10/05/2017. */ -public class ConfigUpdateProcessor implements Processor{ +public class ConfigUpdateProcessor implements Processor { private ProfileImportCamelContext profileImportCamelContext; http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/b55a20b7/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportConfigByFileNameProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportConfigByFileNameProcessor.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportConfigByFileNameProcessor.java index c2968cd..a910a1d 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportConfigByFileNameProcessor.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportConfigByFileNameProcessor.java @@ -26,7 +26,7 @@ import org.apache.unomi.router.core.RouterConstants; /** * Created by amidani on 22/05/2017. */ -public class ImportConfigByFileNameProcessor implements Processor{ +public class ImportConfigByFileNameProcessor implements Processor { private ImportConfigurationService importConfigurationService; http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/b55a20b7/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitFailureHandler.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitFailureHandler.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitFailureHandler.java index 8f14f67..bfb92fb 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitFailureHandler.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitFailureHandler.java @@ -34,9 +34,9 @@ public class LineSplitFailureHandler implements Processor { public void process(Exchange exchange) throws Exception { logger.debug("Route: {}, Error: {}", exchange.getProperty(Exchange.FAILURE_ROUTE_ID), exchange.getProperty(Exchange.EXCEPTION_CAUGHT)); ImportLineError importLineError = new ImportLineError(); - importLineError.setErrorCode(((BadProfileDataFormatException)exchange.getProperty(Exchange.EXCEPTION_CAUGHT)).getCause().getMessage()); + importLineError.setErrorCode(((BadProfileDataFormatException) exchange.getProperty(Exchange.EXCEPTION_CAUGHT)).getCause().getMessage()); importLineError.setLineContent(exchange.getIn().getBody(String.class)); - importLineError.setLineNb(((Integer)exchange.getProperty("CamelSplitIndex")+1)); + importLineError.setLineNb(((Integer) exchange.getProperty("CamelSplitIndex") + 1)); exchange.getIn().setHeader(RouterConstants.HEADER_FAILED_MESSAGE, new Boolean(true)); exchange.getIn().setBody(importLineError, ImportLineError.class); } http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/b55a20b7/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java index a03c833..afff204 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java @@ -25,7 +25,10 @@ import org.apache.unomi.router.api.ProfileToImport; import org.apache.unomi.router.core.RouterConstants; import org.apache.unomi.router.core.exception.BadProfileDataFormatException; -import java.util.*; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; /** * Created by amidani on 29/12/2016. @@ -43,21 +46,21 @@ public class LineSplitProcessor implements Processor { //In case of one shot import we check the header and overwrite import config ImportConfiguration importConfigOneShot = (ImportConfiguration) exchange.getIn().getHeader(RouterConstants.HEADER_IMPORT_CONFIG_ONESHOT); String configType = (String) exchange.getIn().getHeader(RouterConstants.HEADER_CONFIG_TYPE); - if(importConfigOneShot!=null) { - fieldsMapping = (Map<String, Integer>)importConfigOneShot.getProperties().get("mapping"); + if (importConfigOneShot != null) { + fieldsMapping = (Map<String, Integer>) importConfigOneShot.getProperties().get("mapping"); propertiesToOverwrite = importConfigOneShot.getPropertiesToOverwrite(); mergingProperty = importConfigOneShot.getMergingProperty(); overwriteExistingProfiles = importConfigOneShot.isOverwriteExistingProfiles(); columnSeparator = importConfigOneShot.getColumnSeparator(); } - String[] profileData = ((String)exchange.getIn().getBody()).split(columnSeparator, -1); + String[] profileData = ((String) exchange.getIn().getBody()).split(columnSeparator, -1); ProfileToImport profileToImport = new ProfileToImport(); profileToImport.setItemId(UUID.randomUUID().toString()); profileToImport.setItemType("profile"); profileToImport.setScope("system"); - if(profileData.length > 0 && StringUtils.isNotBlank(profileData[0])) { - if(fieldsMapping.size() != (profileData.length - 1)) { - throw new BadProfileDataFormatException("The mapping does not match the number of column : line ["+((Integer)exchange.getProperty("CamelSplitIndex")+1)+"]", new Throwable("MAPPING_COLUMN_MATCH")); + if (profileData.length > 0 && StringUtils.isNotBlank(profileData[0])) { + if (fieldsMapping.size() != (profileData.length - 1)) { + throw new BadProfileDataFormatException("The mapping does not match the number of column : line [" + ((Integer) exchange.getProperty("CamelSplitIndex") + 1) + "]", new Throwable("MAPPING_COLUMN_MATCH")); } Map<String, Object> properties = new HashMap<>(); for (String fieldMappingKey : fieldsMapping.keySet()) { @@ -73,10 +76,10 @@ public class LineSplitProcessor implements Processor { profileToImport.setProfileToDelete(true); } } else { - throw new BadProfileDataFormatException("Empty line : line ["+((Integer)exchange.getProperty("CamelSplitIndex")+1)+"]", new Throwable("EMPTY_LINE")); + throw new BadProfileDataFormatException("Empty line : line [" + ((Integer) exchange.getProperty("CamelSplitIndex") + 1) + "]", new Throwable("EMPTY_LINE")); } exchange.getIn().setBody(profileToImport, ProfileToImport.class); - if(RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) { + if (RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) { exchange.getIn().setHeader(KafkaConstants.PARTITION_KEY, 0); exchange.getIn().setHeader(KafkaConstants.KEY, "1"); } @@ -84,6 +87,7 @@ public class LineSplitProcessor implements Processor { /** * Setter of fieldsMapping + * * @param fieldsMapping map String,Integer fieldName in ES and the matching column index in the import file */ public void setFieldsMapping(Map<String, Integer> fieldsMapping) { @@ -104,6 +108,7 @@ public class LineSplitProcessor implements Processor { /** * Sets the merging property. + * * @param mergingProperty property used to check if the profile exist when merging */ public void setMergingProperty(String mergingProperty) { @@ -112,6 +117,7 @@ public class LineSplitProcessor implements Processor { /** * Sets the line separator. + * * @param columnSeparator property used to specify a line separator. Defaults to ',' */ public void setColumnSeparator(String columnSeparator) { http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/b55a20b7/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/RouteCompletionProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/RouteCompletionProcessor.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/RouteCompletionProcessor.java index 5952764..36ec319 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/RouteCompletionProcessor.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/RouteCompletionProcessor.java @@ -41,7 +41,7 @@ public class RouteCompletionProcessor implements Processor { public void process(Exchange exchange) throws Exception { String importConfigId = null; ImportConfiguration importConfigOneShot = (ImportConfiguration) exchange.getIn().getHeader(RouterConstants.HEADER_IMPORT_CONFIG_ONESHOT); - if(importConfigOneShot!=null) { + if (importConfigOneShot != null) { importConfigId = importConfigOneShot.getItemId(); } else { importConfigId = exchange.getFromRouteId(); @@ -52,10 +52,10 @@ public class RouteCompletionProcessor implements Processor { long ignoreCount = 0; List<ImportLineError> errors = new ArrayList<ImportLineError>(); - for(Object line : exchange.getIn().getBody(ArrayList.class)){ - if(line instanceof ProfileToImport) { + for (Object line : exchange.getIn().getBody(ArrayList.class)) { + if (line instanceof ProfileToImport) { successCount++; - } else if(line instanceof ImportLineError) { + } else if (line instanceof ImportLineError) { failureCount++; errors.add(((ImportLineError) line)); } else { @@ -64,18 +64,18 @@ public class RouteCompletionProcessor implements Processor { } Map execution = new HashMap(); - execution.put("date", ((Date)exchange.getProperty("CamelCreatedTimestamp")).getTime()); + execution.put("date", ((Date) exchange.getProperty("CamelCreatedTimestamp")).getTime()); execution.put("totalLinesNb", exchange.getProperty("CamelSplitSize")); execution.put("successCount", successCount); execution.put("failureCount", failureCount); execution.put("errors", errors); - if(importConfiguration.getExecutions().size() >= executionsHistorySize) { + if (importConfiguration.getExecutions().size() >= executionsHistorySize) { int oldestExecIndex = 0; - long oldestExecDate = (Long)importConfiguration.getExecutions().get(0).get("date"); - for(int i = 1 ; i < importConfiguration.getExecutions().size() ; i++) { - if((Long)importConfiguration.getExecutions().get(i).get("date") < oldestExecDate) { - oldestExecDate = (Long)importConfiguration.getExecutions().get(i).get("date"); + long oldestExecDate = (Long) importConfiguration.getExecutions().get(0).get("date"); + for (int i = 1; i < importConfiguration.getExecutions().size(); i++) { + if ((Long) importConfiguration.getExecutions().get(i).get("date") < oldestExecDate) { + oldestExecDate = (Long) importConfiguration.getExecutions().get(i).get("date"); oldestExecIndex = i; } } @@ -84,11 +84,11 @@ public class RouteCompletionProcessor implements Processor { importConfiguration.getExecutions().add(execution); //Set running to false, route is complete - if(failureCount>0 && successCount>0) { + if (failureCount > 0 && successCount > 0) { importConfiguration.setStatus(RouterConstants.CONFIG_STATUS_COMPLETE_WITH_ERRORS); - } else if(failureCount>0 && successCount==0) { + } else if (failureCount > 0 && successCount == 0) { importConfiguration.setStatus(RouterConstants.CONFIG_STATUS_COMPLETE_ERRORS); - } else if(failureCount==0 && successCount>0) { + } else if (failureCount == 0 && successCount > 0) { importConfiguration.setStatus(RouterConstants.CONFIG_STATUS_COMPLETE_SUCCESS); } importConfigurationService.save(importConfiguration); http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/b55a20b7/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportAbstractRouteBuilder.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportAbstractRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportAbstractRouteBuilder.java index ca28baf..f4f7a43 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportAbstractRouteBuilder.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportAbstractRouteBuilder.java @@ -54,22 +54,22 @@ public abstract class ProfileImportAbstractRouteBuilder extends RouteBuilder { public Object getEndpointURI(String direction) { Object endpoint; - if(RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) { + if (RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) { //Prepare Kafka Deposit StringBuilder kafkaUri = new StringBuilder("kafka:"); kafkaUri.append(kafkaHost).append(":").append(kafkaPort).append("?topic=").append(kafkaImportTopic); if (StringUtils.isNotBlank(kafkaImportGroupId)) { kafkaUri.append("&groupId=" + kafkaImportGroupId); } - if(RouterConstants.DIRECTION_TO.equals(direction)) { - kafkaUri.append("&autoCommitEnable="+kafkaImportAutoCommit+"&consumersCount="+kafkaImportConsumerCount); + if (RouterConstants.DIRECTION_TO.equals(direction)) { + kafkaUri.append("&autoCommitEnable=" + kafkaImportAutoCommit + "&consumersCount=" + kafkaImportConsumerCount); } KafkaConfiguration kafkaConfiguration = new KafkaConfiguration(); kafkaConfiguration.setBrokers(kafkaHost + ":" + kafkaPort); kafkaConfiguration.setTopic(kafkaImportTopic); kafkaConfiguration.setGroupId(kafkaImportGroupId); endpoint = new KafkaEndpoint(kafkaUri.toString(), new KafkaComponent(this.getContext())); - ((KafkaEndpoint)endpoint).setConfiguration(kafkaConfiguration); + ((KafkaEndpoint) endpoint).setConfiguration(kafkaConfiguration); } else { endpoint = RouterConstants.DIRECT_DEPOSIT_BUFFER; } http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/b55a20b7/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java index 7d6bd07..0af561f 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java @@ -28,7 +28,6 @@ import org.apache.unomi.router.core.RouterConstants; import org.apache.unomi.router.core.exception.BadProfileDataFormatException; import org.apache.unomi.router.core.processor.LineSplitFailureHandler; import org.apache.unomi.router.core.processor.LineSplitProcessor; -import org.apache.unomi.router.core.processor.RouteCompletionProcessor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,7 +55,7 @@ public class ProfileImportFromSourceRouteBuilder extends ProfileImportAbstractRo logger.info("Configure Recurrent Route 'From Source'"); - if(importConfigurationList == null) { + if (importConfigurationList == null) { importConfigurationList = importConfigurationService.getImportConfigurations(); } http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/b55a20b7/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java index 84b220c..c86e5e0 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java @@ -16,15 +16,14 @@ */ package org.apache.unomi.router.core.route; -import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.LoggingLevel; import org.apache.camel.component.jackson.JacksonDataFormat; -import org.apache.camel.component.kafka.KafkaComponent; -import org.apache.camel.component.kafka.KafkaConfiguration; import org.apache.camel.component.kafka.KafkaEndpoint; import org.apache.camel.model.ProcessorDefinition; -import org.apache.commons.lang3.StringUtils; import org.apache.unomi.router.core.RouterConstants; +import org.apache.unomi.router.core.exception.BadProfileDataFormatException; import org.apache.unomi.router.core.processor.ImportConfigByFileNameProcessor; +import org.apache.unomi.router.core.processor.LineSplitFailureHandler; import org.apache.unomi.router.core.processor.LineSplitProcessor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,6 +51,17 @@ public class ProfileImportOneShotRouteBuilder extends ProfileImportAbstractRoute logger.info("Configure OneShot Route..."); + ProcessorDefinition prDefErr = onException(BadProfileDataFormatException.class) + .log(LoggingLevel.ERROR, "Error processing record ${exchangeProperty.CamelSplitIndex}++ !") + .handled(true) + .process(new LineSplitFailureHandler()); + + if (RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) { + prDefErr.to((KafkaEndpoint) getEndpointURI(RouterConstants.DIRECTION_FROM)); + } else { + prDefErr.to((String) getEndpointURI(RouterConstants.DIRECTION_FROM)); + } + LineSplitProcessor lineSplitProcessor = new LineSplitProcessor(); ProcessorDefinition prDef = from("file://"+uploadDir+"?include=.*.csv&consumer.delay=1m") http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/b55a20b7/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportToUnomiRouteBuilder.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportToUnomiRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportToUnomiRouteBuilder.java index 91c0bf0..94c1cef 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportToUnomiRouteBuilder.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportToUnomiRouteBuilder.java @@ -19,7 +19,6 @@ package org.apache.unomi.router.core.route; import org.apache.camel.LoggingLevel; import org.apache.camel.component.jackson.JacksonDataFormat; import org.apache.camel.component.kafka.KafkaEndpoint; -import org.apache.camel.model.Constants; import org.apache.camel.model.RouteDefinition; import org.apache.unomi.router.core.RouterConstants; import org.apache.unomi.router.core.processor.RouteCompletionProcessor;