UNOMI-101 : No broker config allowed

Project: http://git-wip-us.apache.org/repos/asf/incubator-unomi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-unomi/commit/8231d564
Tree: http://git-wip-us.apache.org/repos/asf/incubator-unomi/tree/8231d564
Diff: http://git-wip-us.apache.org/repos/asf/incubator-unomi/diff/8231d564

Branch: refs/heads/feature-UNOMI-5-KARAF4
Commit: 8231d5640d45f1f9ee8b20a8b239c54a4205ab7c
Parents: f597a69
Author: Abdelkader Midani <amid...@apache.org>
Authored: Wed Jun 14 12:12:20 2017 +0200
Committer: Abdelkader Midani <amid...@apache.org>
Committed: Wed Jun 14 12:12:33 2017 +0200

----------------------------------------------------------------------
 .../unomi/router/core/RouterConstants.java      |  35 ++++++
 .../core/context/ProfileImportCamelContext.java |  17 ++-
 .../BadProfileDataFormatException.java          |  35 ++++++
 .../ImportConfigByFileNameProcessor.java        |   3 +-
 .../core/processor/LineSplitFailureHandler.java |  34 ++++++
 .../core/processor/LineSplitProcessor.java      |  31 +++--
 .../ProfileImportAbstractRouteBuilder.java      |  83 +++++++++++++
 .../ProfileImportFromSourceRouteBuilder.java    | 102 ++++++++++++++++
 .../ProfileImportKafkaToUnomiRouteBuilder.java  |  77 ------------
 .../route/ProfileImportOneShotRouteBuilder.java |  42 +++----
 .../ProfileImportSourceToKafkaRouteBuilder.java | 120 -------------------
 .../route/ProfileImportToUnomiRouteBuilder.java |  70 +++++++++++
 .../resources/OSGI-INF/blueprint/blueprint.xml  |   6 +
 .../main/resources/org.apache.unomi.router.cfg  |  15 ++-
 14 files changed, 422 insertions(+), 248 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/8231d564/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/RouterConstants.java
----------------------------------------------------------------------
diff --git 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/RouterConstants.java
 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/RouterConstants.java
new file mode 100644
index 0000000..f09e993
--- /dev/null
+++ 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/RouterConstants.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.router.core;
+
+/**
+ * Constants shared by the router's Camel route builders and processors (created by amidani on 13/06/2017).
+ */
+public interface RouterConstants {
+
+    String CONFIG_TYPE_NOBROKER = "nobroker";
+    String CONFIG_TYPE_KAFKA = "kafka"; // compared against the configured configType to select the Kafka code paths
+
+    String DIRECT_DEPOSIT_BUFFER = "direct:depositBuffer"; // endpoint URI used when configType is not "kafka"
+
+    String HEADER_IMPORT_CONFIG_ONESHOT = "importConfigOneShot"; // header carrying the one-shot ImportConfiguration
+    String HEADER_CONFIG_TYPE = "configType"; // header carrying the configType, read by LineSplitProcessor
+    String HEADER_PROFILES_COUNT = "profilesCount"; // header set by the "from source" route after the split
+
+    String DIRECTION_FROM = "from"; // direction arguments accepted by getEndpointURI
+    String DIRECTION_TO = "to";
+}

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/8231d564/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/ProfileImportCamelContext.java
----------------------------------------------------------------------
diff --git 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/ProfileImportCamelContext.java
 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/ProfileImportCamelContext.java
index df734d3..9a4a004 100644
--- 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/ProfileImportCamelContext.java
+++ 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/ProfileImportCamelContext.java
@@ -24,9 +24,9 @@ import org.apache.unomi.router.api.ImportConfiguration;
 import org.apache.unomi.router.api.services.ImportConfigurationService;
 import org.apache.unomi.router.core.processor.ImportConfigByFileNameProcessor;
 import org.apache.unomi.router.core.processor.UnomiStorageProcessor;
-import 
org.apache.unomi.router.core.route.ProfileImportKafkaToUnomiRouteBuilder;
+import org.apache.unomi.router.core.route.ProfileImportToUnomiRouteBuilder;
 import org.apache.unomi.router.core.route.ProfileImportOneShotRouteBuilder;
-import 
org.apache.unomi.router.core.route.ProfileImportSourceToKafkaRouteBuilder;
+import org.apache.unomi.router.core.route.ProfileImportFromSourceRouteBuilder;
 import org.osgi.framework.Bundle;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.BundleEvent;
