UNOMI-101 : Execution History
Project: http://git-wip-us.apache.org/repos/asf/incubator-unomi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-unomi/commit/6570c8ad Tree: http://git-wip-us.apache.org/repos/asf/incubator-unomi/tree/6570c8ad Diff: http://git-wip-us.apache.org/repos/asf/incubator-unomi/diff/6570c8ad Branch: refs/heads/feature-UNOMI-5-KARAF4 Commit: 6570c8adad7e3accd3f7011977cff86435901a7b Parents: 18af860 Author: Abdelkader Midani <amid...@apache.org> Authored: Sat Jun 17 06:11:04 2017 +0200 Committer: Abdelkader Midani <amid...@apache.org> Committed: Sat Jun 17 06:11:29 2017 +0200 ---------------------------------------------------------------------- .../unomi/router/api/ImportConfiguration.java | 46 ++++++++- .../unomi/router/api/ImportLineError.java | 76 +++++++++++++++ .../services/ImportConfigurationService.java | 4 +- extensions/router/router-core/pom.xml | 1 + .../unomi/router/core/RouterConstants.java | 3 +- .../core/context/ProfileImportCamelContext.java | 25 +++-- .../core/processor/LineSplitFailureHandler.java | 11 ++- .../core/processor/LineSplitProcessor.java | 2 +- .../processor/RouteCompletionProcessor.java | 99 ++++++++++++++++++++ .../ProfileImportFromSourceRouteBuilder.java | 50 +++++++--- .../route/ProfileImportToUnomiRouteBuilder.java | 31 ++++-- .../strategy/ArrayListAggregationStrategy.java | 46 +++++++++ .../resources/OSGI-INF/blueprint/blueprint.xml | 12 +++ .../main/resources/org.apache.unomi.router.cfg | 5 +- extensions/router/router-rest/pom.xml | 1 + .../ImportConfigurationServiceImpl.java | 5 +- 16 files changed, 371 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/6570c8ad/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ImportConfiguration.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ImportConfiguration.java b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ImportConfiguration.java index 127de39..bf17a31 100644 --- a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ImportConfiguration.java +++ b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ImportConfiguration.java @@ -19,9 +19,8 @@ package org.apache.unomi.router.api; import org.apache.unomi.api.Item; import org.apache.unomi.api.MetadataItem; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import javax.lang.model.type.MirroredTypeException; +import java.util.*; /** * Created by amidani on 28/04/2017. @@ -45,6 +44,9 @@ public class ImportConfiguration extends Item { private String columnSeparator = ","; private String lineSeparator = "\n"; private boolean active = false; + private boolean running = false; + + private List<Map<String, Object>> executions = new ArrayList(); /** * Sets the property identified by the specified name to the specified value. If a property with that name already exists, replaces its value, otherwise adds the new @@ -160,6 +162,24 @@ public class ImportConfiguration extends Item { } /** + * Retrieves the import configuration running flag. + * + * @return true if the import configuration is running false if not + */ + public boolean isRunning() { + return this.running; + } + + /** + * Sets the running flag true/false. + * + * @param running a boolean to set to running or inactive the import configuration + */ + public void setRunning(boolean running) { + this.running = running; + } + + /** * Retrieves the import configuration overwriteExistingProfiles flag. * * @return true if during the import existing profiles must be overwritten @@ -186,7 +206,7 @@ public class ImportConfiguration extends Item { } /** - * gets the column separator. + * Retrieves the column separator. */ public String getColumnSeparator() { return this.columnSeparator; @@ -203,7 +223,7 @@ public class ImportConfiguration extends Item { } /** - * gets the line separator. + * Retrieves the line separator. */ public String getLineSeparator() { return this.lineSeparator; @@ -219,5 +239,21 @@ public class ImportConfiguration extends Item { } } + /** + * Retrieves the executions + */ + public List<Map<String, Object>> getExecutions() { + return this.executions; + } + + + /** + * Sets the executions + * @param executions + */ + public void setExecutions(List<Map<String, Object>> executions) { + this.executions = executions; + } + } http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/6570c8ad/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ImportLineError.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ImportLineError.java b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ImportLineError.java new file mode 100644 index 0000000..822352d --- /dev/null +++ b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ImportLineError.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.unomi.router.api; + +/** + * A line error object to carry failure + */ +public class ImportLineError { + + private long lineNb; + private String errorCode; + private String lineContent; + + /** + * Retrieves the number of the line which failed to be imported + * @return lineNb + */ + public long getLineNb() { + return lineNb; + } + + /** + * Sets the number of the line which failed to be imported + * @param lineNb + */ + public void setLineNb(long lineNb) { + this.lineNb = lineNb; + } + + /** + * Retrieves the error code + * @return errorCode + */ + public String getErrorCode() { + return errorCode; + } + + /** + * Sets the error code + * @param errorCode + */ + public void setErrorCode(String errorCode) { + this.errorCode = errorCode; + } + + /** + * Retrieves the line original content + * @return lineContent + */ + public String getLineContent() { + return lineContent; + } + + /** + * Sets the line content + * @param lineContent + */ + public void setLineContent(String lineContent) { + this.lineContent = lineContent; + } +} http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/6570c8ad/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ImportConfigurationService.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ImportConfigurationService.java b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ImportConfigurationService.java index cacd671..92991fe 100644 --- a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ImportConfigurationService.java +++ b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ImportConfigurationService.java @@ -44,10 +44,10 @@ public interface ImportConfigurationService { /** * Saves the specified import configuration in the context server. * - * @param profile the import configuration to be saved + * @param importConfiguration the import configuration to be saved * @return the newly saved import configuration */ - public ImportConfiguration save(ImportConfiguration profile); + public ImportConfiguration save(ImportConfiguration importConfiguration); /** * Deletes the import configuration identified by the specified identifier. http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/6570c8ad/extensions/router/router-core/pom.xml ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/pom.xml b/extensions/router/router-core/pom.xml index c6fbf5d..e20dfc5 100644 --- a/extensions/router/router-core/pom.xml +++ b/extensions/router/router-core/pom.xml @@ -135,6 +135,7 @@ org.apache.camel.model, org.apache.camel.model.dataformat, org.apache.camel.model.rest, + org.apache.camel.processor.aggregate, org.apache.camel.spi, org.apache.unomi.api, org.apache.unomi.router.api, http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/6570c8ad/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/RouterConstants.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/RouterConstants.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/RouterConstants.java index f09e993..d15f2c3 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/RouterConstants.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/RouterConstants.java @@ -28,7 +28,8 @@ public interface RouterConstants { String HEADER_IMPORT_CONFIG_ONESHOT = "importConfigOneShot"; String HEADER_CONFIG_TYPE = "configType"; - String HEADER_PROFILES_COUNT = "profilesCount"; + + String HEADER_FAILED_MESSAGE = "failedMessage"; String DIRECTION_FROM = "from"; String DIRECTION_TO = "to"; http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/6570c8ad/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/ProfileImportCamelContext.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/ProfileImportCamelContext.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/ProfileImportCamelContext.java index 9a4a004..f54beb1 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/ProfileImportCamelContext.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/ProfileImportCamelContext.java @@ -23,10 +23,11 @@ import org.apache.camel.impl.DefaultCamelContext; import org.apache.unomi.router.api.ImportConfiguration; import org.apache.unomi.router.api.services.ImportConfigurationService; import org.apache.unomi.router.core.processor.ImportConfigByFileNameProcessor; +import org.apache.unomi.router.core.processor.RouteCompletionProcessor; import org.apache.unomi.router.core.processor.UnomiStorageProcessor; -import org.apache.unomi.router.core.route.ProfileImportToUnomiRouteBuilder; -import org.apache.unomi.router.core.route.ProfileImportOneShotRouteBuilder; import org.apache.unomi.router.core.route.ProfileImportFromSourceRouteBuilder; +import org.apache.unomi.router.core.route.ProfileImportOneShotRouteBuilder; +import org.apache.unomi.router.core.route.ProfileImportToUnomiRouteBuilder; import org.osgi.framework.Bundle; import org.osgi.framework.BundleContext; import org.osgi.framework.BundleEvent; @@ -44,19 +45,17 @@ import java.util.concurrent.TimeUnit; */ public class ProfileImportCamelContext implements SynchronousBundleListener { + private final String IMPORT_CONFIG_TYPE_RECURRENT = "recurrent"; private Logger logger = LoggerFactory.getLogger(ProfileImportCamelContext.class.getName()); - private CamelContext camelContext; private UnomiStorageProcessor unomiStorageProcessor; + private RouteCompletionProcessor routeCompletionProcessor; private ImportConfigByFileNameProcessor importConfigByFileNameProcessor; private ImportConfigurationService importConfigurationService; private JacksonDataFormat jacksonDataFormat; private String uploadDir; private Map<String, String> kafkaProps; private String configType; - - private final String IMPORT_CONFIG_TYPE_RECURRENT = "recurrent"; - private BundleContext bundleContext; public void setBundleContext(BundleContext bundleContext) { @@ -66,9 +65,9 @@ public class ProfileImportCamelContext implements SynchronousBundleListener { public void initCamelContext() throws Exception { logger.info("Initialize Camel Context..."); camelContext = new DefaultCamelContext(); - List<ImportConfiguration> importConfigurationList = importConfigurationService.getImportConfigurations(); + ProfileImportFromSourceRouteBuilder builderReader = new ProfileImportFromSourceRouteBuilder(kafkaProps, configType); - builderReader.setImportConfigurationList(importConfigurationList); + builderReader.setImportConfigurationService(importConfigurationService); builderReader.setJacksonDataFormat(jacksonDataFormat); builderReader.setContext(camelContext); camelContext.addRoutes(builderReader); @@ -84,6 +83,7 @@ public class ProfileImportCamelContext implements SynchronousBundleListener { ProfileImportToUnomiRouteBuilder builderProcessor = new ProfileImportToUnomiRouteBuilder(kafkaProps, configType); builderProcessor.setUnomiStorageProcessor(unomiStorageProcessor); + builderProcessor.setRouteCompletionProcessor(routeCompletionProcessor); builderProcessor.setJacksonDataFormat(jacksonDataFormat); builderProcessor.setContext(camelContext); camelContext.addRoutes(builderProcessor); @@ -109,13 +109,14 @@ public class ProfileImportCamelContext implements SynchronousBundleListener { public void updateProfileImportReaderRoute(ImportConfiguration importConfiguration) throws Exception { Route route = camelContext.getRoute(importConfiguration.getItemId()); - if(route!=null && stopRoute(importConfiguration.getItemId())) { + if (route != null && stopRoute(importConfiguration.getItemId())) { camelContext.removeRoute(importConfiguration.getItemId()); } //Handle transforming an import config oneshot <--> recurrent - if(IMPORT_CONFIG_TYPE_RECURRENT.equals(importConfiguration.getConfigType())){ + if (IMPORT_CONFIG_TYPE_RECURRENT.equals(importConfiguration.getConfigType())) { ProfileImportFromSourceRouteBuilder builder = new ProfileImportFromSourceRouteBuilder(kafkaProps, configType); builder.setImportConfigurationList(Arrays.asList(importConfiguration)); + builder.setImportConfigurationService(importConfigurationService); builder.setJacksonDataFormat(jacksonDataFormat); builder.setContext(camelContext); camelContext.addRoutes(builder); @@ -130,6 +131,10 @@ public class ProfileImportCamelContext implements SynchronousBundleListener { this.unomiStorageProcessor = unomiStorageProcessor; } + public void setRouteCompletionProcessor(RouteCompletionProcessor routeCompletionProcessor) { + this.routeCompletionProcessor = routeCompletionProcessor; + } + public void setImportConfigByFileNameProcessor(ImportConfigByFileNameProcessor importConfigByFileNameProcessor) { this.importConfigByFileNameProcessor = importConfigByFileNameProcessor; } http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/6570c8ad/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitFailureHandler.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitFailureHandler.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitFailureHandler.java index b1de82a..8f14f67 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitFailureHandler.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitFailureHandler.java @@ -18,6 +18,9 @@ package org.apache.unomi.router.core.processor; import org.apache.camel.Exchange; import org.apache.camel.Processor; +import org.apache.unomi.router.api.ImportLineError; +import org.apache.unomi.router.core.RouterConstants; +import org.apache.unomi.router.core.exception.BadProfileDataFormatException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,6 +32,12 @@ public class LineSplitFailureHandler implements Processor { private static final Logger logger = LoggerFactory.getLogger(LineSplitFailureHandler.class.getName()); public void process(Exchange exchange) throws Exception { - logger.error("{}", exchange.getProperty(Exchange.EXCEPTION_CAUGHT)); + logger.debug("Route: {}, Error: {}", exchange.getProperty(Exchange.FAILURE_ROUTE_ID), exchange.getProperty(Exchange.EXCEPTION_CAUGHT)); + ImportLineError importLineError = new ImportLineError(); + importLineError.setErrorCode(((BadProfileDataFormatException)exchange.getProperty(Exchange.EXCEPTION_CAUGHT)).getCause().getMessage()); + importLineError.setLineContent(exchange.getIn().getBody(String.class)); + importLineError.setLineNb(((Integer)exchange.getProperty("CamelSplitIndex")+1)); + exchange.getIn().setHeader(RouterConstants.HEADER_FAILED_MESSAGE, new Boolean(true)); + exchange.getIn().setBody(importLineError, ImportLineError.class); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/6570c8ad/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java index da14aa5..a03c833 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java @@ -57,7 +57,7 @@ public class LineSplitProcessor implements Processor { profileToImport.setScope("system"); if(profileData.length > 0 && StringUtils.isNotBlank(profileData[0])) { if(fieldsMapping.size() != (profileData.length - 1)) { - throw new BadProfileDataFormatException("The index does not match the number of column : line ["+((Integer)exchange.getProperty("CamelSplitIndex")+1)+"]", new Throwable("MAPPING_COLUMN_MATCH")); + throw new BadProfileDataFormatException("The mapping does not match the number of column : line ["+((Integer)exchange.getProperty("CamelSplitIndex")+1)+"]", new Throwable("MAPPING_COLUMN_MATCH")); } Map<String, Object> properties = new HashMap<>(); for (String fieldMappingKey : fieldsMapping.keySet()) { http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/6570c8ad/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/RouteCompletionProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/RouteCompletionProcessor.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/RouteCompletionProcessor.java new file mode 100644 index 0000000..e4f01cc --- /dev/null +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/RouteCompletionProcessor.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.unomi.router.core.processor; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.unomi.router.api.ImportConfiguration; +import org.apache.unomi.router.api.ImportLineError; +import org.apache.unomi.router.api.ProfileToImport; +import org.apache.unomi.router.api.services.ImportConfigurationService; +import org.apache.unomi.router.core.RouterConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; + +/** + * Created by amidani on 14/06/2017. + */ +public class RouteCompletionProcessor implements Processor { + + private static final Logger logger = LoggerFactory.getLogger(RouteCompletionProcessor.class.getName()); + private ImportConfigurationService importConfigurationService; + private int executionsHistorySize; + + @Override + public void process(Exchange exchange) throws Exception { + String importConfigId = null; + ImportConfiguration importConfigOneShot = (ImportConfiguration) exchange.getIn().getHeader(RouterConstants.HEADER_IMPORT_CONFIG_ONESHOT); + if(importConfigOneShot!=null) { + importConfigId = importConfigOneShot.getItemId(); + } else { + importConfigId = exchange.getFromRouteId(); + } + ImportConfiguration importConfiguration = importConfigurationService.load(importConfigId); + long successCount = 0; + long failureCount = 0; + long ignoreCount = 0; + List<ImportLineError> errors = new ArrayList<ImportLineError>(); + + for(Object line : exchange.getIn().getBody(ArrayList.class)){ + if(line instanceof ProfileToImport) { + successCount++; + } else if(line instanceof ImportLineError) { + failureCount++; + errors.add(((ImportLineError) line)); + } else { + ignoreCount++; + } + } + + Map execution = new HashMap(); + execution.put("date", ((Date)exchange.getProperty("CamelCreatedTimestamp")).getTime()); + execution.put("totalLinesNb", exchange.getProperty("CamelSplitSize")); + execution.put("successCount", successCount); + execution.put("failureCount", failureCount); + execution.put("errors", errors); + + if(importConfiguration.getExecutions().size() >= executionsHistorySize) { + int oldestExecIndex = 0; + long oldestExecDate = (Long)importConfiguration.getExecutions().get(0).get("date"); + for(int i = 1 ; i < importConfiguration.getExecutions().size() ; i++) { + if((Long)importConfiguration.getExecutions().get(i).get("date") < oldestExecDate) { + oldestExecDate = (Long)importConfiguration.getExecutions().get(i).get("date"); + oldestExecIndex = i; + } + } + importConfiguration.getExecutions().remove(oldestExecIndex); + } + + importConfiguration.getExecutions().add(execution); + //Set running to false, route is complete + importConfiguration.setRunning(false); + importConfigurationService.save(importConfiguration); + logger.info("Processing route {} completed.", exchange.getFromRouteId()); + } + + public void setImportConfigurationService(ImportConfigurationService importConfigurationService) { + this.importConfigurationService = importConfigurationService; + } + + public void setExecutionsHistorySize(int executionsHistorySize) { + this.executionsHistorySize = executionsHistorySize; + } +} http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/6570c8ad/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java index 9be6fb6..ca64091 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java @@ -16,18 +16,23 @@ */ package org.apache.unomi.router.core.route; +import org.apache.camel.Exchange; import org.apache.camel.LoggingLevel; +import org.apache.camel.Processor; import org.apache.camel.component.kafka.KafkaEndpoint; import org.apache.camel.model.ProcessorDefinition; import org.apache.commons.lang3.StringUtils; import org.apache.unomi.router.api.ImportConfiguration; +import org.apache.unomi.router.api.services.ImportConfigurationService; import org.apache.unomi.router.core.RouterConstants; import org.apache.unomi.router.core.exception.BadProfileDataFormatException; import org.apache.unomi.router.core.processor.LineSplitFailureHandler; import org.apache.unomi.router.core.processor.LineSplitProcessor; +import org.apache.unomi.router.core.processor.RouteCompletionProcessor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Date; import java.util.List; import java.util.Map; @@ -40,7 +45,7 @@ public class ProfileImportFromSourceRouteBuilder extends ProfileImportAbstractRo private static final Logger logger = LoggerFactory.getLogger(ProfileImportFromSourceRouteBuilder.class.getName()); private List<ImportConfiguration> importConfigurationList; - + private ImportConfigurationService importConfigurationService; public ProfileImportFromSourceRouteBuilder(Map<String, String> kafkaProps, String configType) { super(kafkaProps, configType); @@ -51,8 +56,12 @@ public class ProfileImportFromSourceRouteBuilder extends ProfileImportAbstractRo logger.info("Configure Recurrent Route 'From Source'"); + if(importConfigurationList == null) { + importConfigurationList = importConfigurationService.getImportConfigurations(); + } + //Loop on multiple import configuration - for (ImportConfiguration importConfiguration : importConfigurationList) { + for (final ImportConfiguration importConfiguration : importConfigurationList) { if (importConfiguration.getProperties().size() > 0 && StringUtils.isNotEmpty((String) importConfiguration.getProperties().get("source"))) { //Prepare Split Processor @@ -63,24 +72,39 @@ public class ProfileImportFromSourceRouteBuilder extends ProfileImportAbstractRo lineSplitProcessor.setMergingProperty(importConfiguration.getMergingProperty()); lineSplitProcessor.setColumnSeparator(importConfiguration.getColumnSeparator()); - onException(BadProfileDataFormatException.class) + ProcessorDefinition prDefErr = onException(BadProfileDataFormatException.class) .log(LoggingLevel.ERROR, "Error processing record ${exchangeProperty.CamelSplitIndex}++ !") .handled(true) - .process(new LineSplitFailureHandler()) - .to("direct:errors"); + .process(new LineSplitFailureHandler()); - errorHandler(deadLetterChannel("direct:errors")); + if (RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) { + prDefErr.to((KafkaEndpoint) getEndpointURI(RouterConstants.DIRECTION_FROM)); + } else { + prDefErr.to((String) getEndpointURI(RouterConstants.DIRECTION_FROM)); + } ProcessorDefinition prDef = from((String) importConfiguration.getProperties().get("source")) .routeId(importConfiguration.getItemId())// This allow identification of the route for manual start/stop .autoStartup(importConfiguration.isActive())// Auto-start if the import configuration is set active + .onCompletion() + // this route is only invoked when the original route is complete as a kind + // of completion callback + .log(LoggingLevel.DEBUG, "ROUTE [" + importConfiguration.getItemId() + "] is now complete [" + new Date().toString() + "]") + // must use end to denote the end of the onCompletion route + .end() + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + importConfiguration.setRunning(true); + importConfigurationService.save(importConfiguration); + } + }) .split(bodyAs(String.class).tokenize(importConfiguration.getLineSeparator())) - .log(LoggingLevel.INFO, "Splitted into ${exchangeProperty.CamelSplitSize} records") - .setHeader(RouterConstants.HEADER_PROFILES_COUNT, exchangeProperty("CamelSplitSize}")) + .log(LoggingLevel.DEBUG, "Splitted into ${exchangeProperty.CamelSplitSize} records") .setHeader(RouterConstants.HEADER_CONFIG_TYPE, constant(configType)) .process(lineSplitProcessor) - .log(LoggingLevel.INFO, "Split IDX ${exchangeProperty.CamelSplitIndex} record") - .to("log:org.apache.unomi.router?level=INFO") + .log(LoggingLevel.DEBUG, "Split IDX ${exchangeProperty.CamelSplitIndex} record") + .to("log:org.apache.unomi.router?level=DEBUG") .marshal(jacksonDataFormat) .convertBodyTo(String.class); @@ -89,8 +113,6 @@ public class ProfileImportFromSourceRouteBuilder extends ProfileImportAbstractRo } else { prDef.to((String) getEndpointURI(RouterConstants.DIRECTION_FROM)); } - - from("direct:errors").to("log:org.apache.unomi.router?level=ERROR"); } } } @@ -99,4 +121,8 @@ public class ProfileImportFromSourceRouteBuilder extends ProfileImportAbstractRo this.importConfigurationList = importConfigurationList; } + public void setImportConfigurationService(ImportConfigurationService importConfigurationService) { + this.importConfigurationService = importConfigurationService; + } + } http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/6570c8ad/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportToUnomiRouteBuilder.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportToUnomiRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportToUnomiRouteBuilder.java index 9bec24b..91c0bf0 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportToUnomiRouteBuilder.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportToUnomiRouteBuilder.java @@ -16,15 +16,15 @@ */ package org.apache.unomi.router.core.route; -import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.LoggingLevel; import org.apache.camel.component.jackson.JacksonDataFormat; -import org.apache.camel.component.kafka.KafkaComponent; -import org.apache.camel.component.kafka.KafkaConfiguration; import org.apache.camel.component.kafka.KafkaEndpoint; +import org.apache.camel.model.Constants; import org.apache.camel.model.RouteDefinition; -import org.apache.commons.lang3.StringUtils; import org.apache.unomi.router.core.RouterConstants; +import org.apache.unomi.router.core.processor.RouteCompletionProcessor; import org.apache.unomi.router.core.processor.UnomiStorageProcessor; +import org.apache.unomi.router.core.strategy.ArrayListAggregationStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,6 +38,7 @@ public class ProfileImportToUnomiRouteBuilder extends ProfileImportAbstractRoute private Logger logger = LoggerFactory.getLogger(ProfileImportToUnomiRouteBuilder.class.getName()); private UnomiStorageProcessor unomiStorageProcessor; + private RouteCompletionProcessor routeCompletionProcessor; public ProfileImportToUnomiRouteBuilder(Map<String, String> kafkaProps, String configType) { super(kafkaProps, configType); @@ -49,21 +50,33 @@ public class ProfileImportToUnomiRouteBuilder extends ProfileImportAbstractRoute logger.info("Configure Recurrent Route 'To Target'"); RouteDefinition rtDef; - if(RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)){ - rtDef=from((KafkaEndpoint)getEndpointURI(RouterConstants.DIRECTION_TO)); + if (RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) { + rtDef = from((KafkaEndpoint) getEndpointURI(RouterConstants.DIRECTION_TO)); } else { - rtDef=from((String)getEndpointURI(RouterConstants.DIRECTION_TO)); + rtDef = from((String) getEndpointURI(RouterConstants.DIRECTION_TO)); } - rtDef.unmarshal(jacksonDataFormat) + rtDef.choice() + .when(header(RouterConstants.HEADER_FAILED_MESSAGE).isNull()) + .unmarshal(jacksonDataFormat) .process(unomiStorageProcessor) + .otherwise() + .log(LoggingLevel.DEBUG, "Failed message, skip processing!") + .end() + .aggregate(constant(true), new ArrayListAggregationStrategy()) + .completionPredicate(exchangeProperty("CamelSplitComplete").isEqualTo("true")) + .eagerCheckCompletion() + .process(routeCompletionProcessor) .to("log:org.apache.unomi.router?level=INFO"); - } public void setUnomiStorageProcessor(UnomiStorageProcessor unomiStorageProcessor) { this.unomiStorageProcessor = unomiStorageProcessor; } + public void setRouteCompletionProcessor(RouteCompletionProcessor routeCompletionProcessor) { + this.routeCompletionProcessor = routeCompletionProcessor; + } + public void setJacksonDataFormat(JacksonDataFormat jacksonDataFormat) { this.jacksonDataFormat = jacksonDataFormat; } http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/6570c8ad/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/ArrayListAggregationStrategy.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/ArrayListAggregationStrategy.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/ArrayListAggregationStrategy.java new file mode 100644 index 0000000..a53e34b --- /dev/null +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/ArrayListAggregationStrategy.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.unomi.router.core.strategy; + +import org.apache.camel.Exchange; +import org.apache.camel.processor.aggregate.AggregationStrategy; +import org.apache.camel.processor.aggregate.CompletionAwareAggregationStrategy; +import org.apache.unomi.router.core.processor.RouteCompletionProcessor; + +import java.util.ArrayList; + +/** + * Created by amidani on 16/06/2017. + */ +public class ArrayListAggregationStrategy implements AggregationStrategy { + + + public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { + Object newBody = newExchange.getIn().getBody(); + ArrayList<Object> list = null; + if (oldExchange == null) { + list = new ArrayList<Object>(); + list.add(newBody); + newExchange.getIn().setBody(list); + return newExchange; + } else { + list = oldExchange.getIn().getBody(ArrayList.class); + list.add(newBody); + return oldExchange; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/6570c8ad/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml index 53efbf3..86bcf47 100644 --- a/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml +++ b/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml @@ -33,6 +33,7 @@ <cm:property name="kafka.import.consumerCount" value="10"/> <cm:property name="kafka.import.autoCommit" value="true"/> <cm:property name="import.oneshot.uploadDir" value="/tmp/oneshot_import_configs/"/> + <cm:property name="import.executionsHistory.size" value="5"/> </cm:default-properties> </cm:property-placeholder> @@ -40,6 +41,11 @@ <property name="profileImportService" ref="profileImportService"/> </bean> + <bean id="routeCompletionProcessor" class="org.apache.unomi.router.core.processor.RouteCompletionProcessor"> + <property name="importConfigurationService" ref="importConfigurationService"/> + <property name="executionsHistorySize" value="${import.executionsHistory.size}"/> + </bean> + <bean id="importConfigByFileNameProcessor" class="org.apache.unomi.router.core.processor.ImportConfigByFileNameProcessor"> <property name="importConfigurationService" ref="importConfigurationService"/> </bean> @@ -53,6 +59,11 @@ <property name="library" value="Jackson"/> </bean> + <bean id="jacksonDataFormatImportLineError" class="org.apache.camel.model.dataformat.JsonDataFormat"> + <property name="unmarshalType" value="org.apache.unomi.router.api.ImportLineError"/> + <property name="library" value="Jackson"/> + </bean> + <bean class="org.apache.camel.component.servlet.osgi.OsgiServletRegisterer" init-method="register" destroy-method="unregister"> @@ -79,6 +90,7 @@ </property> <property name="uploadDir" value="${import.oneshot.uploadDir}"/> <property name="unomiStorageProcessor" ref="unomiStorageProcessor"/> + <property name="routeCompletionProcessor" ref="routeCompletionProcessor"/> <property name="importConfigByFileNameProcessor" ref="importConfigByFileNameProcessor"/> <property name="importConfigurationService" ref="importConfigurationService"/> <property name="jacksonDataFormat" ref="jacksonDataFormat"/> http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/6570c8ad/extensions/router/router-core/src/main/resources/org.apache.unomi.router.cfg ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/resources/org.apache.unomi.router.cfg b/extensions/router/router-core/src/main/resources/org.apache.unomi.router.cfg index 0ce4bb2..e301b58 100644 --- a/extensions/router/router-core/src/main/resources/org.apache.unomi.router.cfg +++ b/extensions/router/router-core/src/main/resources/org.apache.unomi.router.cfg @@ -29,4 +29,7 @@ import.config.type=nobroker #kafka.import.autoCommit=true #Import One Shot upload directory -import.oneshot.uploadDir=/tmp/unomi_oneshot_import_configs/ \ No newline at end of file +import.oneshot.uploadDir=${karaf.data}/tmp/unomi_oneshot_import_configs/ + +#Import executions history size +import.executionsHistory.size=5 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/6570c8ad/extensions/router/router-rest/pom.xml ---------------------------------------------------------------------- diff --git a/extensions/router/router-rest/pom.xml b/extensions/router/router-rest/pom.xml index fc1065f..e151363 100644 --- a/extensions/router/router-rest/pom.xml +++ b/extensions/router/router-rest/pom.xml @@ -68,6 +68,7 @@ <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> + <version>${version.jackson.core}</version> <scope>provided</scope> </dependency> </dependencies> http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/6570c8ad/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ImportConfigurationServiceImpl.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ImportConfigurationServiceImpl.java b/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ImportConfigurationServiceImpl.java index a4f6131..5283cce 100644 --- a/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ImportConfigurationServiceImpl.java +++ b/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ImportConfigurationServiceImpl.java @@ -95,10 +95,7 @@ public class ImportConfigurationServiceImpl implements ImportConfigurationServic if (importConfiguration.getItemId() == null) { importConfiguration.setItemId(UUID.randomUUID().toString()); } - if(persistenceService.save(importConfiguration)) { - - } - + persistenceService.save(importConfiguration); return persistenceService.load(importConfiguration.getItemId(), ImportConfiguration.class); }