UNOMI-102 : Refactor import code to avoid code duplication with export features
Project: http://git-wip-us.apache.org/repos/asf/incubator-unomi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-unomi/commit/c62f91d4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-unomi/tree/c62f91d4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-unomi/diff/c62f91d4 Branch: refs/heads/master Commit: c62f91d4bec9ef723f9af4e50c7506cb88ec02d9 Parents: fe8fbef Author: Abdelkader Midani <amid...@apache.org> Authored: Wed Jun 28 02:28:17 2017 +0200 Committer: Abdelkader Midani <amid...@apache.org> Committed: Wed Jun 28 02:28:17 2017 +0200 ---------------------------------------------------------------------- .../api/services/ConfigSharingService.java | 32 +++ .../unomi/router/api/ExportConfiguration.java | 45 ++++ .../unomi/router/api/ImportConfiguration.java | 182 +------------- .../router/api/ImportExportConfiguration.java | 203 ++++++++++++++++ .../unomi/router/api/RouterConstants.java | 44 ++++ .../services/ImportConfigurationService.java | 60 ----- .../ImportExportConfigurationService.java | 61 +++++ extensions/router/router-core/pom.xml | 7 + .../unomi/router/core/RouterConstants.java | 41 ---- .../core/config/ConfigSharingServiceImpl.java | 77 ++++++ .../core/context/ProfileImportCamelContext.java | 187 -------------- .../router/core/context/RouterCamelContext.java | 243 +++++++++++++++++++ .../core/processor/ConfigUpdateProcessor.java | 12 +- .../ImportConfigByFileNameProcessor.java | 8 +- .../core/processor/LineSplitFailureHandler.java | 2 +- .../core/processor/LineSplitProcessor.java | 2 +- .../processor/RouteCompletionProcessor.java | 8 +- .../core/route/ConfigUpdateRouteBuilder.java | 73 ++++++ .../route/ProfileExportCollectRouteBuilder.java | 81 +++++++ .../ProfileImportAbstractRouteBuilder.java | 2 +- .../ProfileImportConfigUpdateRouteBuilder.java | 62 ----- .../ProfileImportFromSourceRouteBuilder.java | 104 ++++---- .../route/ProfileImportOneShotRouteBuilder.java | 2 +- .../route/ProfileImportToUnomiRouteBuilder.java | 2 +- .../resources/OSGI-INF/blueprint/blueprint.xml | 39 ++- .../main/resources/org.apache.unomi.router.cfg | 8 +- extensions/router/router-karaf-feature/pom.xml | 2 +- .../AbstractConfigurationServiceEndpoint.java | 92 +++++++ .../ExportConfigurationServiceEndPoint.java | 86 +++++++ .../ImportConfigurationServiceEndPoint.java | 92 ++----- .../resources/OSGI-INF/blueprint/blueprint.xml | 46 ++-- .../AbstractConfigurationServiceImpl.java | 73 ++++++ .../ExportConfigurationServiceImpl.java | 61 +++++ .../ImportConfigurationServiceImpl.java | 56 +---- .../resources/OSGI-INF/blueprint/blueprint.xml | 17 +- 35 files changed, 1363 insertions(+), 749 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/c62f91d4/api/src/main/java/org/apache/unomi/api/services/ConfigSharingService.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/org/apache/unomi/api/services/ConfigSharingService.java b/api/src/main/java/org/apache/unomi/api/services/ConfigSharingService.java new file mode 100644 index 0000000..1bfb050 --- /dev/null +++ b/api/src/main/java/org/apache/unomi/api/services/ConfigSharingService.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.unomi.api.services; + +/** + * A service to share cfg properties with other bundles. + */ +public interface ConfigSharingService { + + String getOneshotImportUploadDir(); + + void setOneshotImportUploadDir(String oneshotImportUploadDir); + + String getInternalServerPort(); + + void setInternalServerPort(String internalServerPort); + +} http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/c62f91d4/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ExportConfiguration.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ExportConfiguration.java b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ExportConfiguration.java new file mode 100644 index 0000000..5380691 --- /dev/null +++ b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ExportConfiguration.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.unomi.router.api; + +import org.apache.unomi.api.Item; +import org.apache.unomi.api.conditions.Condition; + +/** + * Created by amidani on 19/06/2017. + */ +public class ExportConfiguration extends ImportExportConfiguration { + + /** + * The ExportConfiguration ITEM_TYPE + * + * @see Item for a discussion of ITEM_TYPE + */ + public static final String ITEM_TYPE = "exportConfig"; + + private String[] segments; + + public String[] getSegments() { + return segments; + } + + public void setSegments(String[] segments) { + this.segments = segments; + } + +} + http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/c62f91d4/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ImportConfiguration.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ImportConfiguration.java b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ImportConfiguration.java index ee0184a..b961cfb 100644 --- a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ImportConfiguration.java +++ b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ImportConfiguration.java @@ -25,7 +25,7 @@ import java.util.*; /** * Created by amidani on 28/04/2017. */ -public class ImportConfiguration extends Item { +public class ImportConfiguration extends ImportExportConfiguration { /** * The ImportConfiguration ITEM_TYPE @@ -33,102 +33,10 @@ public class ImportConfiguration extends Item { * @see Item for a discussion of ITEM_TYPE */ public static final String ITEM_TYPE = "importConfig"; - private String name; - private String description; - private String configType; - private Map<String, Object> properties = new HashMap<>(); private String mergingProperty; private boolean overwriteExistingProfiles = false; private List<String> propertiesToOverwrite; - private String columnSeparator = ","; - private String lineSeparator = "\n"; - private boolean active = false; - private String status; - - private List<Map<String, Object>> executions = new ArrayList(); - - /** - * Sets the property identified by the specified name to the specified value. If a property with that name already exists, replaces its value, otherwise adds the new - * property with the specified name and value. - * - * @param name the name of the property to set - * @param value the value of the property - */ - public void setProperty(String name, Object value) { - properties.put(name, value); - } - - /** - * Retrieves the name of the import configuration - * @return the name of the import configuration - */ - public String getName() { return this.name; } - - /** - * Sets the name of the import configuration - * @param name the name of the import configuration - */ - public void setName(String name) { - this.name = name; - } - - /** - * Retrieves the description of the import configuration - * @return the description of the import configuration - */ - public String getDescription() { return this.description; } - - /** - * Sets the description of the import configuration - * @param description the description of the import configuration - */ - public void setDescription(String description) { - this.description = description; - } - - - /** - * Retrieves the config type of the import configuration - * @return the config type of the import configuration - */ - public String getConfigType() { return this.configType; } - - /** - * Sets the config type of the import configuration - * @param configType the config type of the import configuration - */ - public void setConfigType(String configType) { - this.configType = configType; - } - - /** - * Retrieves the property identified by the specified name. - * - * @param name the name of the property to retrieve - * @return the value of the specified property or {@code null} if no such property exists - */ - public Object getProperty(String name) { - return properties.get(name); - } - - /** - * Retrieves a Map of all property name - value pairs for this import configuration. - * - * @return a Map of all property name - value pairs for this import configuration - */ - public Map<String, Object> getProperties() { - return properties; - } - - /** - * Sets the property name - value pairs for this import configuration. - * - * @param properties a Map containing the property name - value pairs for this import configuration - */ - public void setProperties(Map<String, Object> properties) { - this.properties = properties; - } public String getMergingProperty() { return mergingProperty; @@ -142,43 +50,6 @@ public class ImportConfiguration extends Item { this.mergingProperty = mergingProperty; } - - /** - * Retrieves the import configuration active flag. - * - * @return true if the import configuration is active false if not - */ - public boolean isActive() { - return this.active; - } - - /** - * Sets the active flag true/false. - * - * @param active a boolean to set to active or inactive the import configuration - */ - public void setActive(boolean active) { - this.active = active; - } - - /** - * Retrieves the import configuration status for last execution. - * - * @return status of the last execution - */ - public String getStatus() { - return this.status; - } - - /** - * Sets status of the last execution. - * - * @param status of the last execution - */ - public void setStatus(String status) { - this.status = status; - } - /** * Retrieves the import configuration overwriteExistingProfiles flag. * @@ -205,55 +76,4 @@ public class ImportConfiguration extends Item { this.propertiesToOverwrite = propertiesToOverwrite; } - /** - * Retrieves the column separator. - */ - public String getColumnSeparator() { - return this.columnSeparator; - } - - /** - * Sets the column separator. - * @param columnSeparator property used to specify a line separator. Defaults to ',' - */ - public void setColumnSeparator(String columnSeparator) { - if(this.columnSeparator !=null) { - this.columnSeparator = columnSeparator; - } - } - - /** - * Retrieves the line separator. - */ - public String getLineSeparator() { - return this.lineSeparator; - } - - /** - * Sets the line separator. - * @param lineSeparator property used to specify a line separator. Defaults to '\n' - */ - public void setLineSeparator(String lineSeparator) { - if(lineSeparator != null) { - this.lineSeparator = lineSeparator; - } - } - - /** - * Retrieves the executions - */ - public List<Map<String, Object>> getExecutions() { - return this.executions; - } - - - /** - * Sets the executions - * @param executions - */ - public void setExecutions(List<Map<String, Object>> executions) { - this.executions = executions; - } - - } http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/c62f91d4/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ImportExportConfiguration.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ImportExportConfiguration.java b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ImportExportConfiguration.java new file mode 100644 index 0000000..82e2f0d --- /dev/null +++ b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/ImportExportConfiguration.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.unomi.router.api; + + import org.apache.unomi.api.Item; + + import java.util.ArrayList; + import java.util.HashMap; + import java.util.List; + import java.util.Map; + +/** + * Created by amidani on 21/06/2017. + */ +public class ImportExportConfiguration extends Item { + + private String name; + private String description; + private String configType; + private Map<String, Object> properties = new HashMap<>(); + private String columnSeparator = ","; + private String lineSeparator = "\n"; + private boolean active; + private String status; + + private List<Map<String, Object>> executions = new ArrayList(); + + /** + * Sets the property identified by the specified name to the specified value. If a property with that name already exists, replaces its value, otherwise adds the new + * property with the specified name and value. + * + * @param name the name of the property to set + * @param value the value of the property + */ + public void setProperty(String name, Object value) { + properties.put(name, value); + } + + /** + * Retrieves the name of the import configuration + * @return the name of the import configuration + */ + public String getName() { return this.name; } + + /** + * Sets the name of the import configuration + * @param name the name of the import configuration + */ + public void setName(String name) { + this.name = name; + } + + /** + * Retrieves the description of the import configuration + * @return the description of the import configuration + */ + public String getDescription() { return this.description; } + + /** + * Sets the description of the import configuration + * @param description the description of the import configuration + */ + public void setDescription(String description) { + this.description = description; + } + + + /** + * Retrieves the config type of the import configuration + * @return the config type of the import configuration + */ + public String getConfigType() { return this.configType; } + + /** + * Sets the config type of the import configuration + * @param configType the config type of the import configuration + */ + public void setConfigType(String configType) { + this.configType = configType; + } + + /** + * Retrieves the property identified by the specified name. + * + * @param name the name of the property to retrieve + * @return the value of the specified property or {@code null} if no such property exists + */ + public Object getProperty(String name) { + return properties.get(name); + } + + /** + * Retrieves a Map of all property name - value pairs for this import configuration. + * + * @return a Map of all property name - value pairs for this import configuration + */ + public Map<String, Object> getProperties() { + return properties; + } + + /** + * Retrieves the import configuration active flag. + * + * @return true if the import configuration is active false if not + */ + public boolean isActive() { + return this.active; + } + + /** + * Sets the active flag true/false. + * + * @param active a boolean to set to active or inactive the import configuration + */ + public void setActive(boolean active) { + this.active = active; + } + + /** + * Retrieves the import configuration status for last execution. + * + * @return status of the last execution + */ + public String getStatus() { + return this.status; + } + + /** + * Sets status of the last execution. + * + * @param status of the last execution + */ + public void setStatus(String status) { + this.status = status; + } + + /** + * Retrieves the column separator. + */ + public String getColumnSeparator() { + return this.columnSeparator; + } + + /** + * Sets the column separator. + * @param columnSeparator property used to specify a line separator. Defaults to ',' + */ + public void setColumnSeparator(String columnSeparator) { + if(this.columnSeparator !=null) { + this.columnSeparator = columnSeparator; + } + } + + /** + * Retrieves the line separator. + */ + public String getLineSeparator() { + return this.lineSeparator; + } + + /** + * Sets the line separator. + * @param lineSeparator property used to specify a line separator. Defaults to '\n' + */ + public void setLineSeparator(String lineSeparator) { + if(lineSeparator != null) { + this.lineSeparator = lineSeparator; + } + } + + /** + * Retrieves the executions + */ + public List<Map<String, Object>> getExecutions() { + return this.executions; + } + + + /** + * Sets the executions + * @param executions + */ + public void setExecutions(List<Map<String, Object>> executions) { + this.executions = executions; + } + + +} + http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/c62f91d4/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/RouterConstants.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/RouterConstants.java b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/RouterConstants.java new file mode 100644 index 0000000..45de3d6 --- /dev/null +++ b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/RouterConstants.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.unomi.router.api; + +/** + * Created by amidani on 13/06/2017. + */ +public interface RouterConstants { + + String CONFIG_TYPE_NOBROKER = "nobroker"; + String CONFIG_TYPE_KAFKA = "kafka"; + + String CONFIG_STATUS_RUNNING = "RUNNING"; + String CONFIG_STATUS_COMPLETE_ERRORS = "ERRORS"; + String CONFIG_STATUS_COMPLETE_SUCCESS = "SUCCESS"; + String CONFIG_STATUS_COMPLETE_WITH_ERRORS = "WITH_ERRORS"; + + String IMPORT_EXPORT_CONFIG_TYPE_RECURRENT = "recurrent"; + String IMPORT_EXPORT_CONFIG_TYPE_ONESHOT = "oneshot"; + + String DIRECT_DEPOSIT_BUFFER = "direct:depositBuffer"; + + String DIRECTION_FROM = "from"; + String DIRECTION_TO = "to"; + + String HEADER_CONFIG_TYPE = "configType"; + + String HEADER_FAILED_MESSAGE = "failedMessage"; + String HEADER_IMPORT_CONFIG_ONESHOT = "importConfigOneShot"; +} http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/c62f91d4/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ImportConfigurationService.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ImportConfigurationService.java b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ImportConfigurationService.java deleted file mode 100644 index 92991fe..0000000 --- a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ImportConfigurationService.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.unomi.router.api.services; - -import org.apache.unomi.router.api.ImportConfiguration; - -import java.util.List; - -/** - * A service to access and operate on {@link ImportConfiguration}s. - */ -public interface ImportConfigurationService { - - /** - * Retrieves all the import configurations. - * - * @return the list of import configurations - */ - public List<ImportConfiguration> getImportConfigurations(); - - /** - * Retrieves the import configuration identified by the specified identifier. - * - * @param configId the identifier of the profile to retrieve - * @return the import configuration identified by the specified identifier or - * {@code null} if no such import configuration exists - */ - public ImportConfiguration load(String configId); - - /** - * Saves the specified import configuration in the context server. - * - * @param importConfiguration the import configuration to be saved - * @return the newly saved import configuration - */ - public ImportConfiguration save(ImportConfiguration importConfiguration); - - /** - * Deletes the import configuration identified by the specified identifier. - * - * @param configId the identifier of the import configuration to delete - */ - public void delete(String configId); - - -} http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/c62f91d4/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ImportExportConfigurationService.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ImportExportConfigurationService.java b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ImportExportConfigurationService.java new file mode 100644 index 0000000..7868b9c --- /dev/null +++ b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ImportExportConfigurationService.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.unomi.router.api.services; + +import org.apache.unomi.router.api.ExportConfiguration; +import org.apache.unomi.router.api.ImportConfiguration; + +import java.util.List; + +/** + * A service to access and operate on {@link ImportConfiguration}s / {@link ExportConfiguration}s. + */ +public interface ImportExportConfigurationService<T> { + + /** + * Retrieves all the import/export configurations. + * + * @return the list of import/export configurations + */ + public List<T> getAll(); + + /** + * Retrieves the import/export configuration identified by the specified identifier. + * + * @param configId the identifier of the profile to retrieve + * @return the import/export configuration identified by the specified identifier or + * {@code null} if no such import/export configuration exists + */ + public T load(String configId); + + /** + * Saves the specified import/export configuration in the context server. + * + * @param configuration the import/export configuration to be saved + * @return the newly saved import/export configuration + */ + public T save(T configuration); + + /** + * Deletes the import/export configuration identified by the specified identifier. + * + * @param configId the identifier of the import/export configuration to delete + */ + public void delete(String configId); + + +} http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/c62f91d4/extensions/router/router-core/pom.xml ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/pom.xml b/extensions/router/router-core/pom.xml index 95cc6b2..7e4a60a 100644 --- a/extensions/router/router-core/pom.xml +++ b/extensions/router/router-core/pom.xml @@ -47,6 +47,12 @@ </dependency> <dependency> <groupId>org.apache.unomi</groupId> + <artifactId>unomi-persistence-spi</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.unomi</groupId> <artifactId>unomi-services</artifactId> <version>${project.version}</version> <scope>provided</scope> @@ -138,6 +144,7 @@ org.apache.unomi.api, org.apache.unomi.router.api, org.apache.unomi.api.services, + org.apache.unomi.persistence.spi, org.apache.unomi.router.api.services, org.apache.kafka.clients.producer;resolution:=optional, org.apache.kafka.clients.consumer;resolution:=optional, http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/c62f91d4/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/RouterConstants.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/RouterConstants.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/RouterConstants.java deleted file mode 100644 index 04e3709..0000000 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/RouterConstants.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.unomi.router.core; - -/** - * Created by amidani on 13/06/2017. - */ -public interface RouterConstants { - - String CONFIG_TYPE_NOBROKER = "nobroker"; - String CONFIG_TYPE_KAFKA = "kafka"; - - String CONFIG_STATUS_RUNNING = "RUNNING"; - String CONFIG_STATUS_COMPLETE_ERRORS = "ERRORS"; - String CONFIG_STATUS_COMPLETE_SUCCESS = "SUCCESS"; - String CONFIG_STATUS_COMPLETE_WITH_ERRORS = "WITH_ERRORS"; - - String DIRECT_DEPOSIT_BUFFER = "direct:depositBuffer"; - - String DIRECTION_FROM = "from"; - String DIRECTION_TO = "to"; - - String HEADER_CONFIG_TYPE = "configType"; - - String HEADER_FAILED_MESSAGE = "failedMessage"; - String HEADER_IMPORT_CONFIG_ONESHOT = "importConfigOneShot"; -} http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/c62f91d4/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/config/ConfigSharingServiceImpl.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/config/ConfigSharingServiceImpl.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/config/ConfigSharingServiceImpl.java new file mode 100644 index 0000000..6aa357e --- /dev/null +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/config/ConfigSharingServiceImpl.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.unomi.router.core.config; + +import org.apache.unomi.api.services.ConfigSharingService; +import org.osgi.framework.BundleContext; +import org.osgi.framework.BundleEvent; +import org.osgi.framework.SynchronousBundleListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Created by amidani on 27/06/2017. + */ +public class ConfigSharingServiceImpl implements ConfigSharingService, SynchronousBundleListener { + + private static final Logger logger = LoggerFactory.getLogger(ConfigSharingServiceImpl.class); + + private String oneshotImportUploadDir; + private BundleContext bundleContext; + + public void setBundleContext(BundleContext bundleContext) { + this.bundleContext = bundleContext; + } + + @Override + public String getOneshotImportUploadDir() { + return oneshotImportUploadDir; + } + + @Override + public void setOneshotImportUploadDir(String oneshotImportUploadDir) { + this.oneshotImportUploadDir = oneshotImportUploadDir; + } + + /** Methods below not used in router bundle implementation of the ConfigSharingService **/ + + @Override + public String getInternalServerPort() { + return null; + } + + @Override + public void setInternalServerPort(String internalServerPort) { } + + + public void preDestroy() throws Exception { + bundleContext.removeBundleListener(this); + logger.info("Config sharing service for Router is shutdown."); + } + + private void processBundleStartup(BundleContext bundleContext) { + if (bundleContext == null) { + return; + } + } + + @Override + public void bundleChanged(BundleEvent bundleEvent) { + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/c62f91d4/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/ProfileImportCamelContext.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/ProfileImportCamelContext.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/ProfileImportCamelContext.java deleted file mode 100644 index 7942f02..0000000 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/ProfileImportCamelContext.java +++ /dev/null @@ -1,187 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.unomi.router.core.context; - -import org.apache.camel.CamelContext; -import org.apache.camel.Route; -import org.apache.camel.component.jackson.JacksonDataFormat; -import org.apache.camel.impl.DefaultCamelContext; -import org.apache.camel.model.RouteDefinition; -import org.apache.unomi.router.api.ImportConfiguration; -import org.apache.unomi.router.api.services.ImportConfigurationService; -import org.apache.unomi.router.core.processor.ImportConfigByFileNameProcessor; -import org.apache.unomi.router.core.processor.RouteCompletionProcessor; -import org.apache.unomi.router.core.processor.UnomiStorageProcessor; -import org.apache.unomi.router.core.route.ProfileImportFromSourceRouteBuilder; -import org.apache.unomi.router.core.route.ProfileImportOneShotRouteBuilder; -import org.apache.unomi.router.core.route.ProfileImportToUnomiRouteBuilder; -import org.osgi.framework.Bundle; -import org.osgi.framework.BundleContext; -import org.osgi.framework.BundleEvent; -import org.osgi.framework.SynchronousBundleListener; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Arrays; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -/** - * Created by amidani on 04/05/2017. - */ -public class ProfileImportCamelContext implements SynchronousBundleListener { - - private final String IMPORT_CONFIG_TYPE_RECURRENT = "recurrent"; - private Logger logger = LoggerFactory.getLogger(ProfileImportCamelContext.class.getName()); - private CamelContext camelContext; - private UnomiStorageProcessor unomiStorageProcessor; - private RouteCompletionProcessor routeCompletionProcessor; - private ImportConfigByFileNameProcessor importConfigByFileNameProcessor; - private ImportConfigurationService importConfigurationService; - private JacksonDataFormat jacksonDataFormat; - private String uploadDir; - private Map<String, String> kafkaProps; - private String configType; - private BundleContext bundleContext; - - public void setBundleContext(BundleContext bundleContext) { - this.bundleContext = bundleContext; - } - - public void initCamelContext() throws Exception { - logger.info("Initialize Camel Context..."); - camelContext = new DefaultCamelContext(); - - ProfileImportFromSourceRouteBuilder builderReader = new ProfileImportFromSourceRouteBuilder(kafkaProps, configType); - builderReader.setImportConfigurationService(importConfigurationService); - builderReader.setJacksonDataFormat(jacksonDataFormat); - builderReader.setContext(camelContext); - camelContext.addRoutes(builderReader); - - //One shot import route - ProfileImportOneShotRouteBuilder builderOneShot = new ProfileImportOneShotRouteBuilder(kafkaProps, configType); - builderOneShot.setImportConfigByFileNameProcessor(importConfigByFileNameProcessor); - builderOneShot.setJacksonDataFormat(jacksonDataFormat); - builderOneShot.setUploadDir(uploadDir); - builderOneShot.setContext(camelContext); - camelContext.addRoutes(builderOneShot); - - - ProfileImportToUnomiRouteBuilder builderProcessor = new ProfileImportToUnomiRouteBuilder(kafkaProps, configType); - builderProcessor.setUnomiStorageProcessor(unomiStorageProcessor); - builderProcessor.setRouteCompletionProcessor(routeCompletionProcessor); - builderProcessor.setJacksonDataFormat(jacksonDataFormat); - builderProcessor.setContext(camelContext); - camelContext.addRoutes(builderProcessor); - - camelContext.start(); - - logger.debug("postConstruct {" + bundleContext.getBundle() + "}"); - - processBundleStartup(bundleContext); - for (Bundle bundle : bundleContext.getBundles()) { - if (bundle.getBundleContext() != null) { - processBundleStartup(bundle.getBundleContext()); - } - } - bundleContext.addBundleListener(this); - logger.info("Camel Context {} initialized successfully."); - - } - - private boolean stopRoute(String routeId) throws Exception { - return camelContext.stopRoute(routeId, 10L, TimeUnit.SECONDS, true); - } - - public void updateProfileImportReaderRoute(ImportConfiguration importConfiguration) throws Exception { - //Active routes - Route route = camelContext.getRoute(importConfiguration.getItemId()); - if (route != null && stopRoute(importConfiguration.getItemId())) { - camelContext.removeRoute(importConfiguration.getItemId()); - } - - //Inactive routes - RouteDefinition routeDefinition = camelContext.getRouteDefinition(importConfiguration.getItemId()); - if (routeDefinition != null) { - camelContext.removeRouteDefinition(routeDefinition); - } - //Handle transforming an import config oneshot <--> recurrent - if (IMPORT_CONFIG_TYPE_RECURRENT.equals(importConfiguration.getConfigType())) { - ProfileImportFromSourceRouteBuilder builder = new ProfileImportFromSourceRouteBuilder(kafkaProps, configType); - builder.setImportConfigurationList(Arrays.asList(importConfiguration)); - builder.setImportConfigurationService(importConfigurationService); - builder.setJacksonDataFormat(jacksonDataFormat); - builder.setContext(camelContext); - camelContext.addRoutes(builder); - } - } - - public CamelContext getCamelContext() { - return camelContext; - } - - public void setUnomiStorageProcessor(UnomiStorageProcessor unomiStorageProcessor) { - this.unomiStorageProcessor = unomiStorageProcessor; - } - - public void setRouteCompletionProcessor(RouteCompletionProcessor routeCompletionProcessor) { - this.routeCompletionProcessor = routeCompletionProcessor; - } - - public void setImportConfigByFileNameProcessor(ImportConfigByFileNameProcessor importConfigByFileNameProcessor) { - this.importConfigByFileNameProcessor = importConfigByFileNameProcessor; - } - - public void setImportConfigurationService(ImportConfigurationService importConfigurationService) { - this.importConfigurationService = importConfigurationService; - } - - public void setJacksonDataFormat(JacksonDataFormat jacksonDataFormat) { - this.jacksonDataFormat = jacksonDataFormat; - } - - public void setUploadDir(String uploadDir) { - this.uploadDir = uploadDir; - } - - public void setKafkaProps(Map<String, String> kafkaProps) { - this.kafkaProps = kafkaProps; - } - - public void setConfigType(String configType) { - this.configType = configType; - } - - public void preDestroy() throws Exception { - bundleContext.removeBundleListener(this); - //This is to shutdown Camel context - //(will stop all routes/components/endpoints etc and clear internal state/cache) - this.camelContext.stop(); - logger.info("Camel context for profile import is shutdown."); - } - - private void processBundleStartup(BundleContext bundleContext) { - if (bundleContext == null) { - return; - } - } - - @Override - public void bundleChanged(BundleEvent bundleEvent) { - - } -} http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/c62f91d4/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java new file mode 100644 index 0000000..5a26a15 --- /dev/null +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.unomi.router.core.context; + +import org.apache.camel.CamelContext; +import org.apache.camel.Route; +import org.apache.camel.component.jackson.JacksonDataFormat; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.model.RouteDefinition; +import org.apache.unomi.persistence.spi.PersistenceService; +import org.apache.unomi.router.api.ExportConfiguration; +import org.apache.unomi.router.api.ImportConfiguration; +import org.apache.unomi.router.api.RouterConstants; +import org.apache.unomi.router.api.services.ImportExportConfigurationService; +import org.apache.unomi.router.core.processor.ImportConfigByFileNameProcessor; +import org.apache.unomi.router.core.processor.RouteCompletionProcessor; +import org.apache.unomi.router.core.processor.UnomiStorageProcessor; +import org.apache.unomi.router.core.route.ProfileExportCollectRouteBuilder; +import org.apache.unomi.router.core.route.ProfileImportFromSourceRouteBuilder; +import org.apache.unomi.router.core.route.ProfileImportOneShotRouteBuilder; +import org.apache.unomi.router.core.route.ProfileImportToUnomiRouteBuilder; +import org.osgi.framework.Bundle; +import org.osgi.framework.BundleContext; +import org.osgi.framework.BundleEvent; +import org.osgi.framework.SynchronousBundleListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * Created by amidani on 04/05/2017. + */ +public class RouterCamelContext implements SynchronousBundleListener { + + private Logger logger = LoggerFactory.getLogger(RouterCamelContext.class.getName()); + private CamelContext camelContext; + private UnomiStorageProcessor unomiStorageProcessor; + private RouteCompletionProcessor routeCompletionProcessor; + private ImportConfigByFileNameProcessor importConfigByFileNameProcessor; + private ImportExportConfigurationService<ImportConfiguration> importConfigurationService; + private ImportExportConfigurationService<ExportConfiguration> exportConfigurationService; + private PersistenceService persistenceService; + private JacksonDataFormat jacksonDataFormat; + private String uploadDir; + private Map<String, String> kafkaProps; + private String configType; + private String allowedEndpoints; + private BundleContext bundleContext; + + public void setBundleContext(BundleContext bundleContext) { + this.bundleContext = bundleContext; + } + + public void initCamelContext() throws Exception { + logger.info("Initialize Camel Context..."); + camelContext = new DefaultCamelContext(); + + //--IMPORT ROUTES + + //Source + ProfileImportFromSourceRouteBuilder builderReader = new ProfileImportFromSourceRouteBuilder(kafkaProps, configType); + builderReader.setImportConfigurationService(importConfigurationService); + builderReader.setJacksonDataFormat(jacksonDataFormat); + builderReader.setAllowedEndpoints(allowedEndpoints); + builderReader.setContext(camelContext); + camelContext.addRoutes(builderReader); + + //One shot import route + ProfileImportOneShotRouteBuilder builderOneShot = new ProfileImportOneShotRouteBuilder(kafkaProps, configType); + builderOneShot.setImportConfigByFileNameProcessor(importConfigByFileNameProcessor); + builderOneShot.setJacksonDataFormat(jacksonDataFormat); + builderOneShot.setUploadDir(uploadDir); + builderOneShot.setContext(camelContext); + camelContext.addRoutes(builderOneShot); + + //Unomi sink route + ProfileImportToUnomiRouteBuilder builderProcessor = new ProfileImportToUnomiRouteBuilder(kafkaProps, configType); + builderProcessor.setUnomiStorageProcessor(unomiStorageProcessor); + builderProcessor.setRouteCompletionProcessor(routeCompletionProcessor); + builderProcessor.setJacksonDataFormat(jacksonDataFormat); + builderProcessor.setContext(camelContext); + camelContext.addRoutes(builderProcessor); + + //--EXPORT ROUTES + ProfileExportCollectRouteBuilder profileExportCollectRouteBuilder = new ProfileExportCollectRouteBuilder(); + profileExportCollectRouteBuilder.setExportConfigurationService(exportConfigurationService); + profileExportCollectRouteBuilder.setPersistenceService(persistenceService); + profileExportCollectRouteBuilder.setAllowedEndpoints(allowedEndpoints); + profileExportCollectRouteBuilder.setContext(camelContext); + camelContext.addRoutes(profileExportCollectRouteBuilder); + + + camelContext.start(); + + logger.debug("postConstruct {" + bundleContext.getBundle() + "}"); + + processBundleStartup(bundleContext); + for (Bundle bundle : bundleContext.getBundles()) { + if (bundle.getBundleContext() != null) { + processBundleStartup(bundle.getBundleContext()); + } + } + bundleContext.addBundleListener(this); + logger.info("Camel Context {} initialized successfully."); + + } + + private boolean stopRoute(String routeId) throws Exception { + return camelContext.stopRoute(routeId, 10L, TimeUnit.SECONDS, true); + } + + private void killExistingRoute(String routeId) throws Exception { + //Active routes + Route route = camelContext.getRoute(routeId); + if (route != null && stopRoute(routeId)) { + camelContext.removeRoute(routeId); + } + //Inactive routes + RouteDefinition routeDefinition = camelContext.getRouteDefinition(routeId); + if (routeDefinition != null) { + camelContext.removeRouteDefinition(routeDefinition); + } + } + + public void updateProfileReaderRoute(Object configuration) throws Exception { + if (configuration instanceof ImportConfiguration) { + updateProfileImportReaderRoute((ImportConfiguration) configuration); + } else { + updateProfileExportReaderRoute((ExportConfiguration) configuration); + } + } + + private void updateProfileImportReaderRoute(ImportConfiguration importConfiguration) throws Exception { + + //Handle transforming an import config oneshot <--> recurrent + if (RouterConstants.IMPORT_EXPORT_CONFIG_TYPE_RECURRENT.equals(importConfiguration.getConfigType())) { + ProfileImportFromSourceRouteBuilder builder = new ProfileImportFromSourceRouteBuilder(kafkaProps, configType); + builder.setImportConfigurationList(Arrays.asList(importConfiguration)); + builder.setImportConfigurationService(importConfigurationService); + builder.setAllowedEndpoints(allowedEndpoints); + builder.setJacksonDataFormat(jacksonDataFormat); + builder.setContext(camelContext); + camelContext.addRoutes(builder); + } + } + + private void updateProfileExportReaderRoute(ExportConfiguration exportConfiguration) throws Exception { + killExistingRoute(exportConfiguration.getItemId()); + //Handle transforming an import config oneshot <--> recurrent + if (RouterConstants.IMPORT_EXPORT_CONFIG_TYPE_RECURRENT.equals(exportConfiguration.getConfigType())) { + ProfileExportCollectRouteBuilder profileExportCollectRouteBuilder = new ProfileExportCollectRouteBuilder(); + profileExportCollectRouteBuilder.setExportConfigurationService(exportConfigurationService); + profileExportCollectRouteBuilder.setPersistenceService(persistenceService); + profileExportCollectRouteBuilder.setAllowedEndpoints(allowedEndpoints); + profileExportCollectRouteBuilder.setContext(camelContext); + camelContext.addRoutes(profileExportCollectRouteBuilder); + } + } + + public CamelContext getCamelContext() { + return camelContext; + } + + public void setUnomiStorageProcessor(UnomiStorageProcessor unomiStorageProcessor) { + this.unomiStorageProcessor = unomiStorageProcessor; + } + + public void setRouteCompletionProcessor(RouteCompletionProcessor routeCompletionProcessor) { + this.routeCompletionProcessor = routeCompletionProcessor; + } + + public void setImportConfigByFileNameProcessor(ImportConfigByFileNameProcessor importConfigByFileNameProcessor) { + this.importConfigByFileNameProcessor = importConfigByFileNameProcessor; + } + + public void setImportConfigurationService(ImportExportConfigurationService<ImportConfiguration> importConfigurationService) { + this.importConfigurationService = importConfigurationService; + } + + public void setExportConfigurationService(ImportExportConfigurationService<ExportConfiguration> exportConfigurationService) { + this.exportConfigurationService = exportConfigurationService; + } + + public void setPersistenceService(PersistenceService persistenceService) { + this.persistenceService = persistenceService; + } + + public void setJacksonDataFormat(JacksonDataFormat jacksonDataFormat) { + this.jacksonDataFormat = jacksonDataFormat; + } + + public void setUploadDir(String uploadDir) { + this.uploadDir = uploadDir; + } + + public void setKafkaProps(Map<String, String> kafkaProps) { + this.kafkaProps = kafkaProps; + } + + public void setConfigType(String configType) { + this.configType = configType; + } + + public void setAllowedEndpoints(String allowedEndpoints) { + this.allowedEndpoints = allowedEndpoints; + } + + public void preDestroy() throws Exception { + bundleContext.removeBundleListener(this); + //This is to shutdown Camel context + //(will stop all routes/components/endpoints etc and clear internal state/cache) + this.camelContext.stop(); + logger.info("Camel context for profile import is shutdown."); + } + + private void processBundleStartup(BundleContext bundleContext) { + if (bundleContext == null) { + return; + } + } + + @Override + public void bundleChanged(BundleEvent bundleEvent) { + + } +} http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/c62f91d4/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ConfigUpdateProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ConfigUpdateProcessor.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ConfigUpdateProcessor.java index d8e4b9e..8e6ab36 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ConfigUpdateProcessor.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ConfigUpdateProcessor.java @@ -20,25 +20,25 @@ import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.unomi.router.api.ImportConfiguration; -import org.apache.unomi.router.core.context.ProfileImportCamelContext; +import org.apache.unomi.router.core.context.RouterCamelContext; /** * Created by amidani on 10/05/2017. */ public class ConfigUpdateProcessor implements Processor { - private ProfileImportCamelContext profileImportCamelContext; + private RouterCamelContext routerCamelContext; @Override public void process(Exchange exchange) throws Exception { if (exchange.getIn() != null) { Message message = exchange.getIn(); - ImportConfiguration importConfiguration = message.getBody(ImportConfiguration.class); - profileImportCamelContext.updateProfileImportReaderRoute(importConfiguration); + Object configuration = message.getBody(); + routerCamelContext.updateProfileReaderRoute(configuration); } } - public void setProfileImportCamelContext(ProfileImportCamelContext profileImportCamelContext) { - this.profileImportCamelContext = profileImportCamelContext; + public void setRouterCamelContext(RouterCamelContext routerCamelContext) { + this.routerCamelContext = routerCamelContext; } } http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/c62f91d4/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportConfigByFileNameProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportConfigByFileNameProcessor.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportConfigByFileNameProcessor.java index a910a1d..b4f7bd1 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportConfigByFileNameProcessor.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportConfigByFileNameProcessor.java @@ -20,15 +20,15 @@ import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.component.file.GenericFile; import org.apache.unomi.router.api.ImportConfiguration; -import org.apache.unomi.router.api.services.ImportConfigurationService; -import org.apache.unomi.router.core.RouterConstants; +import org.apache.unomi.router.api.services.ImportExportConfigurationService; +import org.apache.unomi.router.api.RouterConstants; /** * Created by amidani on 22/05/2017. */ public class ImportConfigByFileNameProcessor implements Processor { - private ImportConfigurationService importConfigurationService; + private ImportExportConfigurationService<ImportConfiguration> importConfigurationService; @Override public void process(Exchange exchange) throws Exception { @@ -39,7 +39,7 @@ public class ImportConfigByFileNameProcessor implements Processor { exchange.getIn().setHeader(RouterConstants.HEADER_IMPORT_CONFIG_ONESHOT, importConfiguration); } - public void setImportConfigurationService(ImportConfigurationService importConfigurationService) { + public void setImportConfigurationService(ImportExportConfigurationService<ImportConfiguration> importConfigurationService) { this.importConfigurationService = importConfigurationService; } } http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/c62f91d4/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitFailureHandler.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitFailureHandler.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitFailureHandler.java index bfb92fb..96afaac 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitFailureHandler.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitFailureHandler.java @@ -19,7 +19,7 @@ package org.apache.unomi.router.core.processor; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.unomi.router.api.ImportLineError; -import org.apache.unomi.router.core.RouterConstants; +import org.apache.unomi.router.api.RouterConstants; import org.apache.unomi.router.core.exception.BadProfileDataFormatException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/c62f91d4/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java index afff204..e047b63 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java @@ -22,7 +22,7 @@ import org.apache.camel.component.kafka.KafkaConstants; import org.apache.commons.lang3.StringUtils; import org.apache.unomi.router.api.ImportConfiguration; import org.apache.unomi.router.api.ProfileToImport; -import org.apache.unomi.router.core.RouterConstants; +import org.apache.unomi.router.api.RouterConstants; import org.apache.unomi.router.core.exception.BadProfileDataFormatException; import java.util.HashMap; http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/c62f91d4/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/RouteCompletionProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/RouteCompletionProcessor.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/RouteCompletionProcessor.java index 36ec319..b522426 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/RouteCompletionProcessor.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/RouteCompletionProcessor.java @@ -21,8 +21,8 @@ import org.apache.camel.Processor; import org.apache.unomi.router.api.ImportConfiguration; import org.apache.unomi.router.api.ImportLineError; import org.apache.unomi.router.api.ProfileToImport; -import org.apache.unomi.router.api.services.ImportConfigurationService; -import org.apache.unomi.router.core.RouterConstants; +import org.apache.unomi.router.api.services.ImportExportConfigurationService; +import org.apache.unomi.router.api.RouterConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,7 +34,7 @@ import java.util.*; public class RouteCompletionProcessor implements Processor { private static final Logger logger = LoggerFactory.getLogger(RouteCompletionProcessor.class.getName()); - private ImportConfigurationService importConfigurationService; + private ImportExportConfigurationService<ImportConfiguration> importConfigurationService; private int executionsHistorySize; @Override @@ -95,7 +95,7 @@ public class RouteCompletionProcessor implements Processor { logger.info("Processing route {} completed.", exchange.getFromRouteId()); } - public void setImportConfigurationService(ImportConfigurationService importConfigurationService) { + public void setImportConfigurationService(ImportExportConfigurationService<ImportConfiguration> importConfigurationService) { this.importConfigurationService = importConfigurationService; } http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/c62f91d4/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ConfigUpdateRouteBuilder.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ConfigUpdateRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ConfigUpdateRouteBuilder.java new file mode 100644 index 0000000..dd70033 --- /dev/null +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ConfigUpdateRouteBuilder.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.unomi.router.core.route; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.model.rest.RestBindingMode; +import org.apache.unomi.api.services.ConfigSharingService; +import org.apache.unomi.router.api.ExportConfiguration; +import org.apache.unomi.router.api.ImportConfiguration; +import org.apache.unomi.router.core.context.RouterCamelContext; +import org.apache.unomi.router.core.processor.ConfigUpdateProcessor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Created by amidani on 10/05/2017. + */ +public class ConfigUpdateRouteBuilder extends RouteBuilder { + + private static final Logger logger = LoggerFactory.getLogger(ConfigUpdateRouteBuilder.class.getName()); + private RouterCamelContext routerCamelContext; + + @Override + public void configure() throws Exception { + logger.info("Preparing REST Configuration for servlet with context path [/configUpdate]"); + restConfiguration().component("servlet") + .contextPath("/configUpdate") + .enableCORS(false) + .bindingMode(RestBindingMode.json) + .dataFormatProperty("prettyPrint", "true"); + + rest().put("/importConfigAdmin").consumes("application/json").type(ImportConfiguration.class) + .to("direct:importConfigRestDeposit"); + + ConfigUpdateProcessor profileConfigUpdateProcessor = new ConfigUpdateProcessor(); + profileConfigUpdateProcessor.setRouterCamelContext(routerCamelContext); + from("direct:importConfigRestDeposit") + .process(profileConfigUpdateProcessor) + .transform().constant("Success.") + .onException(Exception.class) + .transform().constant("Failure!"); + + rest().put("/exportConfigAdmin").consumes("application/json").type(ExportConfiguration.class) + .to("direct:exportConfigRestDeposit"); + + from("direct:exportConfigRestDeposit") + .process(profileConfigUpdateProcessor) + .transform().constant("Success.") + .onException(Exception.class) + .transform().constant("Failure!"); + + + } + + public void setRouterCamelContext(RouterCamelContext routerCamelContext) { + this.routerCamelContext = routerCamelContext; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/c62f91d4/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportCollectRouteBuilder.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportCollectRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportCollectRouteBuilder.java new file mode 100644 index 0000000..5c3015e --- /dev/null +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportCollectRouteBuilder.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.unomi.router.core.route; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.commons.lang3.StringUtils; +import org.apache.unomi.api.Profile; +import org.apache.unomi.persistence.spi.PersistenceService; +import org.apache.unomi.router.api.ExportConfiguration; +import org.apache.unomi.router.api.services.ImportExportConfigurationService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +/** + * Created by amidani on 27/06/2017. + */ +public class ProfileExportCollectRouteBuilder extends RouteBuilder { + + private static final Logger logger = LoggerFactory.getLogger(ProfileExportCollectRouteBuilder.class); + + private List<ExportConfiguration> exportConfigurationList; + private ImportExportConfigurationService<ExportConfiguration> exportConfigurationService; + private PersistenceService persistenceService; + + private String allowedEndpoints; + + @Override + public void configure() throws Exception { + logger.info("Configure Recurrent Route 'Export :: Collect Data'"); + + if (exportConfigurationList == null) { + exportConfigurationList = exportConfigurationService.getAll(); + } + + //Loop on multiple export configuration + for (final ExportConfiguration exportConfiguration : exportConfigurationList) { + String endpoint = (String) exportConfiguration.getProperties().get("destination"); + + if (StringUtils.isNotBlank(endpoint) && allowedEndpoints.contains(endpoint.substring(0, endpoint.indexOf(':')))) { + List<Profile> profilesCollected = persistenceService.query("segments", (String) exportConfiguration.getProperties().get("segments"), + null, Profile.class); + logger.info("Collected +++{}+++ profiles.", profilesCollected.size()); + } else { + logger.error("Endpoint scheme {} is not allowed, route {} will be skipped.", endpoint.substring(0, endpoint.indexOf(':')), exportConfiguration.getItemId()); + } + } + } + + public void setExportConfigurationList(List<ExportConfiguration> exportConfigurationList) { + this.exportConfigurationList = exportConfigurationList; + } + + public void setExportConfigurationService(ImportExportConfigurationService<ExportConfiguration> exportConfigurationService) { + this.exportConfigurationService = exportConfigurationService; + } + + public void setPersistenceService(PersistenceService persistenceService) { + this.persistenceService = persistenceService; + } + + public void setAllowedEndpoints(String allowedEndpoints) { + this.allowedEndpoints = allowedEndpoints; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/c62f91d4/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportAbstractRouteBuilder.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportAbstractRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportAbstractRouteBuilder.java index f4f7a43..bacc38e 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportAbstractRouteBuilder.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportAbstractRouteBuilder.java @@ -22,7 +22,7 @@ import org.apache.camel.component.kafka.KafkaComponent; import org.apache.camel.component.kafka.KafkaConfiguration; import org.apache.camel.component.kafka.KafkaEndpoint; import org.apache.commons.lang3.StringUtils; -import org.apache.unomi.router.core.RouterConstants; +import org.apache.unomi.router.api.RouterConstants; import java.util.Map; http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/c62f91d4/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportConfigUpdateRouteBuilder.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportConfigUpdateRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportConfigUpdateRouteBuilder.java deleted file mode 100644 index 40575d5..0000000 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportConfigUpdateRouteBuilder.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.unomi.router.core.route; - -import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.model.rest.RestBindingMode; -import org.apache.unomi.router.api.ImportConfiguration; -import org.apache.unomi.router.core.context.ProfileImportCamelContext; -import org.apache.unomi.router.core.processor.ConfigUpdateProcessor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Created by amidani on 10/05/2017. - */ -public class ProfileImportConfigUpdateRouteBuilder extends RouteBuilder { - - private static final Logger logger = LoggerFactory.getLogger(ProfileImportConfigUpdateRouteBuilder.class.getName()); - - private ProfileImportCamelContext profileImportCamelContext; - - @Override - public void configure() throws Exception { - logger.info("Preparing REST Configuration for servlet with context path [/importConfigAdmin]"); - restConfiguration().component("servlet") - .contextPath("/importConfigAdmin") - .enableCORS(false) - .bindingMode(RestBindingMode.json) - .dataFormatProperty("prettyPrint", "true"); - - rest().put("/").consumes("application/json").type(ImportConfiguration.class) - .to("direct:importConfigRestDeposit"); - ConfigUpdateProcessor profileImportConfigUpdateProcessor = new ConfigUpdateProcessor(); - profileImportConfigUpdateProcessor.setProfileImportCamelContext(profileImportCamelContext); - from("direct:importConfigRestDeposit") - .process(profileImportConfigUpdateProcessor) - .transform().constant("Success.") - .onException(Exception.class) - .transform().constant("Failure!"); - - - } - - public void setProfileImportCamelContext(ProfileImportCamelContext profileImportCamelContext) { - this.profileImportCamelContext = profileImportCamelContext; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/c62f91d4/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java index 0af561f..7f54884 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java @@ -23,8 +23,8 @@ import org.apache.camel.component.kafka.KafkaEndpoint; import org.apache.camel.model.ProcessorDefinition; import org.apache.commons.lang3.StringUtils; import org.apache.unomi.router.api.ImportConfiguration; -import org.apache.unomi.router.api.services.ImportConfigurationService; -import org.apache.unomi.router.core.RouterConstants; +import org.apache.unomi.router.api.services.ImportExportConfigurationService; +import org.apache.unomi.router.api.RouterConstants; import org.apache.unomi.router.core.exception.BadProfileDataFormatException; import org.apache.unomi.router.core.processor.LineSplitFailureHandler; import org.apache.unomi.router.core.processor.LineSplitProcessor; @@ -44,7 +44,9 @@ public class ProfileImportFromSourceRouteBuilder extends ProfileImportAbstractRo private static final Logger logger = LoggerFactory.getLogger(ProfileImportFromSourceRouteBuilder.class.getName()); private List<ImportConfiguration> importConfigurationList; - private ImportConfigurationService importConfigurationService; + private ImportExportConfigurationService<ImportConfiguration> importConfigurationService; + + private String allowedEndpoints; public ProfileImportFromSourceRouteBuilder(Map<String, String> kafkaProps, String configType) { super(kafkaProps, configType); @@ -56,13 +58,24 @@ public class ProfileImportFromSourceRouteBuilder extends ProfileImportAbstractRo logger.info("Configure Recurrent Route 'From Source'"); if (importConfigurationList == null) { - importConfigurationList = importConfigurationService.getImportConfigurations(); + importConfigurationList = importConfigurationService.getAll(); + } + + ProcessorDefinition prDefErr = onException(BadProfileDataFormatException.class) + .log(LoggingLevel.ERROR, "Error processing record ${exchangeProperty.CamelSplitIndex}++ !") + .handled(true) + .process(new LineSplitFailureHandler()); + + if (RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) { + prDefErr.to((KafkaEndpoint) getEndpointURI(RouterConstants.DIRECTION_FROM)); + } else { + prDefErr.to((String) getEndpointURI(RouterConstants.DIRECTION_FROM)); } //Loop on multiple import configuration for (final ImportConfiguration importConfiguration : importConfigurationList) { - if (importConfiguration.getProperties().size() > 0 && - StringUtils.isNotEmpty((String) importConfiguration.getProperties().get("source"))) { + if (RouterConstants.IMPORT_EXPORT_CONFIG_TYPE_RECURRENT.equals(importConfiguration.getConfigType()) && + importConfiguration.getProperties().size() > 0) { //Prepare Split Processor LineSplitProcessor lineSplitProcessor = new LineSplitProcessor(); lineSplitProcessor.setFieldsMapping((Map<String, Integer>) importConfiguration.getProperties().get("mapping")); @@ -71,46 +84,41 @@ public class ProfileImportFromSourceRouteBuilder extends ProfileImportAbstractRo lineSplitProcessor.setMergingProperty(importConfiguration.getMergingProperty()); lineSplitProcessor.setColumnSeparator(importConfiguration.getColumnSeparator()); - ProcessorDefinition prDefErr = onException(BadProfileDataFormatException.class) - .log(LoggingLevel.ERROR, "Error processing record ${exchangeProperty.CamelSplitIndex}++ !") - .handled(true) - .process(new LineSplitFailureHandler()); - - if (RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) { - prDefErr.to((KafkaEndpoint) getEndpointURI(RouterConstants.DIRECTION_FROM)); + String endpoint = (String) importConfiguration.getProperties().get("source"); + + if (StringUtils.isNotBlank(endpoint) && allowedEndpoints.contains(endpoint.substring(0, endpoint.indexOf(':')))) { + ProcessorDefinition prDef = from(endpoint) + .routeId(importConfiguration.getItemId())// This allow identification of the route for manual start/stop + .autoStartup(importConfiguration.isActive())// Auto-start if the import configuration is set active + .onCompletion() + // this route is only invoked when the original route is complete as a kind + // of completion callback + .log(LoggingLevel.DEBUG, "ROUTE [" + importConfiguration.getItemId() + "] is now complete [" + new Date().toString() + "]") + // must use end to denote the end of the onCompletion route + .end() + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + importConfiguration.setStatus(RouterConstants.CONFIG_STATUS_RUNNING); + importConfigurationService.save(importConfiguration); + } + }) + .split(bodyAs(String.class).tokenize(importConfiguration.getLineSeparator())) + .log(LoggingLevel.DEBUG, "Splitted into ${exchangeProperty.CamelSplitSize} records") + .setHeader(RouterConstants.HEADER_CONFIG_TYPE, constant(configType)) + .process(lineSplitProcessor) + .log(LoggingLevel.DEBUG, "Split IDX ${exchangeProperty.CamelSplitIndex} record") + .to("log:org.apache.unomi.router?level=DEBUG") + .marshal(jacksonDataFormat) + .convertBodyTo(String.class); + + if (RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) { + prDef.to((KafkaEndpoint) getEndpointURI(RouterConstants.DIRECTION_FROM)); + } else { + prDef.to((String) getEndpointURI(RouterConstants.DIRECTION_FROM)); + } } else { - prDefErr.to((String) getEndpointURI(RouterConstants.DIRECTION_FROM)); - } - - ProcessorDefinition prDef = from((String) importConfiguration.getProperties().get("source")) - .routeId(importConfiguration.getItemId())// This allow identification of the route for manual start/stop - .autoStartup(importConfiguration.isActive())// Auto-start if the import configuration is set active - .onCompletion() - // this route is only invoked when the original route is complete as a kind - // of completion callback - .log(LoggingLevel.DEBUG, "ROUTE [" + importConfiguration.getItemId() + "] is now complete [" + new Date().toString() + "]") - // must use end to denote the end of the onCompletion route - .end() - .process(new Processor() { - @Override - public void process(Exchange exchange) throws Exception { - importConfiguration.setStatus(RouterConstants.CONFIG_STATUS_RUNNING); - importConfigurationService.save(importConfiguration); - } - }) - .split(bodyAs(String.class).tokenize(importConfiguration.getLineSeparator())) - .log(LoggingLevel.DEBUG, "Splitted into ${exchangeProperty.CamelSplitSize} records") - .setHeader(RouterConstants.HEADER_CONFIG_TYPE, constant(configType)) - .process(lineSplitProcessor) - .log(LoggingLevel.DEBUG, "Split IDX ${exchangeProperty.CamelSplitIndex} record") - .to("log:org.apache.unomi.router?level=DEBUG") - .marshal(jacksonDataFormat) - .convertBodyTo(String.class); - - if (RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) { - prDef.to((KafkaEndpoint) getEndpointURI(RouterConstants.DIRECTION_FROM)); - } else { - prDef.to((String) getEndpointURI(RouterConstants.DIRECTION_FROM)); + logger.error("Endpoint scheme {} is not allowed, route {} will be skipped.", endpoint.substring(0, endpoint.indexOf(':')), importConfiguration.getItemId()); } } } @@ -120,8 +128,12 @@ public class ProfileImportFromSourceRouteBuilder extends ProfileImportAbstractRo this.importConfigurationList = importConfigurationList; } - public void setImportConfigurationService(ImportConfigurationService importConfigurationService) { + public void setImportConfigurationService(ImportExportConfigurationService<ImportConfiguration> importConfigurationService) { this.importConfigurationService = importConfigurationService; } + public void setAllowedEndpoints(String allowedEndpoints) { + this.allowedEndpoints = allowedEndpoints; + } + } http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/c62f91d4/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java index c86e5e0..a94b5ed 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java @@ -20,7 +20,7 @@ import org.apache.camel.LoggingLevel; import org.apache.camel.component.jackson.JacksonDataFormat; import org.apache.camel.component.kafka.KafkaEndpoint; import org.apache.camel.model.ProcessorDefinition; -import org.apache.unomi.router.core.RouterConstants; +import org.apache.unomi.router.api.RouterConstants; import org.apache.unomi.router.core.exception.BadProfileDataFormatException; import org.apache.unomi.router.core.processor.ImportConfigByFileNameProcessor; import org.apache.unomi.router.core.processor.LineSplitFailureHandler;