UNOMI-101 : New feature to add the ability to import profiles
Project: http://git-wip-us.apache.org/repos/asf/incubator-unomi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-unomi/commit/89d4c8eb Tree: http://git-wip-us.apache.org/repos/asf/incubator-unomi/tree/89d4c8eb Diff: http://git-wip-us.apache.org/repos/asf/incubator-unomi/diff/89d4c8eb Branch: refs/heads/feature-UNOMI-5-KARAF4 Commit: 89d4c8eb9646705cfd6c4bdb79fdac27ee9a5634 Parents: 0f44140 Author: Abdelkader Midani <amid...@apache.org> Authored: Tue May 23 02:30:28 2017 +0200 Committer: Abdelkader Midani <amid...@apache.org> Committed: Mon Jun 12 20:09:59 2017 +0200 ---------------------------------------------------------------------- extensions/pom.xml | 1 + extensions/router/README.md | 113 ++++++++++ extensions/router/pom.xml | 64 ++++++ extensions/router/router-api/pom.xml | 43 ++++ .../unomi/router/api/ImportConfiguration.java | 223 +++++++++++++++++++ .../unomi/router/api/ProfileToImport.java | 77 +++++++ .../services/ImportConfigurationService.java | 60 +++++ .../api/services/ProfileImportService.java | 29 +++ extensions/router/router-core/pom.xml | 182 +++++++++++++++ .../core/context/ProfileImportCamelContext.java | 170 ++++++++++++++ .../core/processor/ConfigUpdateProcessor.java | 44 ++++ .../ImportConfigByFileNameProcessor.java | 44 ++++ .../core/processor/LineSplitProcessor.java | 114 ++++++++++ .../core/processor/UnomiStorageProcessor.java | 46 ++++ .../ProfileImportConfigUpdateRouteBuilder.java | 62 ++++++ .../ProfileImportKafkaToUnomiRouteBuilder.java | 77 +++++++ .../route/ProfileImportOneShotRouteBuilder.java | 99 ++++++++ .../ProfileImportSourceToKafkaRouteBuilder.java | 120 ++++++++++ .../resources/OSGI-INF/blueprint/blueprint.xml | 94 ++++++++ .../main/resources/org.apache.unomi.router.cfg | 25 +++ extensions/router/router-karaf-feature/pom.xml | 157 +++++++++++++ extensions/router/router-rest/pom.xml | 75 +++++++ .../ImportConfigurationServiceEndPoint.java | 176 +++++++++++++++ .../resources/OSGI-INF/blueprint/blueprint.xml | 80 +++++++ extensions/router/router-service/pom.xml | 104 +++++++++ .../ImportConfigurationServiceImpl.java | 114 ++++++++++ .../services/ProfileImportServiceImpl.java | 121 ++++++++++ .../resources/OSGI-INF/blueprint/blueprint.xml | 39 ++++ itests/pom.xml | 8 +- package/pom.xml | 11 + performance-tests/pom.xml | 4 +- ...g.apache.unomi.persistence.elasticsearch.cfg | 2 +- persistence-elasticsearch/plugins/pom.xml | 2 +- samples/trainingplugin.zip | Bin 0 -> 48508 bytes samples/trainingplugin/pom.xml | 55 +++++ .../training/TrainedNotificationAction.java | 61 +++++ .../cxs/actions/trainingNotifAction.json | 13 ++ .../META-INF/cxs/rules/trainedNotification.json | 20 ++ .../resources/OSGI-INF/blueprint/blueprint.xml | 34 +++ 39 files changed, 2755 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/89d4c8eb/extensions/pom.xml ---------------------------------------------------------------------- diff --git a/extensions/pom.xml b/extensions/pom.xml index 7126b61..dea4bc2 100644 --- a/extensions/pom.xml +++ b/extensions/pom.xml @@ -34,6 +34,7 @@ <module>lists-extension</module> <module>privacy-extension</module> <module>geonames</module> + <module>router</module> </modules> </project> http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/89d4c8eb/extensions/router/README.md ---------------------------------------------------------------------- diff --git a/extensions/router/README.md b/extensions/router/README.md new file mode 100644 index 0000000..0d382d4 --- /dev/null +++ b/extensions/router/README.md @@ -0,0 +1,113 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +Unomi Router +========================== + +## Getting started +Unomi Router Extension a Karaf Feature that provide an Enterprise Application Integration tool. +It is optional so you must configure it and install it in Karaf, and can be used for Machine - Machine or Human - Machine integration with Unomi. +Mainly Unomi Router Extension aim to make it easy to import third party applications/platforms profiles into Unomi. +This extension is implemented using Apache Camel routes and is using Apache Kafka to buffer import process and make it failsafe. + +## Getting started +1. Configure your Unomi Router: + In the `etc/org.apache.unomi.router.cfg` file, you might want to update the following settings: + Kafka settings + >`#Kafka settings` + + >`kafka.host=localhost` + + >`kafka.port=9092` + + >`kafka.import.topic=camel-deposit` + + >`kafka.import.groupId=unomi-import-group` + + Kafka host and port with the topic name and the groupId ti which the topic is assigned + + >`#Import One Shot upload directory` + + >`import.oneshot.uploadDir=/tmp/unomi_oneshot_import_configs/` + + Path to the folder where unomi should stock file imported for a oneshot processing + + +2. Deploy into Apache Unomi using the following commands from the Apache Karaf shell: + ```sh + $ feature:repo-add mvn:org.apache.unomi/unomi-router-karaf-feature/${version}/xml/features + $ feature:install unomi-router-karaf-feature + ``` + +3. Send your import configuration: + + An import configuration is nothing else than a simple JSON to describe how you want to import your data (Profiles). + To create/update an import configuration + + `POST /cxs/importConfiguration` + ```json + { + "itemId": "f57f1f86-97bf-4ba0-b4e4-7d5e77e7c0bd", + "itemType": "importConfig", + "scope": "integration", + "name": "Test Recurrent", + "description": "Just test recurrent import", + "configType": "recurrent", + "properties": { + "source": "{file/ftp}://{path}?fileName={file-name}.csv&move=.done&consumer.delay=20000", + "mapping": { + "firstName": 0, + "lastName": 1, + ... + } + }, + "mergingProperty": "email", + "overwriteExistingProfiles": true, + "propertiesToOverwrite": ["firstName", "lastName"], + "active": true + } + ``` + + Omit the `itemId` when creating new entry, `configType` can be '**recurrent**' for file/ftp/network path polling or '**oneshot**' for one time import. + + The `properties.source` attribute is an Apache Camel endpoint uri (See http://camel.apache.org/uris.html for more details). Unomi Router is designed to use **File** and **FTP** Camel components. + + The attribute `properties.mapping` is a Map of: + * Key: Profile property id in Unomi + * Value: Index of the column in the imported file to copy the in the previous property. + + The attribute `mergingProperty` is the profile property id in Unomi to use to check for duplication. + + The attribute `propertiesToOverwrite` is a list of profile properties ids to overwrite, if **null** all properties + will be overwritten. + + The attribute `active` is the flag to activate or deactivate the import configuration. + + Concerning oneshot import configuration using the previously described service will only create the import configuration, to send the file to process + you need to call : + + `POST /cxs/importConfiguration/oneshot` + + `Content-Type : multipart/form-data` + + First multipart with the name '**importConfigId**' is the importConfiguration to use to import the file, second one with the name '**file**' is the file to import. + + + + + + \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/89d4c8eb/extensions/router/pom.xml ---------------------------------------------------------------------- diff --git a/extensions/router/pom.xml b/extensions/router/pom.xml new file mode 100644 index 0000000..c69320c --- /dev/null +++ b/extensions/router/pom.xml @@ -0,0 +1,64 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.unomi</groupId> + <artifactId>unomi-extensions</artifactId> + <version>1.2.0-incubating-dmf_1343-SNAPSHOT</version> + </parent> + + <artifactId>unomi-router</artifactId> + <name>Apache Unomi :: Extensions :: Router</name> + <description>Apache Camel Router for the Apache Unomi Context server</description> + <packaging>pom</packaging> + + <properties> + <camel.version>2.18.3</camel.version> + </properties> + + <build> + <plugins> + <plugin> + <groupId>org.apache.felix</groupId> + <artifactId>maven-bundle-plugin</artifactId> + <extensions>true</extensions> + <configuration> + <instructions> + <Embed-Dependency>*;scope=compile|runtime</Embed-Dependency> + <Import-Package> + sun.misc;resolution:=optional, + * + </Import-Package> + </instructions> + </configuration> + </plugin> + </plugins> + </build> + + <modules> + <module>router-api</module> + <module>router-service</module> + <module>router-core</module> + <module>router-rest</module> + <module>router-karaf-feature</module> + </modules> + +</project> http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/89d4c8eb/extensions/router/router-api/pom.xml ---------------------------------------------------------------------- diff --git a/extensions/router/router-api/pom.xml b/extensions/router/router-api/pom.xml new file mode 100644 index 0000000..06207b3 --- /dev/null +++ b/extensions/router/router-api/pom.xml @@ -0,0 +1,43 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>unomi-router</artifactId> + <groupId>org.apache.unomi</groupId> + <version>1.2.0-incubating-dmf_1343-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>unomi-router-api</artifactId> + <name>Apache Unomi :: Extensions :: Router :: API</name> + <description>Router Specification API</description> + <packaging>bundle</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.unomi</groupId> + <artifactId>unomi-api</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + </dependencies> + + +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/89d4c8eb/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ImportConfiguration.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ImportConfiguration.java b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ImportConfiguration.java new file mode 100644 index 0000000..127de39 --- /dev/null +++ b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ImportConfiguration.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.unomi.router.api; + +import org.apache.unomi.api.Item; +import org.apache.unomi.api.MetadataItem; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Created by amidani on 28/04/2017. + */ +public class ImportConfiguration extends Item { + + /** + * The ImportConfiguration ITEM_TYPE + * + * @see Item for a discussion of ITEM_TYPE + */ + public static final String ITEM_TYPE = "importConfig"; + private String name; + private String description; + private String configType; + private Map<String, Object> properties = new HashMap<>(); + private String mergingProperty; + private boolean overwriteExistingProfiles = false; + private List<String> propertiesToOverwrite; + + private String columnSeparator = ","; + private String lineSeparator = "\n"; + private boolean active = false; + + /** + * Sets the property identified by the specified name to the specified value. If a property with that name already exists, replaces its value, otherwise adds the new + * property with the specified name and value. + * + * @param name the name of the property to set + * @param value the value of the property + */ + public void setProperty(String name, Object value) { + properties.put(name, value); + } + + /** + * Retrieves the name of the import configuration + * @return the name of the import configuration + */ + public String getName() { return this.name; } + + /** + * Sets the name of the import configuration + * @param name the name of the import configuration + */ + public void setName(String name) { + this.name = name; + } + + /** + * Retrieves the description of the import configuration + * @return the description of the import configuration + */ + public String getDescription() { return this.description; } + + /** + * Sets the description of the import configuration + * @param description the description of the import configuration + */ + public void setDescription(String description) { + this.description = description; + } + + + /** + * Retrieves the config type of the import configuration + * @return the config type of the import configuration + */ + public String getConfigType() { return this.configType; } + + /** + * Sets the config type of the import configuration + * @param configType the config type of the import configuration + */ + public void setConfigType(String configType) { + this.configType = configType; + } + + /** + * Retrieves the property identified by the specified name. + * + * @param name the name of the property to retrieve + * @return the value of the specified property or {@code null} if no such property exists + */ + public Object getProperty(String name) { + return properties.get(name); + } + + /** + * Retrieves a Map of all property name - value pairs for this import configuration. + * + * @return a Map of all property name - value pairs for this import configuration + */ + public Map<String, Object> getProperties() { + return properties; + } + + /** + * Sets the property name - value pairs for this import configuration. + * + * @param properties a Map containing the property name - value pairs for this import configuration + */ + public void setProperties(Map<String, Object> properties) { + this.properties = properties; + } + + public String getMergingProperty() { + return mergingProperty; + } + + /** + * Sets the merging property. + * @param mergingProperty property used to check if the profile exist when merging + */ + public void setMergingProperty(String mergingProperty) { + this.mergingProperty = mergingProperty; + } + + + /** + * Retrieves the import configuration active flag. + * + * @return true if the import configuration is active false if not + */ + public boolean isActive() { + return this.active; + } + + /** + * Sets the active flag true/false. + * + * @param active a boolean to set to active or inactive the import configuration + */ + public void setActive(boolean active) { + this.active = active; + } + + /** + * Retrieves the import configuration overwriteExistingProfiles flag. + * + * @return true if during the import existing profiles must be overwritten + */ + public boolean isOverwriteExistingProfiles() { + return this.overwriteExistingProfiles; + } + + /** + * Sets the overwriteExistingProfiles flag true/false. + * + * @param overwriteExistingProfiles a boolean to set overwriteExistingProfiles in the import configuration + */ + public void setOverwriteExistingProfiles(boolean overwriteExistingProfiles) { + this.overwriteExistingProfiles = overwriteExistingProfiles; + } + + public List<String> getPropertiesToOverwrite() { + return propertiesToOverwrite; + } + + public void setPropertiesToOverwrite(List<String> propertiesToOverwrite) { + this.propertiesToOverwrite = propertiesToOverwrite; + } + + /** + * gets the column separator. + */ + public String getColumnSeparator() { + return this.columnSeparator; + } + + /** + * Sets the column separator. + * @param columnSeparator property used to specify a line separator. Defaults to ',' + */ + public void setColumnSeparator(String columnSeparator) { + if(this.columnSeparator !=null) { + this.columnSeparator = columnSeparator; + } + } + + /** + * gets the line separator. + */ + public String getLineSeparator() { + return this.lineSeparator; + } + + /** + * Sets the line separator. + * @param lineSeparator property used to specify a line separator. Defaults to '\n' + */ + public void setLineSeparator(String lineSeparator) { + if(lineSeparator != null) { + this.lineSeparator = lineSeparator; + } + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/89d4c8eb/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ProfileToImport.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ProfileToImport.java b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ProfileToImport.java new file mode 100644 index 0000000..30e40e0 --- /dev/null +++ b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ProfileToImport.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.unomi.router.api; + +import org.apache.unomi.api.Profile; + +import java.util.List; + +/** + * An extension of {@link Profile} to handle merge strategy and deletion when importing profiles + */ +public class ProfileToImport extends Profile { + + private List<String> propertiesToOverwrite; + private String mergingProperty; + private boolean profileToDelete; + private boolean overwriteExistingProfiles; + + + public List<String> getPropertiesToOverwrite() { + return this.propertiesToOverwrite; + } + + public void setPropertiesToOverwrite(List<String> propertiesToOverwrite) { + this.propertiesToOverwrite = propertiesToOverwrite; + } + + public boolean isProfileToDelete() { + return this.profileToDelete; + } + + public void setProfileToDelete(boolean profileToDelete) { + this.profileToDelete = profileToDelete; + } + + public boolean isOverwriteExistingProfiles() { + return this.overwriteExistingProfiles; + } + + /** + * Sets the overwriteExistingProfiles flag. + * @param overwriteExistingProfiles flag used to specify if we want to overwrite existing profiles + */ + public void setOverwriteExistingProfiles(boolean overwriteExistingProfiles) { + this.overwriteExistingProfiles = overwriteExistingProfiles; + } + + public String getMergingProperty() { + return this.mergingProperty; + } + + /** + * Sets the merging property. + * @param mergingProperty property used to check if the profile exist when merging + */ + public void setMergingProperty(String mergingProperty) { + this.mergingProperty = mergingProperty; + } + + + +} http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/89d4c8eb/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ImportConfigurationService.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ImportConfigurationService.java b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ImportConfigurationService.java new file mode 100644 index 0000000..cacd671 --- /dev/null +++ b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ImportConfigurationService.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.unomi.router.api.services; + +import org.apache.unomi.router.api.ImportConfiguration; + +import java.util.List; + +/** + * A service to access and operate on {@link ImportConfiguration}s. + */ +public interface ImportConfigurationService { + + /** + * Retrieves all the import configurations. + * + * @return the list of import configurations + */ + public List<ImportConfiguration> getImportConfigurations(); + + /** + * Retrieves the import configuration identified by the specified identifier. + * + * @param configId the identifier of the profile to retrieve + * @return the import configuration identified by the specified identifier or + * {@code null} if no such import configuration exists + */ + public ImportConfiguration load(String configId); + + /** + * Saves the specified import configuration in the context server. + * + * @param profile the import configuration to be saved + * @return the newly saved import configuration + */ + public ImportConfiguration save(ImportConfiguration profile); + + /** + * Deletes the import configuration identified by the specified identifier. + * + * @param configId the identifier of the import configuration to delete + */ + public void delete(String configId); + + +} http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/89d4c8eb/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ProfileImportService.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ProfileImportService.java b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ProfileImportService.java new file mode 100644 index 0000000..aa7d182 --- /dev/null +++ b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ProfileImportService.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.unomi.router.api.services; + +import org.apache.unomi.router.api.ProfileToImport; + +import java.lang.reflect.InvocationTargetException; + +/** + * Created by amidani on 20/05/2017. + */ +public interface ProfileImportService { + + boolean saveMergeDeleteImportedProfile(ProfileToImport profileToImport) throws InvocationTargetException, IllegalAccessException; +} http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/89d4c8eb/extensions/router/router-core/pom.xml ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/pom.xml b/extensions/router/router-core/pom.xml new file mode 100644 index 0000000..53780e7 --- /dev/null +++ b/extensions/router/router-core/pom.xml @@ -0,0 +1,182 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>unomi-router</artifactId> + <groupId>org.apache.unomi</groupId> + <version>1.2.0-incubating-dmf_1343-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>unomi-router-core</artifactId> + <name>Apache Unomi :: Extensions :: Router :: Core</name> + <description>Router Core (Apache Camel Routes)</description> + <packaging>bundle</packaging> + + <dependencies> + <dependency> + <groupId>org.osgi</groupId> + <artifactId>org.osgi.core</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.osgi</groupId> + <artifactId>org.osgi.compendium</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.unomi</groupId> + <artifactId>unomi-api</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.unomi</groupId> + <artifactId>unomi-services</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.unomi</groupId> + <artifactId>unomi-router-api</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-core</artifactId> + <version>${camel.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-jackson</artifactId> + <version>${camel.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-http-common</artifactId> + <version>${camel.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-servlet</artifactId> + <version>${camel.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-kafka</artifactId> + <version>${camel.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + </dependency> + <dependency> + <groupId>commons-net</groupId> + <artifactId>commons-net</artifactId> + <version>3.5</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.servicemix.bundles</groupId> + <artifactId>org.apache.servicemix.bundles.jsch</artifactId> + <version>0.1.54_1</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>0.10.1.0</version> + <scope>provided</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.felix</groupId> + <artifactId>maven-bundle-plugin</artifactId> + <extensions>true</extensions> + <configuration> + <instructions> + <Embed-Dependency>*;scope=compile|runtime</Embed-Dependency> + <Import-Package> + org.osgi.service.event;resolution:=optional, + org.apache.camel, + org.apache.camel.builder, + org.apache.camel.component.file.remote, + org.apache.camel.component.file, + org.apache.camel.component.jackson, + org.apache.camel.component.kafka, + org.apache.camel.component.servlet, + org.apache.camel.component.servlet.osgi, + org.apache.camel.impl, + org.apache.camel.model, + org.apache.camel.model.dataformat, + org.apache.camel.model.rest, + org.apache.camel.spi, + org.apache.unomi.api, + org.apache.unomi.router.api, + org.apache.unomi.api.services, + org.apache.unomi.router.api.services, + org.apache.kafka.clients.producer;resolution:=optional, + org.apache.kafka.clients.consumer;resolution:=optional, + com.jcraft.jsch, + org.osgi.framework, + org.osgi.service.http, + org.slf4j + </Import-Package> + </instructions> + </configuration> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <executions> + <execution> + <id>attach-artifacts</id> + <phase>package</phase> + <goals> + <goal>attach-artifact</goal> + </goals> + <configuration> + <artifacts> + <artifact> + <file> + src/main/resources/org.apache.unomi.router.cfg + </file> + <type>cfg</type> + <classifier>routercfg</classifier> + </artifact> + </artifacts> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + + + +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/89d4c8eb/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/ProfileImportCamelContext.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/ProfileImportCamelContext.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/ProfileImportCamelContext.java new file mode 100644 index 0000000..df734d3 --- /dev/null +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/ProfileImportCamelContext.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.unomi.router.core.context; + +import org.apache.camel.CamelContext; +import org.apache.camel.Route; +import org.apache.camel.component.jackson.JacksonDataFormat; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.unomi.router.api.ImportConfiguration; +import org.apache.unomi.router.api.services.ImportConfigurationService; +import org.apache.unomi.router.core.processor.ImportConfigByFileNameProcessor; +import org.apache.unomi.router.core.processor.UnomiStorageProcessor; +import org.apache.unomi.router.core.route.ProfileImportKafkaToUnomiRouteBuilder; +import org.apache.unomi.router.core.route.ProfileImportOneShotRouteBuilder; +import org.apache.unomi.router.core.route.ProfileImportSourceToKafkaRouteBuilder; +import org.osgi.framework.Bundle; +import org.osgi.framework.BundleContext; +import org.osgi.framework.BundleEvent; +import org.osgi.framework.SynchronousBundleListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * Created by amidani on 04/05/2017. + */ +public class ProfileImportCamelContext implements SynchronousBundleListener { + + private Logger logger = LoggerFactory.getLogger(ProfileImportCamelContext.class.getName()); + + private CamelContext camelContext; + private UnomiStorageProcessor unomiStorageProcessor; + private ImportConfigByFileNameProcessor importConfigByFileNameProcessor; + private ImportConfigurationService importConfigurationService; + private JacksonDataFormat jacksonDataFormat; + private String uploadDir; + private Map<String, String> kafkaProps; + + private final String IMPORT_CONFIG_TYPE_RECURRENT = "recurrent"; + + private BundleContext bundleContext; + + public void setBundleContext(BundleContext bundleContext) { + this.bundleContext = bundleContext; + } + + public void initCamelContext() throws Exception { + logger.info("Initialize Camel Context..."); + camelContext = new DefaultCamelContext(); + List<ImportConfiguration> importConfigurationList = importConfigurationService.getImportConfigurations(); + ProfileImportSourceToKafkaRouteBuilder builderReader = new ProfileImportSourceToKafkaRouteBuilder(kafkaProps); + builderReader.setImportConfigurationList(importConfigurationList); + builderReader.setJacksonDataFormat(jacksonDataFormat); + builderReader.setContext(camelContext); + camelContext.addRoutes(builderReader); + + //One shot import route + ProfileImportOneShotRouteBuilder builderOneShot = new ProfileImportOneShotRouteBuilder(kafkaProps); + builderOneShot.setImportConfigByFileNameProcessor(importConfigByFileNameProcessor); + builderOneShot.setJacksonDataFormat(jacksonDataFormat); + builderOneShot.setUploadDir(uploadDir); + builderOneShot.setContext(camelContext); + camelContext.addRoutes(builderOneShot); + + + ProfileImportKafkaToUnomiRouteBuilder builderProcessor = new ProfileImportKafkaToUnomiRouteBuilder(kafkaProps); + builderProcessor.setUnomiStorageProcessor(unomiStorageProcessor); + builderProcessor.setJacksonDataFormat(jacksonDataFormat); + builderProcessor.setContext(camelContext); + camelContext.addRoutes(builderProcessor); + + camelContext.start(); + + logger.debug("postConstruct {" + bundleContext.getBundle() + "}"); + + processBundleStartup(bundleContext); + for (Bundle bundle : bundleContext.getBundles()) { + if (bundle.getBundleContext() != null) { + processBundleStartup(bundle.getBundleContext()); + } + } + bundleContext.addBundleListener(this); + logger.info("Camel Context {} initialized successfully."); + + } + + private boolean stopRoute(String routeId) throws Exception { + return camelContext.stopRoute(routeId, 10L, TimeUnit.SECONDS, true); + } + + public void updateProfileImportReaderRoute(ImportConfiguration importConfiguration) throws Exception { + Route route = camelContext.getRoute(importConfiguration.getItemId()); + if(route!=null && stopRoute(importConfiguration.getItemId())) { + camelContext.removeRoute(importConfiguration.getItemId()); + } + //Handle transforming an import config oneshot <--> recurrent + if(IMPORT_CONFIG_TYPE_RECURRENT.equals(importConfiguration.getConfigType())){ + ProfileImportSourceToKafkaRouteBuilder builder = new ProfileImportSourceToKafkaRouteBuilder(kafkaProps); + builder.setImportConfigurationList(Arrays.asList(importConfiguration)); + builder.setJacksonDataFormat(jacksonDataFormat); + builder.setContext(camelContext); + camelContext.addRoutes(builder); + } + } + + public CamelContext getCamelContext() { + return camelContext; + } + + public void setUnomiStorageProcessor(UnomiStorageProcessor unomiStorageProcessor) { + this.unomiStorageProcessor = unomiStorageProcessor; + } + + public void setImportConfigByFileNameProcessor(ImportConfigByFileNameProcessor importConfigByFileNameProcessor) { + this.importConfigByFileNameProcessor = importConfigByFileNameProcessor; + } + + public void setImportConfigurationService(ImportConfigurationService importConfigurationService) { + this.importConfigurationService = importConfigurationService; + } + + public void setJacksonDataFormat(JacksonDataFormat jacksonDataFormat) { + this.jacksonDataFormat = jacksonDataFormat; + } + + public void setUploadDir(String uploadDir) { + this.uploadDir = uploadDir; + } + + public void setKafkaProps(Map<String, String> kafkaProps) { + this.kafkaProps = kafkaProps; + } + + public void preDestroy() throws Exception { + bundleContext.removeBundleListener(this); + //This is to shutdown Camel context + //(will stop all routes/components/endpoints etc and clear internal state/cache) + this.camelContext.stop(); + logger.info("Camel context for profile import is shutdown."); + } + + private void processBundleStartup(BundleContext bundleContext) { + if (bundleContext == null) { + return; + } + } + + @Override + public void bundleChanged(BundleEvent bundleEvent) { + + } +} http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/89d4c8eb/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ConfigUpdateProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ConfigUpdateProcessor.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ConfigUpdateProcessor.java new file mode 100644 index 0000000..e4eaa19 --- /dev/null +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ConfigUpdateProcessor.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.unomi.router.core.processor; + +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.Processor; +import org.apache.unomi.router.api.ImportConfiguration; +import org.apache.unomi.router.core.context.ProfileImportCamelContext; + +/** + * Created by amidani on 10/05/2017. + */ +public class ConfigUpdateProcessor implements Processor{ + + private ProfileImportCamelContext profileImportCamelContext; + + @Override + public void process(Exchange exchange) throws Exception { + if (exchange.getIn() != null) { + Message message = exchange.getIn(); + ImportConfiguration importConfiguration = message.getBody(ImportConfiguration.class); + profileImportCamelContext.updateProfileImportReaderRoute(importConfiguration); + } + } + + public void setProfileImportCamelContext(ProfileImportCamelContext profileImportCamelContext) { + this.profileImportCamelContext = profileImportCamelContext; + } +} http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/89d4c8eb/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportConfigByFileNameProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportConfigByFileNameProcessor.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportConfigByFileNameProcessor.java new file mode 100644 index 0000000..7fc7730 --- /dev/null +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportConfigByFileNameProcessor.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.unomi.router.core.processor; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.component.file.GenericFile; +import org.apache.unomi.router.api.ImportConfiguration; +import org.apache.unomi.router.api.services.ImportConfigurationService; + +/** + * Created by amidani on 22/05/2017. + */ +public class ImportConfigByFileNameProcessor implements Processor{ + + private ImportConfigurationService importConfigurationService; + + @Override + public void process(Exchange exchange) throws Exception { + + String fileName = exchange.getIn().getBody(GenericFile.class).getFileName(); + String importConfigId = fileName.substring(0, fileName.indexOf('.')); + ImportConfiguration importConfiguration = importConfigurationService.load(importConfigId); + exchange.getIn().setHeader("importConfigOneShot", importConfiguration); + } + + public void setImportConfigurationService(ImportConfigurationService importConfigurationService) { + this.importConfigurationService = importConfigurationService; + } +} http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/89d4c8eb/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java new file mode 100644 index 0000000..150ef6d --- /dev/null +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.unomi.router.core.processor; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.component.kafka.KafkaConstants; +import org.apache.commons.lang3.StringUtils; +import org.apache.unomi.router.api.ImportConfiguration; +import org.apache.unomi.router.api.ProfileToImport; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +/** + * Created by amidani on 29/12/2016. + */ +public class LineSplitProcessor implements Processor { + + private Map<String, Integer> fieldsMapping; + private List<String> propertiesToOverwrite; + private String mergingProperty; + private boolean overwriteExistingProfiles; + private String columnSeparator; + + @Override + public void process(Exchange exchange) throws Exception { + //In case of one shot import we check the header and overwrite import config + ImportConfiguration importConfigOneShot = (ImportConfiguration) exchange.getIn().getHeader("importConfigOneShot"); + if(importConfigOneShot!=null) { + fieldsMapping = (Map<String, Integer>)importConfigOneShot.getProperties().get("mapping"); + propertiesToOverwrite = importConfigOneShot.getPropertiesToOverwrite(); + mergingProperty = importConfigOneShot.getMergingProperty(); + overwriteExistingProfiles = importConfigOneShot.isOverwriteExistingProfiles(); + columnSeparator = importConfigOneShot.getColumnSeparator(); + } + String[] profileData = ((String)exchange.getIn().getBody()).split(columnSeparator); + ProfileToImport profileToImport = new ProfileToImport(); + profileToImport.setItemId(UUID.randomUUID().toString()); + profileToImport.setItemType("profile"); + profileToImport.setScope("system"); + if(profileData.length > 0) { + Map<String, Object> properties = new HashMap<>(); + for(String fieldMappingKey : fieldsMapping.keySet()) { + if(profileData.length > fieldsMapping.get(fieldMappingKey)) { + properties.put(fieldMappingKey, profileData[fieldsMapping.get(fieldMappingKey)].trim()); + } + } + profileToImport.setProperties(properties); + profileToImport.setMergingProperty(mergingProperty); + profileToImport.setPropertiesToOverwrite(propertiesToOverwrite); + profileToImport.setOverwriteExistingProfiles(overwriteExistingProfiles); + if(StringUtils.isNotBlank(profileData[profileData.length - 1]) && Boolean.parseBoolean(profileData[profileData.length - 1].trim())) { + profileToImport.setProfileToDelete(true); + } + } + exchange.getIn().setBody(profileToImport, ProfileToImport.class); + exchange.getIn().setHeader(KafkaConstants.PARTITION_KEY, 0); + exchange.getIn().setHeader(KafkaConstants.KEY, "1"); + } + + /** + * Setter of fieldsMapping + * @param fieldsMapping map String,Integer fieldName in ES and the matching column index in the import file + */ + public void setFieldsMapping(Map<String, Integer> fieldsMapping) { + this.fieldsMapping = fieldsMapping; + } + + public void setPropertiesToOverwrite(List<String> propertiesToOverwrite) { + this.propertiesToOverwrite = propertiesToOverwrite; + } + + public void setOverwriteExistingProfiles(boolean overwriteExistingProfiles) { + this.overwriteExistingProfiles = overwriteExistingProfiles; + } + + public String getMergingProperty() { + return this.mergingProperty; + } + + /** + * Sets the merging property. + * @param mergingProperty property used to check if the profile exist when merging + */ + public void setMergingProperty(String mergingProperty) { + this.mergingProperty = mergingProperty; + } + + /** + * Sets the line separator. + * @param columnSeparator property used to specify a line separator. Defaults to ',' + */ + public void setColumnSeparator(String columnSeparator) { + this.columnSeparator = columnSeparator; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/89d4c8eb/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/UnomiStorageProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/UnomiStorageProcessor.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/UnomiStorageProcessor.java new file mode 100644 index 0000000..7e55185 --- /dev/null +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/UnomiStorageProcessor.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.unomi.router.core.processor; + +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.Processor; +import org.apache.unomi.router.api.ProfileToImport; +import org.apache.unomi.router.api.services.ProfileImportService; + +/** + * Created by amidani on 29/12/2016. + */ +public class UnomiStorageProcessor implements Processor { + + private ProfileImportService profileImportService; + + @Override + public void process(Exchange exchange) + throws Exception { + if (exchange.getIn() != null) { + Message message = exchange.getIn(); + + ProfileToImport profileToImport = (ProfileToImport) message.getBody(); + profileImportService.saveMergeDeleteImportedProfile(profileToImport); + } + } + + public void setProfileImportService(ProfileImportService profileImportService) { + this.profileImportService = profileImportService; + } +} http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/89d4c8eb/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportConfigUpdateRouteBuilder.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportConfigUpdateRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportConfigUpdateRouteBuilder.java new file mode 100644 index 0000000..40575d5 --- /dev/null +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportConfigUpdateRouteBuilder.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.unomi.router.core.route; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.model.rest.RestBindingMode; +import org.apache.unomi.router.api.ImportConfiguration; +import org.apache.unomi.router.core.context.ProfileImportCamelContext; +import org.apache.unomi.router.core.processor.ConfigUpdateProcessor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Created by amidani on 10/05/2017. + */ +public class ProfileImportConfigUpdateRouteBuilder extends RouteBuilder { + + private static final Logger logger = LoggerFactory.getLogger(ProfileImportConfigUpdateRouteBuilder.class.getName()); + + private ProfileImportCamelContext profileImportCamelContext; + + @Override + public void configure() throws Exception { + logger.info("Preparing REST Configuration for servlet with context path [/importConfigAdmin]"); + restConfiguration().component("servlet") + .contextPath("/importConfigAdmin") + .enableCORS(false) + .bindingMode(RestBindingMode.json) + .dataFormatProperty("prettyPrint", "true"); + + rest().put("/").consumes("application/json").type(ImportConfiguration.class) + .to("direct:importConfigRestDeposit"); + ConfigUpdateProcessor profileImportConfigUpdateProcessor = new ConfigUpdateProcessor(); + profileImportConfigUpdateProcessor.setProfileImportCamelContext(profileImportCamelContext); + from("direct:importConfigRestDeposit") + .process(profileImportConfigUpdateProcessor) + .transform().constant("Success.") + .onException(Exception.class) + .transform().constant("Failure!"); + + + } + + public void setProfileImportCamelContext(ProfileImportCamelContext profileImportCamelContext) { + this.profileImportCamelContext = profileImportCamelContext; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/89d4c8eb/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportKafkaToUnomiRouteBuilder.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportKafkaToUnomiRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportKafkaToUnomiRouteBuilder.java new file mode 100644 index 0000000..1b056fe --- /dev/null +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportKafkaToUnomiRouteBuilder.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.unomi.router.core.route; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.jackson.JacksonDataFormat; +import org.apache.camel.component.kafka.KafkaComponent; +import org.apache.camel.component.kafka.KafkaConfiguration; +import org.apache.camel.component.kafka.KafkaEndpoint; +import org.apache.commons.lang3.StringUtils; +import org.apache.unomi.router.core.processor.UnomiStorageProcessor; + +import java.util.Map; + +/** + * Created by amidani on 26/04/2017. + */ +public class ProfileImportKafkaToUnomiRouteBuilder extends RouteBuilder { + + private UnomiStorageProcessor unomiStorageProcessor; + private JacksonDataFormat jacksonDataFormat; + private String kafkaHost; + private String kafkaPort; + private String kafkaImportTopic; + private String kafkaImportGroupId; + + public ProfileImportKafkaToUnomiRouteBuilder(Map<String, String> kafkaProps) { + kafkaHost = kafkaProps.get("kafkaHost"); + kafkaPort = kafkaProps.get("kafkaPort"); + kafkaImportTopic = kafkaProps.get("kafkaImportTopic"); + kafkaImportGroupId = kafkaProps.get("kafkaImportGroupId"); + } + + @Override + public void configure() throws Exception { + + StringBuilder kafkaUri = new StringBuilder("kafka:"); + kafkaUri.append(kafkaHost).append(":").append(kafkaPort).append("?topic=").append(kafkaImportTopic); + if(StringUtils.isNotBlank(kafkaImportGroupId)) { + kafkaUri.append("&groupId="+kafkaImportGroupId); + } + kafkaUri.append("&autoCommitEnable=true&consumersCount=10"); + KafkaConfiguration kafkaConfiguration = new KafkaConfiguration(); + kafkaConfiguration.setBrokers(kafkaHost+":"+kafkaPort); + kafkaConfiguration.setTopic(kafkaImportTopic); + kafkaConfiguration.setGroupId(kafkaImportGroupId); + KafkaEndpoint endpoint = new KafkaEndpoint(kafkaUri.toString(), new KafkaComponent(this.getContext())); + endpoint.setConfiguration(kafkaConfiguration); + from(endpoint) + .unmarshal(jacksonDataFormat) + .process(unomiStorageProcessor) + .to("log:org.apache.unomi.router?level=INFO"); + + } + + public void setUnomiStorageProcessor(UnomiStorageProcessor unomiStorageProcessor) { + this.unomiStorageProcessor = unomiStorageProcessor; + } + + public void setJacksonDataFormat(JacksonDataFormat jacksonDataFormat) { + this.jacksonDataFormat = jacksonDataFormat; + } +} http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/89d4c8eb/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java new file mode 100644 index 0000000..d095f3e --- /dev/null +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.unomi.router.core.route; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.jackson.JacksonDataFormat; +import org.apache.camel.component.kafka.KafkaComponent; +import org.apache.camel.component.kafka.KafkaConfiguration; +import org.apache.camel.component.kafka.KafkaEndpoint; +import org.apache.commons.lang3.StringUtils; +import org.apache.unomi.router.core.processor.ImportConfigByFileNameProcessor; +import org.apache.unomi.router.core.processor.LineSplitProcessor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +/** + * Created by amidani on 22/05/2017. + */ +public class ProfileImportOneShotRouteBuilder extends RouteBuilder { + + private Logger logger = LoggerFactory.getLogger(ProfileImportOneShotRouteBuilder.class.getName()); + + private ImportConfigByFileNameProcessor importConfigByFileNameProcessor; + private JacksonDataFormat jacksonDataFormat; + private String uploadDir; + private String kafkaHost; + private String kafkaPort; + private String kafkaImportTopic; + private String kafkaImportGroupId; + + private final String IMPORT_ONESHOT_ROUTE_ID = "ONE_SHOT_ROUTE"; + + public ProfileImportOneShotRouteBuilder(Map<String, String> kafkaProps) { + kafkaHost = kafkaProps.get("kafkaHost"); + kafkaPort = kafkaProps.get("kafkaPort"); + kafkaImportTopic = kafkaProps.get("kafkaImportTopic"); + kafkaImportGroupId = kafkaProps.get("kafkaImportGroupId"); + } + + @Override + public void configure() throws Exception { + + //Prepare Kafka Deposit + StringBuilder kafkaUri = new StringBuilder("kafka:"); + kafkaUri.append(kafkaHost).append(":").append(kafkaPort).append("?topic=").append(kafkaImportTopic); + if(StringUtils.isNotBlank(kafkaImportGroupId)) { + kafkaUri.append("&groupId="+ kafkaImportGroupId); + } + + KafkaConfiguration kafkaConfiguration = new KafkaConfiguration(); + kafkaConfiguration.setBrokers(kafkaHost+":"+kafkaPort); + kafkaConfiguration.setTopic(kafkaImportTopic); + kafkaConfiguration.setGroupId(kafkaImportGroupId); + KafkaEndpoint endpoint = new KafkaEndpoint(kafkaUri.toString(), new KafkaComponent(this.getContext())); + endpoint.setConfiguration(kafkaConfiguration); + + LineSplitProcessor lineSplitProcessor = new LineSplitProcessor(); + + + from("file://"+uploadDir+"?include=.*.csv&consumer.delay=1m") + .routeId(IMPORT_ONESHOT_ROUTE_ID) + .autoStartup(true) + .process(importConfigByFileNameProcessor) + .split(bodyAs(String.class).tokenize("${in.header.importConfigOneShot.getLineSeparator}")) + .process(lineSplitProcessor) + .to("log:org.apache.unomi.router?level=INFO") + .marshal(jacksonDataFormat) + .convertBodyTo(String.class) + .to(endpoint); + } + + public void setImportConfigByFileNameProcessor(ImportConfigByFileNameProcessor importConfigByFileNameProcessor) { + this.importConfigByFileNameProcessor = importConfigByFileNameProcessor; + } + + public void setUploadDir(String uploadDir) { + this.uploadDir = uploadDir; + } + + public void setJacksonDataFormat(JacksonDataFormat jacksonDataFormat) { + this.jacksonDataFormat = jacksonDataFormat; + } +} http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/89d4c8eb/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportSourceToKafkaRouteBuilder.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportSourceToKafkaRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportSourceToKafkaRouteBuilder.java new file mode 100644 index 0000000..37ae59e --- /dev/null +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportSourceToKafkaRouteBuilder.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.unomi.router.core.route; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.jackson.JacksonDataFormat; +import org.apache.camel.component.kafka.KafkaComponent; +import org.apache.camel.component.kafka.KafkaConfiguration; +import org.apache.camel.component.kafka.KafkaEndpoint; +import org.apache.commons.lang3.StringUtils; +import org.apache.unomi.router.api.ImportConfiguration; +import org.apache.unomi.router.core.processor.LineSplitProcessor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; + +/** + * Created by amidani on 26/04/2017. + */ + +public class ProfileImportSourceToKafkaRouteBuilder extends RouteBuilder { + + private static final Logger logger = LoggerFactory.getLogger(ProfileImportSourceToKafkaRouteBuilder.class.getName()); + + private List<ImportConfiguration> importConfigurationList; + private JacksonDataFormat jacksonDataFormat; + private String kafkaHost; + private String kafkaPort; + private String kafkaImportTopic; + private String kafkaImportGroupId; + + public ProfileImportSourceToKafkaRouteBuilder(Map<String, String> kafkaProps) { + kafkaHost = kafkaProps.get("kafkaHost"); + kafkaPort = kafkaProps.get("kafkaPort"); + kafkaImportTopic = kafkaProps.get("kafkaImportTopic"); + kafkaImportGroupId = kafkaProps.get("kafkaImportGroupId"); + } + + @Override + public void configure() throws Exception { + //Prepare Kafka Deposit + StringBuilder kafkaUri = new StringBuilder("kafka:"); + kafkaUri.append(kafkaHost).append(":").append(kafkaPort).append("?topic=").append(kafkaImportTopic); + if(StringUtils.isNotBlank(kafkaImportGroupId)) { + kafkaUri.append("&groupId="+ kafkaImportGroupId); + } + + KafkaConfiguration kafkaConfiguration = new KafkaConfiguration(); + kafkaConfiguration.setBrokers(kafkaHost+":"+kafkaPort); + kafkaConfiguration.setTopic(kafkaImportTopic); + kafkaConfiguration.setGroupId(kafkaImportGroupId); + KafkaEndpoint endpoint = new KafkaEndpoint(kafkaUri.toString(), new KafkaComponent(this.getContext())); + endpoint.setConfiguration(kafkaConfiguration); + + //Loop on multiple import configuration + for(ImportConfiguration importConfiguration : importConfigurationList) { + if(importConfiguration.getProperties().size() > 0 && + StringUtils.isNotEmpty((String) importConfiguration.getProperties().get("source"))) { + //Prepare Split Processor + LineSplitProcessor lineSplitProcessor = new LineSplitProcessor(); + lineSplitProcessor.setFieldsMapping((Map<String, Integer>) importConfiguration.getProperties().get("mapping")); + lineSplitProcessor.setOverwriteExistingProfiles(importConfiguration.isOverwriteExistingProfiles()); + lineSplitProcessor.setPropertiesToOverwrite(importConfiguration.getPropertiesToOverwrite()); + lineSplitProcessor.setMergingProperty(importConfiguration.getMergingProperty()); + lineSplitProcessor.setColumnSeparator(importConfiguration.getColumnSeparator()); + + from((String) importConfiguration.getProperties().get("source")) + .routeId(importConfiguration.getItemId())// This allow identification of the route for manual start/stop + .autoStartup(importConfiguration.isActive())// Auto-start if the import configuration is set active + .split(bodyAs(String.class).tokenize(importConfiguration.getLineSeparator())) + .process(lineSplitProcessor) + .to("log:org.apache.unomi.router?level=INFO") + .marshal(jacksonDataFormat) + .convertBodyTo(String.class) + .to(endpoint); + } + } + } + + public void setImportConfigurationList(List<ImportConfiguration> importConfigurationList) { + this.importConfigurationList = importConfigurationList; + } + + public void setJacksonDataFormat(JacksonDataFormat jacksonDataFormat) { + this.jacksonDataFormat = jacksonDataFormat; + } + + public void setKafkaHost(String kafkaHost) { + this.kafkaHost = kafkaHost; + } + + public void setKafkaPort(String kafkaPort) { + this.kafkaPort = kafkaPort; + } + + public void setKafkaImportTopic(String kafkaImportTopic) { + this.kafkaImportTopic = kafkaImportTopic; + } + + public void setKafkaImportGroupId(String kafkaImportGroupId) { + this.kafkaImportGroupId = kafkaImportGroupId; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/89d4c8eb/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml new file mode 100644 index 0000000..4c36b9e --- /dev/null +++ b/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml @@ -0,0 +1,94 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> +<blueprint xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0" + xmlns:cm="http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.1.0" + xmlns:camel="http://camel.apache.org/schema/blueprint" + xsi:schemaLocation="http://www.osgi.org/xmlns/blueprint/v1.0.0 http://www.osgi.org/xmlns/blueprint/v1.0.0/blueprint.xsd + http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.1.0 http://aries.apache.org/schemas/blueprint-cm/blueprint-cm-1.1.0.xsd + http://camel.apache.org/schema/blueprint http://camel.apache.org/schema/blueprint/camel-blueprint.xsd"> + + <cm:property-placeholder persistent-id="org.apache.unomi.router" update-strategy="reload"> + <cm:default-properties> + <cm:property name="kafka.host" value="localhost"/> + <cm:property name="kafka.port" value="9092"/> + <cm:property name="kafka.import.topic" value="camel-deposit"/> + <cm:property name="kafka.import.groupId" value="unomi-import-group"/> + <cm:property name="import.oneshot.uploadDir" value="/tmp/oneshot_import_configs/"/> + </cm:default-properties> + </cm:property-placeholder> + + <bean id="unomiStorageProcessor" class="org.apache.unomi.router.core.processor.UnomiStorageProcessor"> + <property name="profileImportService" ref="profileImportService"/> + </bean> + + <bean id="importConfigByFileNameProcessor" class="org.apache.unomi.router.core.processor.ImportConfigByFileNameProcessor"> + <property name="importConfigurationService" ref="importConfigurationService"/> + </bean> + + <bean id="jacksonDataFormat" class="org.apache.camel.component.jackson.JacksonDataFormat"> + <property name="unmarshalType" value="org.apache.unomi.router.api.ProfileToImport"/> + </bean> + + <bean id="jacksonDataFormatImportConfig" class="org.apache.camel.model.dataformat.JsonDataFormat"> + <property name="unmarshalType" value="org.apache.unomi.router.api.ImportConfiguration"/> + <property name="library" value="Jackson"/> + </bean> + + <bean class="org.apache.camel.component.servlet.osgi.OsgiServletRegisterer" + init-method="register" + destroy-method="unregister"> + <property name="alias" value="/importConfigAdmin"/> + <property name="httpService" ref="httpService"/> + <property name="servlet" ref="camelServlet"/> + </bean> + + <bean id="camelServlet" class="org.apache.camel.component.servlet.CamelHttpTransportServlet"/> + + + <bean id="camelContext" class="org.apache.unomi.router.core.context.ProfileImportCamelContext" + init-method="initCamelContext" destroy-method="preDestroy"> + <property name="kafkaProps"> + <map> + <entry key="kafkaHost" value="${kafka.host}"/> + <entry key="kafkaPort" value="${kafka.port}"/> + <entry key="kafkaImportTopic" value="${kafka.import.topic}"/> + <entry key="kafkaImportGroupId" value="${kafka.import.groupId}"/> + </map> + </property> + <property name="uploadDir" value="${import.oneshot.uploadDir}"/> + <property name="unomiStorageProcessor" ref="unomiStorageProcessor"/> + <property name="importConfigByFileNameProcessor" ref="importConfigByFileNameProcessor"/> + <property name="importConfigurationService" ref="importConfigurationService"/> + <property name="jacksonDataFormat" ref="jacksonDataFormat"/> + <property name="bundleContext" ref="blueprintBundleContext"/> + </bean> + + <camel:camelContext id="httpEndpoint" xmlns="http://camel.apache.org/schema/blueprint"> + <camel:routeBuilder ref="profileImportConfigUpdateRouteBuilder" /> + </camel:camelContext> + + <bean id="profileImportConfigUpdateRouteBuilder" class="org.apache.unomi.router.core.route.ProfileImportConfigUpdateRouteBuilder"> + <property name="profileImportCamelContext" ref="camelContext"/> + </bean> + + <reference id="httpService" interface="org.osgi.service.http.HttpService"/> + <reference id="profileImportService" interface="org.apache.unomi.router.api.services.ProfileImportService"/> + <reference id="importConfigurationService" interface="org.apache.unomi.router.api.services.ImportConfigurationService"/> + +</blueprint> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/89d4c8eb/extensions/router/router-core/src/main/resources/org.apache.unomi.router.cfg ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/resources/org.apache.unomi.router.cfg b/extensions/router/router-core/src/main/resources/org.apache.unomi.router.cfg new file mode 100644 index 0000000..ff2c8ef --- /dev/null +++ b/extensions/router/router-core/src/main/resources/org.apache.unomi.router.cfg @@ -0,0 +1,25 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +#Kafka + settingskafka.host=localhost +kafka.port=9092 +kafka.import.topic=camel-deposit +kafka.import.groupId=unomi-import-group + +#Import One Shot upload directory +import.oneshot.uploadDir=/tmp/unomi_oneshot_import_configs/ \ No newline at end of file