@@ -53,6 +53,7 @@ public class ProfileImportCamelContext implements 
SynchronousBundleListener {
     private JacksonDataFormat jacksonDataFormat;
     private String uploadDir;
     private Map<String, String> kafkaProps;
+    private String configType;
 
     private final String IMPORT_CONFIG_TYPE_RECURRENT = "recurrent";
 
@@ -66,14 +67,14 @@ public class ProfileImportCamelContext implements 
SynchronousBundleListener {
         logger.info("Initialize Camel Context...");
         camelContext = new DefaultCamelContext();
         List<ImportConfiguration> importConfigurationList = 
importConfigurationService.getImportConfigurations();
-        ProfileImportSourceToKafkaRouteBuilder builderReader = new 
ProfileImportSourceToKafkaRouteBuilder(kafkaProps);
+        ProfileImportFromSourceRouteBuilder builderReader = new 
ProfileImportFromSourceRouteBuilder(kafkaProps, configType);
         builderReader.setImportConfigurationList(importConfigurationList);
         builderReader.setJacksonDataFormat(jacksonDataFormat);
         builderReader.setContext(camelContext);
         camelContext.addRoutes(builderReader);
 
         //One shot import route
-        ProfileImportOneShotRouteBuilder builderOneShot = new 
ProfileImportOneShotRouteBuilder(kafkaProps);
+        ProfileImportOneShotRouteBuilder builderOneShot = new 
ProfileImportOneShotRouteBuilder(kafkaProps, configType);
         
builderOneShot.setImportConfigByFileNameProcessor(importConfigByFileNameProcessor);
         builderOneShot.setJacksonDataFormat(jacksonDataFormat);
         builderOneShot.setUploadDir(uploadDir);
@@ -81,7 +82,7 @@ public class ProfileImportCamelContext implements 
SynchronousBundleListener {
         camelContext.addRoutes(builderOneShot);
 
 
-        ProfileImportKafkaToUnomiRouteBuilder builderProcessor = new 
ProfileImportKafkaToUnomiRouteBuilder(kafkaProps);
+        ProfileImportToUnomiRouteBuilder builderProcessor = new 
ProfileImportToUnomiRouteBuilder(kafkaProps, configType);
         builderProcessor.setUnomiStorageProcessor(unomiStorageProcessor);
         builderProcessor.setJacksonDataFormat(jacksonDataFormat);
         builderProcessor.setContext(camelContext);
@@ -113,7 +114,7 @@ public class ProfileImportCamelContext implements 
SynchronousBundleListener {
         }
         //Handle transforming an import config oneshot <--> recurrent
         
if(IMPORT_CONFIG_TYPE_RECURRENT.equals(importConfiguration.getConfigType())){
-            ProfileImportSourceToKafkaRouteBuilder builder = new 
ProfileImportSourceToKafkaRouteBuilder(kafkaProps);
+            ProfileImportFromSourceRouteBuilder builder = new 
ProfileImportFromSourceRouteBuilder(kafkaProps, configType);
             
builder.setImportConfigurationList(Arrays.asList(importConfiguration));
             builder.setJacksonDataFormat(jacksonDataFormat);
             builder.setContext(camelContext);
@@ -149,6 +150,10 @@ public class ProfileImportCamelContext implements 
SynchronousBundleListener {
         this.kafkaProps = kafkaProps;
     }
 
+    public void setConfigType(String configType) {
+        this.configType = configType;
+    }
+
     public void preDestroy() throws Exception {
         bundleContext.removeBundleListener(this);
         //This is to shutdown Camel context

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/8231d564/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/exception/BadProfileDataFormatException.java
----------------------------------------------------------------------
diff --git 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/exception/BadProfileDataFormatException.java
 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/exception/BadProfileDataFormatException.java
new file mode 100644
index 0000000..6c947ff
--- /dev/null
+++ 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/exception/BadProfileDataFormatException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.router.core.exception;
+
+/**
+ * Checked exception for a profile line that cannot be mapped: LineSplitProcessor throws it for an empty line or when the column count does not match the field mapping (created by amidani on 13/06/2017).
+ */
+public class BadProfileDataFormatException extends Exception{
+
+    public BadProfileDataFormatException() {
+        super();
+    }
+
+    public BadProfileDataFormatException(String message) {
+        super(message);
+    }
+
+    public BadProfileDataFormatException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/8231d564/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportConfigByFileNameProcessor.java
----------------------------------------------------------------------
diff --git 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportConfigByFileNameProcessor.java
 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportConfigByFileNameProcessor.java
index 7fc7730..c2968cd 100644
--- 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportConfigByFileNameProcessor.java
+++ 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportConfigByFileNameProcessor.java
@@ -21,6 +21,7 @@ import org.apache.camel.Processor;
 import org.apache.camel.component.file.GenericFile;
 import org.apache.unomi.router.api.ImportConfiguration;
 import org.apache.unomi.router.api.services.ImportConfigurationService;
+import org.apache.unomi.router.core.RouterConstants;
 
 /**
  * Created by amidani on 22/05/2017.
@@ -35,7 +36,7 @@ public class ImportConfigByFileNameProcessor implements 
Processor{
         String fileName = 
exchange.getIn().getBody(GenericFile.class).getFileName();
         String importConfigId = fileName.substring(0, fileName.indexOf('.'));
         ImportConfiguration importConfiguration = 
importConfigurationService.load(importConfigId);
-        exchange.getIn().setHeader("importConfigOneShot", importConfiguration);
+        
exchange.getIn().setHeader(RouterConstants.HEADER_IMPORT_CONFIG_ONESHOT, 
importConfiguration);
     }
 
     public void setImportConfigurationService(ImportConfigurationService 
importConfigurationService) {

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/8231d564/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitFailureHandler.java
----------------------------------------------------------------------
diff --git 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitFailureHandler.java
 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitFailureHandler.java
new file mode 100644
index 0000000..b1de82a
--- /dev/null
+++ 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitFailureHandler.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.router.core.processor;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Camel processor that logs, at error level, the value of the exchange's EXCEPTION_CAUGHT property (created by amidani on 14/06/2017).
+ */
+public class LineSplitFailureHandler implements Processor {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(LineSplitFailureHandler.class.getName());
+
+    public void process(Exchange exchange) throws Exception {
+        logger.error("{}", exchange.getProperty(Exchange.EXCEPTION_CAUGHT));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/8231d564/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java
----------------------------------------------------------------------
diff --git 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java
 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java
index 150ef6d..da14aa5 100644
--- 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java
+++ 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java
@@ -22,11 +22,10 @@ import org.apache.camel.component.kafka.KafkaConstants;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.unomi.router.api.ImportConfiguration;
 import org.apache.unomi.router.api.ProfileToImport;
+import org.apache.unomi.router.core.RouterConstants;
+import org.apache.unomi.router.core.exception.BadProfileDataFormatException;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
 
 /**
  * Created by amidani on 29/12/2016.
@@ -42,7 +41,8 @@ public class LineSplitProcessor implements Processor {
     @Override
     public void process(Exchange exchange) throws Exception {
         //In case of one shot import we check the header and overwrite import 
config
-        ImportConfiguration importConfigOneShot = (ImportConfiguration) 
exchange.getIn().getHeader("importConfigOneShot");
+        ImportConfiguration importConfigOneShot = (ImportConfiguration) 
exchange.getIn().getHeader(RouterConstants.HEADER_IMPORT_CONFIG_ONESHOT);
+        String configType = (String) 
exchange.getIn().getHeader(RouterConstants.HEADER_CONFIG_TYPE);
         if(importConfigOneShot!=null) {
             fieldsMapping = (Map<String, 
Integer>)importConfigOneShot.getProperties().get("mapping");
             propertiesToOverwrite = 
importConfigOneShot.getPropertiesToOverwrite();
@@ -50,15 +50,18 @@ public class LineSplitProcessor implements Processor {
             overwriteExistingProfiles = 
importConfigOneShot.isOverwriteExistingProfiles();
             columnSeparator = importConfigOneShot.getColumnSeparator();
         }
-        String[] profileData = 
((String)exchange.getIn().getBody()).split(columnSeparator);
+        String[] profileData = 
((String)exchange.getIn().getBody()).split(columnSeparator, -1);
         ProfileToImport profileToImport = new ProfileToImport();
         profileToImport.setItemId(UUID.randomUUID().toString());
         profileToImport.setItemType("profile");
         profileToImport.setScope("system");
-        if(profileData.length > 0) {
+        if(profileData.length > 0 && StringUtils.isNotBlank(profileData[0])) {
+            if(fieldsMapping.size() != (profileData.length - 1)) {
+                throw new BadProfileDataFormatException("The index does not 
match the number of column : line 
["+((Integer)exchange.getProperty("CamelSplitIndex")+1)+"]", new 
Throwable("MAPPING_COLUMN_MATCH"));
+            }
             Map<String, Object> properties = new HashMap<>();
-            for(String fieldMappingKey : fieldsMapping.keySet()) {
-                if(profileData.length > fieldsMapping.get(fieldMappingKey)) {
+            for (String fieldMappingKey : fieldsMapping.keySet()) {
+                if (profileData.length > fieldsMapping.get(fieldMappingKey)) {
                     properties.put(fieldMappingKey, 
profileData[fieldsMapping.get(fieldMappingKey)].trim());
                 }
             }
@@ -66,13 +69,17 @@ public class LineSplitProcessor implements Processor {
             profileToImport.setMergingProperty(mergingProperty);
             profileToImport.setPropertiesToOverwrite(propertiesToOverwrite);
             
profileToImport.setOverwriteExistingProfiles(overwriteExistingProfiles);
-            if(StringUtils.isNotBlank(profileData[profileData.length - 1]) && 
Boolean.parseBoolean(profileData[profileData.length - 1].trim())) {
+            if (StringUtils.isNotBlank(profileData[profileData.length - 1]) && 
Boolean.parseBoolean(profileData[profileData.length - 1].trim())) {
                 profileToImport.setProfileToDelete(true);
             }
+        } else {
+            throw new BadProfileDataFormatException("Empty line : line 
["+((Integer)exchange.getProperty("CamelSplitIndex")+1)+"]", new 
Throwable("EMPTY_LINE"));
         }
         exchange.getIn().setBody(profileToImport, ProfileToImport.class);
-        exchange.getIn().setHeader(KafkaConstants.PARTITION_KEY, 0);
-        exchange.getIn().setHeader(KafkaConstants.KEY, "1");
+        if(RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) {
+            exchange.getIn().setHeader(KafkaConstants.PARTITION_KEY, 0);
+            exchange.getIn().setHeader(KafkaConstants.KEY, "1");
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/8231d564/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportAbstractRouteBuilder.java
----------------------------------------------------------------------
diff --git 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportAbstractRouteBuilder.java
 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportAbstractRouteBuilder.java
new file mode 100644
index 0000000..ca28baf
--- /dev/null
+++ 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportAbstractRouteBuilder.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.router.core.route;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.jackson.JacksonDataFormat;
+import org.apache.camel.component.kafka.KafkaComponent;
+import org.apache.camel.component.kafka.KafkaConfiguration;
+import org.apache.camel.component.kafka.KafkaEndpoint;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.unomi.router.core.RouterConstants;
+
+import java.util.Map;
+
+/**
+ * Base class of the profile import route builders: stores the Kafka settings and the configType, and resolves the endpoint the routes use (created by amidani on 13/06/2017).
+ */
+public abstract class ProfileImportAbstractRouteBuilder extends RouteBuilder {
+
+    protected JacksonDataFormat jacksonDataFormat;
+
+    protected String kafkaHost;
+    protected String kafkaPort;
+    protected String kafkaImportTopic;
+    protected String kafkaImportGroupId;
+    protected String kafkaImportConsumerCount;
+    protected String kafkaImportAutoCommit;
+
+    protected String configType;
+
+    public ProfileImportAbstractRouteBuilder(Map<String, String> kafkaProps, 
String configType) {
+        this.kafkaHost = kafkaProps.get("kafkaHost");
+        this.kafkaPort = kafkaProps.get("kafkaPort");
+        this.kafkaImportTopic = kafkaProps.get("kafkaImportTopic");
+        this.kafkaImportGroupId = kafkaProps.get("kafkaImportGroupId");
+        this.kafkaImportConsumerCount = 
kafkaProps.get("kafkaImportConsumerCount");
+        this.kafkaImportAutoCommit = kafkaProps.get("kafkaImportAutoCommit");
+        this.configType = configType;
+    }
+
+    public Object getEndpointURI(String direction) { // returns a KafkaEndpoint when configType is "kafka", otherwise the String "direct:depositBuffer"; callers cast accordingly
+        Object endpoint;
+        if(RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) {
+            //Prepare Kafka Deposit
+            StringBuilder kafkaUri = new StringBuilder("kafka:");
+            
kafkaUri.append(kafkaHost).append(":").append(kafkaPort).append("?topic=").append(kafkaImportTopic);
+            if (StringUtils.isNotBlank(kafkaImportGroupId)) {
+                kafkaUri.append("&groupId=" + kafkaImportGroupId);
+            }
+            if(RouterConstants.DIRECTION_TO.equals(direction)) { // only the "to" direction gets the auto-commit and consumer-count options
+                
kafkaUri.append("&autoCommitEnable="+kafkaImportAutoCommit+"&consumersCount="+kafkaImportConsumerCount);
+            }
+            KafkaConfiguration kafkaConfiguration = new KafkaConfiguration();
+            kafkaConfiguration.setBrokers(kafkaHost + ":" + kafkaPort);
+            kafkaConfiguration.setTopic(kafkaImportTopic);
+            kafkaConfiguration.setGroupId(kafkaImportGroupId);
+            endpoint = new KafkaEndpoint(kafkaUri.toString(), new 
KafkaComponent(this.getContext()));
+            ((KafkaEndpoint)endpoint).setConfiguration(kafkaConfiguration); // NOTE(review): the URI options above are not copied into this configuration object - confirm they take effect
+        } else {
+            endpoint = RouterConstants.DIRECT_DEPOSIT_BUFFER;
+        }
+
+        return endpoint;
+    }
+
+    public void setJacksonDataFormat(JacksonDataFormat jacksonDataFormat) {
+        this.jacksonDataFormat = jacksonDataFormat;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/8231d564/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java
----------------------------------------------------------------------
diff --git 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java
 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java
new file mode 100644
index 0000000..9be6fb6
--- /dev/null
+++ 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.router.core.route;
+
+import org.apache.camel.LoggingLevel;
+import org.apache.camel.component.kafka.KafkaEndpoint;
+import org.apache.camel.model.ProcessorDefinition;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.unomi.router.api.ImportConfiguration;
+import org.apache.unomi.router.core.RouterConstants;
+import org.apache.unomi.router.core.exception.BadProfileDataFormatException;
+import org.apache.unomi.router.core.processor.LineSplitFailureHandler;
+import org.apache.unomi.router.core.processor.LineSplitProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Builds one recurrent "from source" Camel route per import configuration (created by amidani on 26/04/2017): the
+ * source body is split into lines, each line goes through a LineSplitProcessor, is marshalled, then sent onward.
+ */
+public class ProfileImportFromSourceRouteBuilder extends ProfileImportAbstractRouteBuilder {
+
+    private static final Logger logger = LoggerFactory.getLogger(ProfileImportFromSourceRouteBuilder.class.getName());
+    private List<ImportConfiguration> importConfigurationList;
+
+    public ProfileImportFromSourceRouteBuilder(Map<String, String> kafkaProps, String configType) {
+        super(kafkaProps, configType);
+    }
+
+    /**
+     * Registers the shared error handling once, then one route per import configuration that declares a "source".
+     */
+    @Override
+    public void configure() throws Exception {
+        logger.info("Configure Recurrent Route 'From Source'");
+        // Camel only accepts onException/errorHandler before the first route, so declare them once, outside the loop.
+        onException(BadProfileDataFormatException.class)
+                .log(LoggingLevel.ERROR, "Error processing record ${exchangeProperty.CamelSplitIndex}++ !")
+                .handled(true)
+                .process(new LineSplitFailureHandler())
+                .to("direct:errors");
+        errorHandler(deadLetterChannel("direct:errors"));
+
+        boolean routeAdded = false;
+        for (ImportConfiguration importConfiguration : importConfigurationList) {
+            String source = (String) importConfiguration.getProperties().get("source");
+            if (StringUtils.isEmpty(source)) {
+                continue; // no source to read from for this configuration
+            }
+            LineSplitProcessor lineSplitProcessor = new LineSplitProcessor();
+            lineSplitProcessor.setFieldsMapping((Map<String, Integer>) importConfiguration.getProperties().get("mapping"));
+            lineSplitProcessor.setOverwriteExistingProfiles(importConfiguration.isOverwriteExistingProfiles());
+            lineSplitProcessor.setPropertiesToOverwrite(importConfiguration.getPropertiesToOverwrite());
+            lineSplitProcessor.setMergingProperty(importConfiguration.getMergingProperty());
+            lineSplitProcessor.setColumnSeparator(importConfiguration.getColumnSeparator());
+
+            ProcessorDefinition prDef = from(source)
+                    .routeId(importConfiguration.getItemId()) // lets the route be started/stopped manually by id
+                    .autoStartup(importConfiguration.isActive()) // auto-start only if the configuration is active
+                    .split(bodyAs(String.class).tokenize(importConfiguration.getLineSeparator()))
+                    .log(LoggingLevel.INFO, "Splitted into ${exchangeProperty.CamelSplitSize} records")
+                    .setHeader(RouterConstants.HEADER_PROFILES_COUNT, exchangeProperty("CamelSplitSize")) // exact name, no stray '}'
+                    .setHeader(RouterConstants.HEADER_CONFIG_TYPE, constant(configType))
+                    .process(lineSplitProcessor)
+                    .log(LoggingLevel.INFO, "Split IDX ${exchangeProperty.CamelSplitIndex} record")
+                    .to("log:org.apache.unomi.router?level=INFO")
+                    .marshal(jacksonDataFormat)
+                    .convertBodyTo(String.class);
+            if (RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) {
+                prDef.to((KafkaEndpoint) getEndpointURI(RouterConstants.DIRECTION_FROM));
+            } else {
+                prDef.to((String) getEndpointURI(RouterConstants.DIRECTION_FROM));
+            }
+            routeAdded = true;
+        }
+        // A direct endpoint takes a single consumer: declare the error route once, and only if a route needs it.
+        if (routeAdded) {
+            from("direct:errors").to("log:org.apache.unomi.router?level=ERROR");
+        }
+    }
+
+    public void setImportConfigurationList(List<ImportConfiguration> importConfigurationList) {
+        this.importConfigurationList = importConfigurationList;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/8231d564/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportKafkaToUnomiRouteBuilder.java
----------------------------------------------------------------------
diff --git 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportKafkaToUnomiRouteBuilder.java
 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportKafkaToUnomiRouteBuilder.java
deleted file mode 100644
index 1b056fe..0000000
--- 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportKafkaToUnomiRouteBuilder.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.unomi.router.core.route;
-
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.jackson.JacksonDataFormat;
-import org.apache.camel.component.kafka.KafkaComponent;
-import org.apache.camel.component.kafka.KafkaConfiguration;
-import org.apache.camel.component.kafka.KafkaEndpoint;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.unomi.router.core.processor.UnomiStorageProcessor;
-
-import java.util.Map;
-
-/**
- * Created by amidani on 26/04/2017.
- */
-public class ProfileImportKafkaToUnomiRouteBuilder extends RouteBuilder {
-
-    private UnomiStorageProcessor unomiStorageProcessor;
-    private JacksonDataFormat jacksonDataFormat;
-    private String kafkaHost;
-    private String kafkaPort;
-    private String kafkaImportTopic;
-    private String kafkaImportGroupId;
-
-    public ProfileImportKafkaToUnomiRouteBuilder(Map<String, String> 
kafkaProps) {
-        kafkaHost = kafkaProps.get("kafkaHost");
-        kafkaPort = kafkaProps.get("kafkaPort");
-        kafkaImportTopic = kafkaProps.get("kafkaImportTopic");
-        kafkaImportGroupId = kafkaProps.get("kafkaImportGroupId");
-    }
-
-    @Override
-    public void configure() throws Exception {
-
-        StringBuilder kafkaUri = new StringBuilder("kafka:");
-        
kafkaUri.append(kafkaHost).append(":").append(kafkaPort).append("?topic=").append(kafkaImportTopic);
-        if(StringUtils.isNotBlank(kafkaImportGroupId)) {
-            kafkaUri.append("&groupId="+kafkaImportGroupId);
-        }
-        kafkaUri.append("&autoCommitEnable=true&consumersCount=10");
-        KafkaConfiguration kafkaConfiguration = new KafkaConfiguration();
-        kafkaConfiguration.setBrokers(kafkaHost+":"+kafkaPort);
-        kafkaConfiguration.setTopic(kafkaImportTopic);
-        kafkaConfiguration.setGroupId(kafkaImportGroupId);
-        KafkaEndpoint endpoint = new KafkaEndpoint(kafkaUri.toString(), new 
KafkaComponent(this.getContext()));
-        endpoint.setConfiguration(kafkaConfiguration);
-        from(endpoint)
-                .unmarshal(jacksonDataFormat)
-                .process(unomiStorageProcessor)
-                .to("log:org.apache.unomi.router?level=INFO");
-
-    }
-
-    public void setUnomiStorageProcessor(UnomiStorageProcessor 
unomiStorageProcessor) {
-        this.unomiStorageProcessor = unomiStorageProcessor;
-    }
-
-    public void setJacksonDataFormat(JacksonDataFormat jacksonDataFormat) {
-        this.jacksonDataFormat = jacksonDataFormat;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/8231d564/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java
----------------------------------------------------------------------
diff --git 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java
 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java
index d095f3e..84b220c 100644
--- 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java
+++ 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java
@@ -21,7 +21,9 @@ import org.apache.camel.component.jackson.JacksonDataFormat;
 import org.apache.camel.component.kafka.KafkaComponent;
 import org.apache.camel.component.kafka.KafkaConfiguration;
 import org.apache.camel.component.kafka.KafkaEndpoint;
+import org.apache.camel.model.ProcessorDefinition;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.unomi.router.core.RouterConstants;
 import org.apache.unomi.router.core.processor.ImportConfigByFileNameProcessor;
 import org.apache.unomi.router.core.processor.LineSplitProcessor;
 import org.slf4j.Logger;
@@ -32,57 +34,41 @@ import java.util.Map;
 /**
  * Created by amidani on 22/05/2017.
  */
-public class ProfileImportOneShotRouteBuilder extends RouteBuilder {
+public class ProfileImportOneShotRouteBuilder extends 
ProfileImportAbstractRouteBuilder {
 
     private Logger logger = 
LoggerFactory.getLogger(ProfileImportOneShotRouteBuilder.class.getName());
 
     private ImportConfigByFileNameProcessor importConfigByFileNameProcessor;
-    private JacksonDataFormat jacksonDataFormat;
     private String uploadDir;
-    private String kafkaHost;
-    private String kafkaPort;
-    private String kafkaImportTopic;
-    private String kafkaImportGroupId;
 
     private final String IMPORT_ONESHOT_ROUTE_ID = "ONE_SHOT_ROUTE";
 
-    public ProfileImportOneShotRouteBuilder(Map<String, String> kafkaProps) {
-        kafkaHost = kafkaProps.get("kafkaHost");
-        kafkaPort = kafkaProps.get("kafkaPort");
-        kafkaImportTopic = kafkaProps.get("kafkaImportTopic");
-        kafkaImportGroupId = kafkaProps.get("kafkaImportGroupId");
+    public ProfileImportOneShotRouteBuilder(Map<String, String> kafkaProps, 
String configType) {
+        super(kafkaProps, configType);
     }
 
     @Override
     public void configure() throws Exception {
 
-        //Prepare Kafka Deposit
-        StringBuilder kafkaUri = new StringBuilder("kafka:");
-        
kafkaUri.append(kafkaHost).append(":").append(kafkaPort).append("?topic=").append(kafkaImportTopic);
-        if(StringUtils.isNotBlank(kafkaImportGroupId)) {
-            kafkaUri.append("&groupId="+ kafkaImportGroupId);
-        }
-
-        KafkaConfiguration kafkaConfiguration = new KafkaConfiguration();
-        kafkaConfiguration.setBrokers(kafkaHost+":"+kafkaPort);
-        kafkaConfiguration.setTopic(kafkaImportTopic);
-        kafkaConfiguration.setGroupId(kafkaImportGroupId);
-        KafkaEndpoint endpoint = new KafkaEndpoint(kafkaUri.toString(), new 
KafkaComponent(this.getContext()));
-        endpoint.setConfiguration(kafkaConfiguration);
+        logger.info("Configure OneShot Route...");
 
         LineSplitProcessor lineSplitProcessor = new LineSplitProcessor();
 
-
-        from("file://"+uploadDir+"?include=.*.csv&consumer.delay=1m")
+        ProcessorDefinition prDef = 
from("file://"+uploadDir+"?include=.*.csv&consumer.delay=1m")
                 .routeId(IMPORT_ONESHOT_ROUTE_ID)
                 .autoStartup(true)
                 .process(importConfigByFileNameProcessor)
                 
.split(bodyAs(String.class).tokenize("${in.header.importConfigOneShot.getLineSeparator}"))
+                .setHeader("configType", constant(configType))
                 .process(lineSplitProcessor)
                 .to("log:org.apache.unomi.router?level=INFO")
                 .marshal(jacksonDataFormat)
-                .convertBodyTo(String.class)
-                .to(endpoint);
+                .convertBodyTo(String.class);
+        if(RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)){
+            prDef.to((KafkaEndpoint) 
getEndpointURI(RouterConstants.DIRECTION_FROM));
+        } else {
+            prDef.to((String) getEndpointURI(RouterConstants.DIRECTION_FROM));
+        }
     }
 
     public void 
setImportConfigByFileNameProcessor(ImportConfigByFileNameProcessor 
importConfigByFileNameProcessor) {

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/8231d564/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportSourceToKafkaRouteBuilder.java
----------------------------------------------------------------------
diff --git 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportSourceToKafkaRouteBuilder.java
 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportSourceToKafkaRouteBuilder.java
deleted file mode 100644
index 37ae59e..0000000
--- 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportSourceToKafkaRouteBuilder.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.unomi.router.core.route;
-
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.jackson.JacksonDataFormat;
-import org.apache.camel.component.kafka.KafkaComponent;
-import org.apache.camel.component.kafka.KafkaConfiguration;
-import org.apache.camel.component.kafka.KafkaEndpoint;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.unomi.router.api.ImportConfiguration;
-import org.apache.unomi.router.core.processor.LineSplitProcessor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * Created by amidani on 26/04/2017.
- */
-
-public class ProfileImportSourceToKafkaRouteBuilder extends RouteBuilder {
-
-    private static final Logger logger = 
LoggerFactory.getLogger(ProfileImportSourceToKafkaRouteBuilder.class.getName());
-
-    private List<ImportConfiguration> importConfigurationList;
-    private JacksonDataFormat jacksonDataFormat;
-    private String kafkaHost;
-    private String kafkaPort;
-    private String kafkaImportTopic;
-    private String kafkaImportGroupId;
-
-    public ProfileImportSourceToKafkaRouteBuilder(Map<String, String> 
kafkaProps) {
-        kafkaHost = kafkaProps.get("kafkaHost");
-        kafkaPort = kafkaProps.get("kafkaPort");
-        kafkaImportTopic = kafkaProps.get("kafkaImportTopic");
-        kafkaImportGroupId = kafkaProps.get("kafkaImportGroupId");
-    }
-
-    @Override
-    public void configure() throws Exception {
-        //Prepare Kafka Deposit
-        StringBuilder kafkaUri = new StringBuilder("kafka:");
-        
kafkaUri.append(kafkaHost).append(":").append(kafkaPort).append("?topic=").append(kafkaImportTopic);
-        if(StringUtils.isNotBlank(kafkaImportGroupId)) {
-            kafkaUri.append("&groupId="+ kafkaImportGroupId);
-        }
-
-        KafkaConfiguration kafkaConfiguration = new KafkaConfiguration();
-        kafkaConfiguration.setBrokers(kafkaHost+":"+kafkaPort);
-        kafkaConfiguration.setTopic(kafkaImportTopic);
-        kafkaConfiguration.setGroupId(kafkaImportGroupId);
-        KafkaEndpoint endpoint = new KafkaEndpoint(kafkaUri.toString(), new 
KafkaComponent(this.getContext()));
-        endpoint.setConfiguration(kafkaConfiguration);
-
-        //Loop on multiple import configuration
-        for(ImportConfiguration importConfiguration : importConfigurationList) 
{
-            if(importConfiguration.getProperties().size() > 0 &&
-                    StringUtils.isNotEmpty((String) 
importConfiguration.getProperties().get("source"))) {
-                //Prepare Split Processor
-                LineSplitProcessor lineSplitProcessor = new 
LineSplitProcessor();
-                lineSplitProcessor.setFieldsMapping((Map<String, Integer>) 
importConfiguration.getProperties().get("mapping"));
-                
lineSplitProcessor.setOverwriteExistingProfiles(importConfiguration.isOverwriteExistingProfiles());
-                
lineSplitProcessor.setPropertiesToOverwrite(importConfiguration.getPropertiesToOverwrite());
-                
lineSplitProcessor.setMergingProperty(importConfiguration.getMergingProperty());
-                
lineSplitProcessor.setColumnSeparator(importConfiguration.getColumnSeparator());
-
-                from((String) 
importConfiguration.getProperties().get("source"))
-                        .routeId(importConfiguration.getItemId())// This allow 
identification of the route for manual start/stop
-                        .autoStartup(importConfiguration.isActive())// 
Auto-start if the import configuration is set active
-                        
.split(bodyAs(String.class).tokenize(importConfiguration.getLineSeparator()))
-                        .process(lineSplitProcessor)
-                        .to("log:org.apache.unomi.router?level=INFO")
-                        .marshal(jacksonDataFormat)
-                        .convertBodyTo(String.class)
-                        .to(endpoint);
-            }
-        }
-    }
-
-    public void setImportConfigurationList(List<ImportConfiguration> 
importConfigurationList) {
-        this.importConfigurationList = importConfigurationList;
-    }
-
-    public void setJacksonDataFormat(JacksonDataFormat jacksonDataFormat) {
-        this.jacksonDataFormat = jacksonDataFormat;
-    }
-
-    public void setKafkaHost(String kafkaHost) {
-        this.kafkaHost = kafkaHost;
-    }
-
-    public void setKafkaPort(String kafkaPort) {
-        this.kafkaPort = kafkaPort;
-    }
-
-    public void setKafkaImportTopic(String kafkaImportTopic) {
-        this.kafkaImportTopic = kafkaImportTopic;
-    }
-
-    public void setKafkaImportGroupId(String kafkaImportGroupId) {
-        this.kafkaImportGroupId = kafkaImportGroupId;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/8231d564/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportToUnomiRouteBuilder.java
----------------------------------------------------------------------
diff --git 
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportToUnomiRouteBuilder.java
 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportToUnomiRouteBuilder.java
new file mode 100644
index 0000000..9bec24b
--- /dev/null
+++ 
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportToUnomiRouteBuilder.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.router.core.route;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.jackson.JacksonDataFormat;
+import org.apache.camel.component.kafka.KafkaComponent;
+import org.apache.camel.component.kafka.KafkaConfiguration;
+import org.apache.camel.component.kafka.KafkaEndpoint;
+import org.apache.camel.model.RouteDefinition;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.unomi.router.core.RouterConstants;
+import org.apache.unomi.router.core.processor.UnomiStorageProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Route builder for the consuming side of the profile import: it reads
+ * messages from the endpoint returned by
+ * {@code getEndpointURI(RouterConstants.DIRECTION_TO)}, unmarshals them with
+ * the configured Jackson data format and hands them to the
+ * {@link UnomiStorageProcessor}.
+ * <p>
+ * {@code configType}, {@code jacksonDataFormat} and {@code getEndpointURI} are
+ * not declared in this class; presumably they are inherited from
+ * {@code ProfileImportAbstractRouteBuilder} - confirm against that class.
+ * <p>
+ * Created by amidani on 26/04/2017.
+ */
+public class ProfileImportToUnomiRouteBuilder extends 
ProfileImportAbstractRouteBuilder {
+    /** Logger used to announce that the route is being configured. */
+    private Logger logger = 
LoggerFactory.getLogger(ProfileImportToUnomiRouteBuilder.class.getName());
+    /** Processor applied to every unmarshalled message in {@link #configure()}; set via its setter. */
+    private UnomiStorageProcessor unomiStorageProcessor;
+    /**
+     * Creates the builder by forwarding both arguments unchanged to the
+     * superclass constructor.
+     *
+     * @param kafkaProps passed to the superclass; presumably the Kafka
+     *                   connection settings - confirm against the superclass
+     * @param configType passed to the superclass; presumably the value later
+     *                   read as {@code configType} in {@link #configure()}
+     */
+    public ProfileImportToUnomiRouteBuilder(Map<String, String> kafkaProps, 
String configType) {
+        super(kafkaProps, configType);
+    }
+    /**
+     * Defines the route: from the {@code DIRECTION_TO} endpoint, unmarshal
+     * with {@code jacksonDataFormat}, run {@code unomiStorageProcessor}, then
+     * send the exchange to the {@code org.apache.unomi.router} log endpoint
+     * at INFO level.
+     *
+     * @throws Exception declared by the overridden method
+     */
+    @Override
+    public void configure() throws Exception {
+
+        logger.info("Configure Recurrent Route 'To Target'");
+        // Kafka mode: the endpoint object is cast to KafkaEndpoint; for any other configType it is cast to a String URI.
+        RouteDefinition rtDef;
+        if(RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)){
+            
rtDef=from((KafkaEndpoint)getEndpointURI(RouterConstants.DIRECTION_TO));
+        } else {
+            rtDef=from((String)getEndpointURI(RouterConstants.DIRECTION_TO));
+        }
+        rtDef.unmarshal(jacksonDataFormat)
+                .process(unomiStorageProcessor)
+                .to("log:org.apache.unomi.router?level=INFO");
+
+    }
+    /** Sets the processor invoked on each message after unmarshalling. */
+    public void setUnomiStorageProcessor(UnomiStorageProcessor 
unomiStorageProcessor) {
+        this.unomiStorageProcessor = unomiStorageProcessor;
+    }
+    /** Sets the data format used by {@link #configure()} to unmarshal incoming messages; the target field is not declared in this class. */
+    public void setJacksonDataFormat(JacksonDataFormat jacksonDataFormat) {
+        this.jacksonDataFormat = jacksonDataFormat;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/8231d564/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
----------------------------------------------------------------------
diff --git 
a/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
 
b/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index 4c36b9e..53efbf3 100644
--- 
a/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ 
b/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -25,10 +25,13 @@
 
     <cm:property-placeholder persistent-id="org.apache.unomi.router" 
update-strategy="reload">
         <cm:default-properties>
+            <cm:property name="import.config.type" value="nobroker"/>
             <cm:property name="kafka.host" value="localhost"/>
             <cm:property name="kafka.port" value="9092"/>
             <cm:property name="kafka.import.topic" value="camel-deposit"/>
             <cm:property name="kafka.import.groupId" 
value="unomi-import-group"/>
+            <cm:property name="kafka.import.consumerCount" value="10"/>
+            <cm:property name="kafka.import.autoCommit" value="true"/>
             <cm:property name="import.oneshot.uploadDir" 
value="/tmp/oneshot_import_configs/"/>
         </cm:default-properties>
     </cm:property-placeholder>
@@ -63,12 +66,15 @@
 
     <bean id="camelContext" 
class="org.apache.unomi.router.core.context.ProfileImportCamelContext"
           init-method="initCamelContext" destroy-method="preDestroy">
+        <property name="configType" value="${import.config.type}"/>
         <property name="kafkaProps">
             <map>
                 <entry key="kafkaHost" value="${kafka.host}"/>
                 <entry key="kafkaPort" value="${kafka.port}"/>
                 <entry key="kafkaImportTopic" value="${kafka.import.topic}"/>
                 <entry key="kafkaImportGroupId" 
value="${kafka.import.groupId}"/>
+                <entry key="kafkaImportConsumerCount" 
value="${kafka.import.consumerCount}"/>
+                <entry key="kafkaImportAutoCommit" 
value="${kafka.import.autoCommit}"/>
             </map>
         </property>
         <property name="uploadDir" value="${import.oneshot.uploadDir}"/>

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/8231d564/extensions/router/router-core/src/main/resources/org.apache.unomi.router.cfg
----------------------------------------------------------------------
diff --git 
a/extensions/router/router-core/src/main/resources/org.apache.unomi.router.cfg 
b/extensions/router/router-core/src/main/resources/org.apache.unomi.router.cfg
index ff2c8ef..0ce4bb2 100644
--- 
a/extensions/router/router-core/src/main/resources/org.apache.unomi.router.cfg
+++ 
b/extensions/router/router-core/src/main/resources/org.apache.unomi.router.cfg
@@ -15,11 +15,18 @@
 # limitations under the License.
 #
 
+#Configuration Type values {'nobroker', 'kafka'}
+import.config.type=nobroker
+
+#Uncomment and update Kafka settings to use Kafka as a broker
+
 #Kafka
- settingskafka.host=localhost
-kafka.port=9092
-kafka.import.topic=camel-deposit
-kafka.import.groupId=unomi-import-group
+#kafka.host=localhost
+#kafka.port=9092
+#kafka.import.topic=camel-deposit
+#kafka.import.groupId=unomi-import-group
+#kafka.import.consumerCount=10
+#kafka.import.autoCommit=true
 
 #Import One Shot upload directory
 import.oneshot.uploadDir=/tmp/unomi_oneshot_import_configs/
\ No newline at end of file

Reply via email to