UNOMI-101 : Execution History

Project: http://git-wip-us.apache.org/repos/asf/incubator-unomi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-unomi/commit/6570c8ad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-unomi/tree/6570c8ad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-unomi/diff/6570c8ad

Branch: refs/heads/feature-UNOMI-5-KARAF4
Commit: 6570c8adad7e3accd3f7011977cff86435901a7b
Parents: 18af860
Author: Abdelkader Midani <amid...@apache.org>
Authored: Sat Jun 17 06:11:04 2017 +0200
Committer: Abdelkader Midani <amid...@apache.org>
Committed: Sat Jun 17 06:11:29 2017 +0200

----------------------------------------------------------------------
 .../unomi/router/api/ImportConfiguration.java   | 46 ++++++++-
 .../unomi/router/api/ImportLineError.java       | 76 +++++++++++++++
 .../services/ImportConfigurationService.java    |  4 +-
 extensions/router/router-core/pom.xml           |  1 +
 .../unomi/router/core/RouterConstants.java      |  3 +-
 .../core/context/ProfileImportCamelContext.java | 25 +++--
 .../core/processor/LineSplitFailureHandler.java | 11 ++-
 .../core/processor/LineSplitProcessor.java      |  2 +-
 .../processor/RouteCompletionProcessor.java     | 99 ++++++++++++++++++++
 .../ProfileImportFromSourceRouteBuilder.java    | 50 +++++++---
 .../route/ProfileImportToUnomiRouteBuilder.java | 31 ++++--
 .../strategy/ArrayListAggregationStrategy.java  | 46 +++++++++
 .../resources/OSGI-INF/blueprint/blueprint.xml  | 12 +++
 .../main/resources/org.apache.unomi.router.cfg  |  5 +-
 extensions/router/router-rest/pom.xml           |  1 +
 .../ImportConfigurationServiceImpl.java         |  5 +-
 16 files changed, 371 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/6570c8ad/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ImportConfiguration.java
----------------------------------------------------------------------
diff --git 
a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ImportConfiguration.java
 
b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ImportConfiguration.java
index 127de39..bf17a31 100644
--- 
a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ImportConfiguration.java
+++ 
b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ImportConfiguration.java
@@ -19,9 +19,8 @@ package org.apache.unomi.router.api;
 import org.apache.unomi.api.Item;
 import org.apache.unomi.api.MetadataItem;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import javax.lang.model.type.MirroredTypeException;
+import java.util.*;
 
 /**
  * Created by amidani on 28/04/2017.
@@ -45,6 +44,9 @@ public class ImportConfiguration extends Item {
     private String columnSeparator = ",";
     private String lineSeparator = "\n";
     private boolean active = false;
+    private boolean running = false;
+
+    private List<Map<String, Object>> executions = new ArrayList();
 
     /**
      * Sets the property identified by the specified name to the specified 
value. If a property with that name already exists, replaces its value, 
otherwise adds the new
@@ -160,6 +162,24 @@ public class ImportConfiguration extends Item {
     }
 
     /**
+     * Retrieves the import configuration running flag.
+     *
+     * @return true if the import configuration is running false if not
+     */
+    public boolean isRunning() {
+        return this.running;
+    }
+
+    /**
+     * Sets the running flag true/false.
+     *
+     * @param running a boolean to set to running or inactive the import 
configuration
+     */
+    public void setRunning(boolean running) {
+        this.running = running;
+    }
+
+    /**
      * Retrieves the import configuration overwriteExistingProfiles flag.
      *
      * @return true if during the import existing profiles must be overwritten
@@ -186,7 +206,7 @@ public class ImportConfiguration extends Item {
     }
 
     /**
-     * gets the column separator.
+     * Retrieves the column separator.
      */
     public String getColumnSeparator() {
         return this.columnSeparator;
@@ -203,7 +223,7 @@ public class ImportConfiguration extends Item {
     }
 
     /**
-     * gets the line separator.
+     * Retrieves the line separator.
      */
     public String getLineSeparator() {
         return this.lineSeparator;
@@ -219,5 +239,21 @@ public class ImportConfiguration extends Item {
         }
     }
 
+    /**
+     * Retrieves the executions
+     */
+    public List<Map<String, Object>> getExecutions() {
+        return this.executions;
+    }
+
+
+    /**
+     * Sets the executions
+     * @param executions
+     */
+    public void setExecutions(List<Map<String, Object>> executions) {
+        this.executions = executions;
+    }
+
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/6570c8ad/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ImportLineError.java
----------------------------------------------------------------------
diff --git 
a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ImportLineError.java
 
b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ImportLineError.java
new file mode 100644
index 0000000..822352d
--- /dev/null
+++ 
b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ImportLineError.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.unomi.router.api;
+
+/**
+ * A line error object to carry failure
+ */
+public class ImportLineError {
+
+    private long lineNb;
+    private String errorCode;
+    private String lineContent;
+
+    /**
+     * Retrieves the number of the line which failed to be imported
+     * @return lineNb
+     */
+    public long getLineNb() {
+        return lineNb;
+    }
+
+    /**
+     * Sets the number of the line which failed to be imported
+     * @param lineNb
+     */
+    public void setLineNb(long lineNb) {
+        this.lineNb = lineNb;
+    }
+
+    /**
+     * Retrieves the error code
+     * @return errorCode
+     */
+    public String getErrorCode() {
+        return errorCode;
+    }
+
+    /**
+     * Sets the error code
+     * @param errorCode
+     */
+    public void setErrorCode(String errorCode) {
+        this.errorCode = errorCode;
+    }
+
+    /**
+     * Retrieves the line original content
+     * @return lineContent
+     */
+    public String getLineContent() {
+        return lineContent;
+    }
+
+    /**
+     * Sets the line content
+     * @param lineContent
+     */
+    public void setLineContent(String lineContent) {
+        this.lineContent = lineContent;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/6570c8ad/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ImportConfigurationService.java
----------------------------------------------------------------------
diff --git 
a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ImportConfigurationService.java
 
b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ImportConfigurationService.java
index cacd671..92991fe 100644
--- 
a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ImportConfigurationService.java
+++ 
b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ImportConfigurationService.java
@@ -44,10 +44,10 @@ public interface ImportConfigurationService {
     /**
      * Saves the specified import configuration in the context server.
      *
-     * @param profile the import configuration to be saved
+     * @param importConfiguration the import configuration to be saved
      * @return the newly saved import configuration
      */
-    public ImportConfiguration save(ImportConfiguration profile);
+    public ImportConfiguration save(ImportConfiguration importConfiguration);
 
     /**
      * Deletes the import configuration identified by the specified identifier.

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/6570c8ad/extensions/router/router-core/pom.xml
----------------------------------------------------------------------
diff --git a/extensions/router/router-core/pom.xml 
b/extensions/router/router-core/pom.xml
index c6fbf5d..e20dfc5 100644
--- a/extensions/router/router-core/pom.xml
+++ b/extensions/router/router-core/pom.xml
@@ -135,6 +135,7 @@
                             org.apache.camel.model,
                             org.apache.camel.model.dataformat,
                             org.apache.camel.model.rest,
+                            org.apache.camel.processor.aggregate,
                             org.apache.camel.spi,
                             org.apache.unomi.api,
                             org.apache.unomi.router.api,

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/6570c8ad/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/RouterConstants.java
----------------------------------------------------------------------
diff --git 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/RouterConstants.java
 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/RouterConstants.java
index f09e993..d15f2c3 100644
--- 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/RouterConstants.java
+++ 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/RouterConstants.java
@@ -28,7 +28,8 @@ public interface RouterConstants {
 
     String HEADER_IMPORT_CONFIG_ONESHOT = "importConfigOneShot";
     String HEADER_CONFIG_TYPE = "configType";
-    String HEADER_PROFILES_COUNT = "profilesCount";
+
+    String HEADER_FAILED_MESSAGE = "failedMessage";
 
     String DIRECTION_FROM = "from";
     String DIRECTION_TO = "to";

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/6570c8ad/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/ProfileImportCamelContext.java
----------------------------------------------------------------------
diff --git 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/ProfileImportCamelContext.java
 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/ProfileImportCamelContext.java
index 9a4a004..f54beb1 100644
--- 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/ProfileImportCamelContext.java
+++ 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/ProfileImportCamelContext.java
@@ -23,10 +23,11 @@ import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.unomi.router.api.ImportConfiguration;
 import org.apache.unomi.router.api.services.ImportConfigurationService;
 import org.apache.unomi.router.core.processor.ImportConfigByFileNameProcessor;
+import org.apache.unomi.router.core.processor.RouteCompletionProcessor;
 import org.apache.unomi.router.core.processor.UnomiStorageProcessor;
-import org.apache.unomi.router.core.route.ProfileImportToUnomiRouteBuilder;
-import org.apache.unomi.router.core.route.ProfileImportOneShotRouteBuilder;
 import org.apache.unomi.router.core.route.ProfileImportFromSourceRouteBuilder;
+import org.apache.unomi.router.core.route.ProfileImportOneShotRouteBuilder;
+import org.apache.unomi.router.core.route.ProfileImportToUnomiRouteBuilder;
 import org.osgi.framework.Bundle;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.BundleEvent;
@@ -44,19 +45,17 @@ import java.util.concurrent.TimeUnit;
  */
 public class ProfileImportCamelContext implements SynchronousBundleListener {
 
+    private final String IMPORT_CONFIG_TYPE_RECURRENT = "recurrent";
     private Logger logger = 
LoggerFactory.getLogger(ProfileImportCamelContext.class.getName());
-
     private CamelContext camelContext;
     private UnomiStorageProcessor unomiStorageProcessor;
+    private RouteCompletionProcessor routeCompletionProcessor;
     private ImportConfigByFileNameProcessor importConfigByFileNameProcessor;
     private ImportConfigurationService importConfigurationService;
     private JacksonDataFormat jacksonDataFormat;
     private String uploadDir;
     private Map<String, String> kafkaProps;
     private String configType;
-
-    private final String IMPORT_CONFIG_TYPE_RECURRENT = "recurrent";
-
     private BundleContext bundleContext;
 
     public void setBundleContext(BundleContext bundleContext) {
@@ -66,9 +65,9 @@ public class ProfileImportCamelContext implements 
SynchronousBundleListener {
     public void initCamelContext() throws Exception {
         logger.info("Initialize Camel Context...");
         camelContext = new DefaultCamelContext();
-        List<ImportConfiguration> importConfigurationList = 
importConfigurationService.getImportConfigurations();
+
         ProfileImportFromSourceRouteBuilder builderReader = new 
ProfileImportFromSourceRouteBuilder(kafkaProps, configType);
-        builderReader.setImportConfigurationList(importConfigurationList);
+        
builderReader.setImportConfigurationService(importConfigurationService);
         builderReader.setJacksonDataFormat(jacksonDataFormat);
         builderReader.setContext(camelContext);
         camelContext.addRoutes(builderReader);
@@ -84,6 +83,7 @@ public class ProfileImportCamelContext implements 
SynchronousBundleListener {
 
         ProfileImportToUnomiRouteBuilder builderProcessor = new 
ProfileImportToUnomiRouteBuilder(kafkaProps, configType);
         builderProcessor.setUnomiStorageProcessor(unomiStorageProcessor);
+        builderProcessor.setRouteCompletionProcessor(routeCompletionProcessor);
         builderProcessor.setJacksonDataFormat(jacksonDataFormat);
         builderProcessor.setContext(camelContext);
         camelContext.addRoutes(builderProcessor);
@@ -109,13 +109,14 @@ public class ProfileImportCamelContext implements 
SynchronousBundleListener {
 
     public void updateProfileImportReaderRoute(ImportConfiguration 
importConfiguration) throws Exception {
         Route route = camelContext.getRoute(importConfiguration.getItemId());
-        if(route!=null && stopRoute(importConfiguration.getItemId())) {
+        if (route != null && stopRoute(importConfiguration.getItemId())) {
             camelContext.removeRoute(importConfiguration.getItemId());
         }
         //Handle transforming an import config oneshot <--> recurrent
-        
if(IMPORT_CONFIG_TYPE_RECURRENT.equals(importConfiguration.getConfigType())){
+        if 
(IMPORT_CONFIG_TYPE_RECURRENT.equals(importConfiguration.getConfigType())) {
             ProfileImportFromSourceRouteBuilder builder = new 
ProfileImportFromSourceRouteBuilder(kafkaProps, configType);
             
builder.setImportConfigurationList(Arrays.asList(importConfiguration));
+            builder.setImportConfigurationService(importConfigurationService);
             builder.setJacksonDataFormat(jacksonDataFormat);
             builder.setContext(camelContext);
             camelContext.addRoutes(builder);
@@ -130,6 +131,10 @@ public class ProfileImportCamelContext implements 
SynchronousBundleListener {
         this.unomiStorageProcessor = unomiStorageProcessor;
     }
 
+    public void setRouteCompletionProcessor(RouteCompletionProcessor 
routeCompletionProcessor) {
+        this.routeCompletionProcessor = routeCompletionProcessor;
+    }
+
     public void 
setImportConfigByFileNameProcessor(ImportConfigByFileNameProcessor 
importConfigByFileNameProcessor) {
         this.importConfigByFileNameProcessor = importConfigByFileNameProcessor;
     }

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/6570c8ad/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitFailureHandler.java
----------------------------------------------------------------------
diff --git 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitFailureHandler.java
 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitFailureHandler.java
index b1de82a..8f14f67 100644
--- 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitFailureHandler.java
+++ 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitFailureHandler.java
@@ -18,6 +18,9 @@ package org.apache.unomi.router.core.processor;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
+import org.apache.unomi.router.api.ImportLineError;
+import org.apache.unomi.router.core.RouterConstants;
+import org.apache.unomi.router.core.exception.BadProfileDataFormatException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -29,6 +32,12 @@ public class LineSplitFailureHandler implements Processor {
     private static final Logger logger = 
LoggerFactory.getLogger(LineSplitFailureHandler.class.getName());
 
     public void process(Exchange exchange) throws Exception {
-        logger.error("{}", exchange.getProperty(Exchange.EXCEPTION_CAUGHT));
+        logger.debug("Route: {}, Error: {}", 
exchange.getProperty(Exchange.FAILURE_ROUTE_ID), 
exchange.getProperty(Exchange.EXCEPTION_CAUGHT));
+        ImportLineError importLineError = new ImportLineError();
+        
importLineError.setErrorCode(((BadProfileDataFormatException)exchange.getProperty(Exchange.EXCEPTION_CAUGHT)).getCause().getMessage());
+        importLineError.setLineContent(exchange.getIn().getBody(String.class));
+        
importLineError.setLineNb(((Integer)exchange.getProperty("CamelSplitIndex")+1));
+        exchange.getIn().setHeader(RouterConstants.HEADER_FAILED_MESSAGE, new 
Boolean(true));
+        exchange.getIn().setBody(importLineError, ImportLineError.class);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/6570c8ad/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java
----------------------------------------------------------------------
diff --git 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java
 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java
index da14aa5..a03c833 100644
--- 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java
+++ 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java
@@ -57,7 +57,7 @@ public class LineSplitProcessor implements Processor {
         profileToImport.setScope("system");
         if(profileData.length > 0 && StringUtils.isNotBlank(profileData[0])) {
             if(fieldsMapping.size() != (profileData.length - 1)) {
-                throw new BadProfileDataFormatException("The index does not 
match the number of column : line 
["+((Integer)exchange.getProperty("CamelSplitIndex")+1)+"]", new 
Throwable("MAPPING_COLUMN_MATCH"));
+                throw new BadProfileDataFormatException("The mapping does not 
match the number of column : line 
["+((Integer)exchange.getProperty("CamelSplitIndex")+1)+"]", new 
Throwable("MAPPING_COLUMN_MATCH"));
             }
             Map<String, Object> properties = new HashMap<>();
             for (String fieldMappingKey : fieldsMapping.keySet()) {

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/6570c8ad/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/RouteCompletionProcessor.java
----------------------------------------------------------------------
diff --git 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/RouteCompletionProcessor.java
 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/RouteCompletionProcessor.java
new file mode 100644
index 0000000..e4f01cc
--- /dev/null
+++ 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/RouteCompletionProcessor.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.router.core.processor;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.unomi.router.api.ImportConfiguration;
+import org.apache.unomi.router.api.ImportLineError;
+import org.apache.unomi.router.api.ProfileToImport;
+import org.apache.unomi.router.api.services.ImportConfigurationService;
+import org.apache.unomi.router.core.RouterConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+/**
+ * Created by amidani on 14/06/2017.
+ */
+public class RouteCompletionProcessor implements Processor {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(RouteCompletionProcessor.class.getName());
+    private ImportConfigurationService importConfigurationService;
+    private int executionsHistorySize;
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        String importConfigId = null;
+        ImportConfiguration importConfigOneShot = (ImportConfiguration) 
exchange.getIn().getHeader(RouterConstants.HEADER_IMPORT_CONFIG_ONESHOT);
+        if(importConfigOneShot!=null) {
+            importConfigId = importConfigOneShot.getItemId();
+        } else {
+            importConfigId = exchange.getFromRouteId();
+        }
+        ImportConfiguration importConfiguration = 
importConfigurationService.load(importConfigId);
+        long successCount = 0;
+        long failureCount = 0;
+        long ignoreCount = 0;
+        List<ImportLineError> errors = new ArrayList<ImportLineError>();
+
+        for(Object line : exchange.getIn().getBody(ArrayList.class)){
+            if(line instanceof ProfileToImport) {
+                successCount++;
+            } else if(line instanceof ImportLineError) {
+                failureCount++;
+                errors.add(((ImportLineError) line));
+            } else {
+                ignoreCount++;
+            }
+        }
+
+        Map execution = new HashMap();
+        execution.put("date", 
((Date)exchange.getProperty("CamelCreatedTimestamp")).getTime());
+        execution.put("totalLinesNb", exchange.getProperty("CamelSplitSize"));
+        execution.put("successCount", successCount);
+        execution.put("failureCount", failureCount);
+        execution.put("errors", errors);
+
+        if(importConfiguration.getExecutions().size() >= 
executionsHistorySize) {
+            int oldestExecIndex = 0;
+            long oldestExecDate = 
(Long)importConfiguration.getExecutions().get(0).get("date");
+            for(int i = 1 ; i < importConfiguration.getExecutions().size() ; 
i++) {
+                
if((Long)importConfiguration.getExecutions().get(i).get("date") < 
oldestExecDate) {
+                    oldestExecDate = 
(Long)importConfiguration.getExecutions().get(i).get("date");
+                    oldestExecIndex = i;
+                }
+            }
+            importConfiguration.getExecutions().remove(oldestExecIndex);
+        }
+
+        importConfiguration.getExecutions().add(execution);
+        //Set running to false, route is complete
+        importConfiguration.setRunning(false);
+        importConfigurationService.save(importConfiguration);
+        logger.info("Processing route {} completed.", 
exchange.getFromRouteId());
+    }
+
+    public void setImportConfigurationService(ImportConfigurationService 
importConfigurationService) {
+        this.importConfigurationService = importConfigurationService;
+    }
+
+    public void setExecutionsHistorySize(int executionsHistorySize) {
+        this.executionsHistorySize = executionsHistorySize;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/6570c8ad/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java
----------------------------------------------------------------------
diff --git 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java
 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java
index 9be6fb6..ca64091 100644
--- 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java
+++ 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java
@@ -16,18 +16,23 @@
  */
 package org.apache.unomi.router.core.route;
 
+import org.apache.camel.Exchange;
 import org.apache.camel.LoggingLevel;
+import org.apache.camel.Processor;
 import org.apache.camel.component.kafka.KafkaEndpoint;
 import org.apache.camel.model.ProcessorDefinition;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.unomi.router.api.ImportConfiguration;
+import org.apache.unomi.router.api.services.ImportConfigurationService;
 import org.apache.unomi.router.core.RouterConstants;
 import org.apache.unomi.router.core.exception.BadProfileDataFormatException;
 import org.apache.unomi.router.core.processor.LineSplitFailureHandler;
 import org.apache.unomi.router.core.processor.LineSplitProcessor;
+import org.apache.unomi.router.core.processor.RouteCompletionProcessor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Date;
 import java.util.List;
 import java.util.Map;
 
@@ -40,7 +45,7 @@ public class ProfileImportFromSourceRouteBuilder extends 
ProfileImportAbstractRo
     private static final Logger logger = 
LoggerFactory.getLogger(ProfileImportFromSourceRouteBuilder.class.getName());
 
     private List<ImportConfiguration> importConfigurationList;
-
+    private ImportConfigurationService importConfigurationService;
 
     public ProfileImportFromSourceRouteBuilder(Map<String, String> kafkaProps, 
String configType) {
         super(kafkaProps, configType);
@@ -51,8 +56,12 @@ public class ProfileImportFromSourceRouteBuilder extends 
ProfileImportAbstractRo
 
         logger.info("Configure Recurrent Route 'From Source'");
 
+        if(importConfigurationList == null) {
+            importConfigurationList = 
importConfigurationService.getImportConfigurations();
+        }
+
         //Loop on multiple import configuration
-        for (ImportConfiguration importConfiguration : 
importConfigurationList) {
+        for (final ImportConfiguration importConfiguration : 
importConfigurationList) {
             if (importConfiguration.getProperties().size() > 0 &&
                     StringUtils.isNotEmpty((String) 
importConfiguration.getProperties().get("source"))) {
                 //Prepare Split Processor
@@ -63,24 +72,39 @@ public class ProfileImportFromSourceRouteBuilder extends 
ProfileImportAbstractRo
                 
lineSplitProcessor.setMergingProperty(importConfiguration.getMergingProperty());
                 
lineSplitProcessor.setColumnSeparator(importConfiguration.getColumnSeparator());
 
-                onException(BadProfileDataFormatException.class)
+                ProcessorDefinition prDefErr = 
onException(BadProfileDataFormatException.class)
                         .log(LoggingLevel.ERROR, "Error processing record 
${exchangeProperty.CamelSplitIndex}++ !")
                         .handled(true)
-                        .process(new LineSplitFailureHandler())
-                        .to("direct:errors");
+                        .process(new LineSplitFailureHandler());
 
-                errorHandler(deadLetterChannel("direct:errors"));
+                if (RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) {
+                    prDefErr.to((KafkaEndpoint) 
getEndpointURI(RouterConstants.DIRECTION_FROM));
+                } else {
+                    prDefErr.to((String) 
getEndpointURI(RouterConstants.DIRECTION_FROM));
+                }
 
                 ProcessorDefinition prDef = from((String) 
importConfiguration.getProperties().get("source"))
                         .routeId(importConfiguration.getItemId())// This allow 
identification of the route for manual start/stop
                         .autoStartup(importConfiguration.isActive())// 
Auto-start if the import configuration is set active
+                        .onCompletion()
+                        // this route is only invoked when the original route 
is complete as a kind
+                        // of completion callback
+                        .log(LoggingLevel.DEBUG, "ROUTE [" + 
importConfiguration.getItemId() + "] is now complete [" + new Date().toString() 
+ "]")
+                        // must use end to denote the end of the onCompletion 
route
+                        .end()
+                        .process(new Processor() {
+                            @Override
+                            public void process(Exchange exchange) throws 
Exception {
+                                importConfiguration.setRunning(true);
+                                
importConfigurationService.save(importConfiguration);
+                            }
+                        })
                         
.split(bodyAs(String.class).tokenize(importConfiguration.getLineSeparator()))
-                        .log(LoggingLevel.INFO, "Splitted into 
${exchangeProperty.CamelSplitSize} records")
-                        .setHeader(RouterConstants.HEADER_PROFILES_COUNT, 
exchangeProperty("CamelSplitSize}"))
+                        .log(LoggingLevel.DEBUG, "Splitted into 
${exchangeProperty.CamelSplitSize} records")
                         .setHeader(RouterConstants.HEADER_CONFIG_TYPE, 
constant(configType))
                         .process(lineSplitProcessor)
-                        .log(LoggingLevel.INFO, "Split IDX 
${exchangeProperty.CamelSplitIndex} record")
-                        .to("log:org.apache.unomi.router?level=INFO")
+                        .log(LoggingLevel.DEBUG, "Split IDX 
${exchangeProperty.CamelSplitIndex} record")
+                        .to("log:org.apache.unomi.router?level=DEBUG")
                         .marshal(jacksonDataFormat)
                         .convertBodyTo(String.class);
 
@@ -89,8 +113,6 @@ public class ProfileImportFromSourceRouteBuilder extends 
ProfileImportAbstractRo
                 } else {
                     prDef.to((String) 
getEndpointURI(RouterConstants.DIRECTION_FROM));
                 }
-
-                
from("direct:errors").to("log:org.apache.unomi.router?level=ERROR");
             }
         }
     }
@@ -99,4 +121,8 @@ public class ProfileImportFromSourceRouteBuilder extends 
ProfileImportAbstractRo
         this.importConfigurationList = importConfigurationList;
     }
 
+    public void setImportConfigurationService(ImportConfigurationService 
importConfigurationService) { // presumably the service whose save(...) the route's inline Processor calls - confirm
+        this.importConfigurationService = importConfigurationService;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/6570c8ad/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportToUnomiRouteBuilder.java
----------------------------------------------------------------------
diff --git 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportToUnomiRouteBuilder.java
 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportToUnomiRouteBuilder.java
index 9bec24b..91c0bf0 100644
--- 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportToUnomiRouteBuilder.java
+++ 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportToUnomiRouteBuilder.java
@@ -16,15 +16,15 @@
  */
 package org.apache.unomi.router.core.route;
 
-import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.LoggingLevel;
 import org.apache.camel.component.jackson.JacksonDataFormat;
-import org.apache.camel.component.kafka.KafkaComponent;
-import org.apache.camel.component.kafka.KafkaConfiguration;
 import org.apache.camel.component.kafka.KafkaEndpoint;
+import org.apache.camel.model.Constants;
 import org.apache.camel.model.RouteDefinition;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.unomi.router.core.RouterConstants;
+import org.apache.unomi.router.core.processor.RouteCompletionProcessor;
 import org.apache.unomi.router.core.processor.UnomiStorageProcessor;
+import org.apache.unomi.router.core.strategy.ArrayListAggregationStrategy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,6 +38,7 @@ public class ProfileImportToUnomiRouteBuilder extends 
ProfileImportAbstractRoute
     private Logger logger = 
LoggerFactory.getLogger(ProfileImportToUnomiRouteBuilder.class.getName());
 
     private UnomiStorageProcessor unomiStorageProcessor;
+    private RouteCompletionProcessor routeCompletionProcessor;
 
     public ProfileImportToUnomiRouteBuilder(Map<String, String> kafkaProps, 
String configType) {
         super(kafkaProps, configType);
@@ -49,21 +50,33 @@ public class ProfileImportToUnomiRouteBuilder extends 
ProfileImportAbstractRoute
         logger.info("Configure Recurrent Route 'To Target'");
 
         RouteDefinition rtDef;
-        if(RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)){
-            
rtDef=from((KafkaEndpoint)getEndpointURI(RouterConstants.DIRECTION_TO));
+        if (RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) {
+            rtDef = from((KafkaEndpoint) 
getEndpointURI(RouterConstants.DIRECTION_TO));
         } else {
-            rtDef=from((String)getEndpointURI(RouterConstants.DIRECTION_TO));
+            rtDef = from((String) 
getEndpointURI(RouterConstants.DIRECTION_TO));
         }
-        rtDef.unmarshal(jacksonDataFormat)
+        rtDef.choice()
+                .when(header(RouterConstants.HEADER_FAILED_MESSAGE).isNull())
+                .unmarshal(jacksonDataFormat)
                 .process(unomiStorageProcessor)
+                .otherwise()
+                .log(LoggingLevel.DEBUG, "Failed message, skip processing!")
+                .end()
+                .aggregate(constant(true), new ArrayListAggregationStrategy())
+                
.completionPredicate(exchangeProperty("CamelSplitComplete").isEqualTo("true"))
+                .eagerCheckCompletion()
+                .process(routeCompletionProcessor)
                 .to("log:org.apache.unomi.router?level=INFO");
-
     }
 
     public void setUnomiStorageProcessor(UnomiStorageProcessor 
unomiStorageProcessor) { // processor that configure() applies, after unmarshal, to messages without the failed-message header
         this.unomiStorageProcessor = unomiStorageProcessor;
     }
 
+    public void setRouteCompletionProcessor(RouteCompletionProcessor 
routeCompletionProcessor) { // processor that configure() places right after the aggregate(...) stage of the route
+        this.routeCompletionProcessor = routeCompletionProcessor;
+    }
+
     public void setJacksonDataFormat(JacksonDataFormat jacksonDataFormat) { // data format that configure() passes to unmarshal(...)
         this.jacksonDataFormat = jacksonDataFormat;
     }

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/6570c8ad/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/ArrayListAggregationStrategy.java
----------------------------------------------------------------------
diff --git 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/ArrayListAggregationStrategy.java
 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/ArrayListAggregationStrategy.java
new file mode 100644
index 0000000..a53e34b
--- /dev/null
+++ 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/ArrayListAggregationStrategy.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.router.core.strategy;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.processor.aggregate.CompletionAwareAggregationStrategy;
+import org.apache.unomi.router.core.processor.RouteCompletionProcessor;
+
+import java.util.ArrayList;
+
+/**
+ * Aggregation strategy that gathers the In bodies of successive exchanges into one ArrayList. Created by amidani on 16/06/2017.
+ */
+public class ArrayListAggregationStrategy implements AggregationStrategy {
+    // aggregate(): with no previous exchange, wraps the new body in a fresh list and returns newExchange;
+    // otherwise appends the new body to the list read from oldExchange's In body and returns oldExchange.
+    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+        Object newBody = newExchange.getIn().getBody(); // the item to append in either branch
+        ArrayList<Object> list = null;
+        if (oldExchange == null) { // nothing aggregated so far
+            list = new ArrayList<Object>();
+            list.add(newBody);
+            newExchange.getIn().setBody(list); // newExchange's body is replaced by the list and it becomes the aggregate
+            return newExchange;
+        } else {
+            list = oldExchange.getIn().getBody(ArrayList.class); // NOTE(review): assumes this is the list stored by the branch above; unchecked, and add() would fail on null - confirm
+            list.add(newBody); // mutates the list in place, so oldExchange keeps carrying every body seen so far
+            return oldExchange;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/6570c8ad/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
----------------------------------------------------------------------
diff --git 
a/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
 
b/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index 53efbf3..86bcf47 100644
--- 
a/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ 
b/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -33,6 +33,7 @@
             <cm:property name="kafka.import.consumerCount" value="10"/>
             <cm:property name="kafka.import.autoCommit" value="true"/>
             <cm:property name="import.oneshot.uploadDir" 
value="/tmp/oneshot_import_configs/"/>
+            <cm:property name="import.executionsHistory.size" value="5"/>
         </cm:default-properties>
     </cm:property-placeholder>
 
@@ -40,6 +41,11 @@
         <property name="profileImportService" ref="profileImportService"/>
     </bean>
 
+    <bean id="routeCompletionProcessor" 
class="org.apache.unomi.router.core.processor.RouteCompletionProcessor">
+        <property name="importConfigurationService" 
ref="importConfigurationService"/>
+        <property name="executionsHistorySize" 
value="${import.executionsHistory.size}"/>
+    </bean>
+
     <bean id="importConfigByFileNameProcessor" 
class="org.apache.unomi.router.core.processor.ImportConfigByFileNameProcessor">
         <property name="importConfigurationService" 
ref="importConfigurationService"/>
     </bean>
@@ -53,6 +59,11 @@
         <property name="library" value="Jackson"/>
     </bean>
 
+    <bean id="jacksonDataFormatImportLineError" 
class="org.apache.camel.model.dataformat.JsonDataFormat">
+        <property name="unmarshalType" 
value="org.apache.unomi.router.api.ImportLineError"/>
+        <property name="library" value="Jackson"/>
+    </bean>
+
     <bean class="org.apache.camel.component.servlet.osgi.OsgiServletRegisterer"
           init-method="register"
           destroy-method="unregister">
@@ -79,6 +90,7 @@
         </property>
         <property name="uploadDir" value="${import.oneshot.uploadDir}"/>
         <property name="unomiStorageProcessor" ref="unomiStorageProcessor"/>
+        <property name="routeCompletionProcessor" 
ref="routeCompletionProcessor"/>
         <property name="importConfigByFileNameProcessor" 
ref="importConfigByFileNameProcessor"/>
         <property name="importConfigurationService" 
ref="importConfigurationService"/>
         <property name="jacksonDataFormat" ref="jacksonDataFormat"/>

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/6570c8ad/extensions/router/router-core/src/main/resources/org.apache.unomi.router.cfg
----------------------------------------------------------------------
diff --git 
a/extensions/router/router-core/src/main/resources/org.apache.unomi.router.cfg 
b/extensions/router/router-core/src/main/resources/org.apache.unomi.router.cfg
index 0ce4bb2..e301b58 100644
--- 
a/extensions/router/router-core/src/main/resources/org.apache.unomi.router.cfg
+++ 
b/extensions/router/router-core/src/main/resources/org.apache.unomi.router.cfg
@@ -29,4 +29,7 @@ import.config.type=nobroker
 #kafka.import.autoCommit=true
 
 #Import One Shot upload directory
-import.oneshot.uploadDir=/tmp/unomi_oneshot_import_configs/
\ No newline at end of file
+import.oneshot.uploadDir=${karaf.data}/tmp/unomi_oneshot_import_configs/
+
+#Import executions history size
+import.executionsHistory.size=5
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/6570c8ad/extensions/router/router-rest/pom.xml
----------------------------------------------------------------------
diff --git a/extensions/router/router-rest/pom.xml 
b/extensions/router/router-rest/pom.xml
index fc1065f..e151363 100644
--- a/extensions/router/router-rest/pom.xml
+++ b/extensions/router/router-rest/pom.xml
@@ -68,6 +68,7 @@
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-databind</artifactId>
+            <version>${version.jackson.core}</version>
             <scope>provided</scope>
         </dependency>
     </dependencies>

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/6570c8ad/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ImportConfigurationServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ImportConfigurationServiceImpl.java
 
b/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ImportConfigurationServiceImpl.java
index a4f6131..5283cce 100644
--- 
a/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ImportConfigurationServiceImpl.java
+++ 
b/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ImportConfigurationServiceImpl.java
@@ -95,10 +95,7 @@ public class ImportConfigurationServiceImpl implements 
ImportConfigurationServic
         if (importConfiguration.getItemId() == null) {
             importConfiguration.setItemId(UUID.randomUUID().toString());
         }
-        if(persistenceService.save(importConfiguration)) {
-
-        }
-
+        persistenceService.save(importConfiguration);
         return persistenceService.load(importConfiguration.getItemId(), 
ImportConfiguration.class);
     }
 


Reply via email to