Repository: incubator-unomi Updated Branches: refs/heads/master 9db8296e7 -> 5166fc23e
UNOMI-102 : Add Camel config for export features Project: http://git-wip-us.apache.org/repos/asf/incubator-unomi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-unomi/commit/b7194f47 Tree: http://git-wip-us.apache.org/repos/asf/incubator-unomi/tree/b7194f47 Diff: http://git-wip-us.apache.org/repos/asf/incubator-unomi/diff/b7194f47 Branch: refs/heads/master Commit: b7194f470d4ec728f0205b3386571f21d15f2bda Parents: df59f57 Author: Abdelkader Midani <amid...@apache.org> Authored: Thu Jun 29 03:09:55 2017 +0200 Committer: Abdelkader Midani <amid...@apache.org> Committed: Thu Jun 29 03:09:55 2017 +0200 ---------------------------------------------------------------------- .../unomi/router/api/RouterConstants.java | 9 +- .../router/core/bean/CollectProfileBean.java | 38 +++++++ .../router/core/context/RouterCamelContext.java | 33 ++++-- .../core/processor/ConfigUpdateProcessor.java | 1 - .../ExportRouteCompletionProcessor.java | 76 ++++++++++++++ .../ImportRouteCompletionProcessor.java | 105 +++++++++++++++++++ .../core/processor/LineBuildProcessor.java | 54 ++++++++++ .../processor/RouteCompletionProcessor.java | 105 ------------------- .../core/route/ConfigUpdateRouteBuilder.java | 1 - .../route/ProfileExportCollectRouteBuilder.java | 53 +++++++--- .../ProfileExportProducerRouteBuilder.java | 69 ++++++++++++ .../ProfileImportAbstractRouteBuilder.java | 83 --------------- .../ProfileImportFromSourceRouteBuilder.java | 18 ++-- .../route/ProfileImportOneShotRouteBuilder.java | 26 ++--- .../route/ProfileImportToUnomiRouteBuilder.java | 21 ++-- .../core/route/RouterAbstractRouteBuilder.java | 98 +++++++++++++++++ .../strategy/ArrayListAggregationStrategy.java | 2 - .../StringLinesAggregationStrategy.java | 41 ++++++++ .../resources/OSGI-INF/blueprint/blueprint.xml | 43 +++++--- .../main/resources/org.apache.unomi.router.cfg | 17 ++- 20 files changed, 608 insertions(+), 285 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/b7194f47/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/RouterConstants.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/RouterConstants.java b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/RouterConstants.java index 45de3d6..7f45228 100644 --- a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/RouterConstants.java +++ b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/RouterConstants.java @@ -32,13 +32,20 @@ public interface RouterConstants { String IMPORT_EXPORT_CONFIG_TYPE_RECURRENT = "recurrent"; String IMPORT_EXPORT_CONFIG_TYPE_ONESHOT = "oneshot"; - String DIRECT_DEPOSIT_BUFFER = "direct:depositBuffer"; + String DIRECT_IMPORT_DEPOSIT_BUFFER = "direct:depositImportBuffer"; + String DIRECT_EXPORT_DEPOSIT_BUFFER = "direct:depositExportBuffer"; String DIRECTION_FROM = "from"; String DIRECTION_TO = "to"; String HEADER_CONFIG_TYPE = "configType"; + String HEADER_EXPORT_CONFIG = "exportConfig"; String HEADER_FAILED_MESSAGE = "failedMessage"; String HEADER_IMPORT_CONFIG_ONESHOT = "importConfigOneShot"; + + String IMPORT_ONESHOT_ROUTE_ID = "ONE_SHOT_ROUTE"; + String DEFAULT_FILE_COLUMN_SEPARATOR = ","; + + String DEFAULT_FILE_LINE_SEPARATOR = "\n"; } http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/b7194f47/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/bean/CollectProfileBean.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/bean/CollectProfileBean.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/bean/CollectProfileBean.java new file mode 100644 index 0000000..4525019 --- /dev/null +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/bean/CollectProfileBean.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.unomi.router.core.bean; + +import org.apache.unomi.api.Profile; +import org.apache.unomi.persistence.spi.PersistenceService; + +import java.util.List; + +/** + * Created by amidani on 28/06/2017. + */ +public class CollectProfileBean { + + private PersistenceService persistenceService; + + public List<Profile> extractProfileBySegment(String segment) { + return persistenceService.query("segments", segment,null, Profile.class); + } + + public void setPersistenceService(PersistenceService persistenceService) { + this.persistenceService = persistenceService; + } +} http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/b7194f47/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java index 32ceba8..d6ca24b 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java @@ -27,13 +27,11 @@ import org.apache.unomi.router.api.ExportConfiguration; import org.apache.unomi.router.api.ImportConfiguration; import org.apache.unomi.router.api.RouterConstants; import org.apache.unomi.router.api.services.ImportExportConfigurationService; +import org.apache.unomi.router.core.processor.ExportRouteCompletionProcessor; import org.apache.unomi.router.core.processor.ImportConfigByFileNameProcessor; -import org.apache.unomi.router.core.processor.RouteCompletionProcessor; +import org.apache.unomi.router.core.processor.ImportRouteCompletionProcessor; import org.apache.unomi.router.core.processor.UnomiStorageProcessor; -import org.apache.unomi.router.core.route.ProfileExportCollectRouteBuilder; -import org.apache.unomi.router.core.route.ProfileImportFromSourceRouteBuilder; -import org.apache.unomi.router.core.route.ProfileImportOneShotRouteBuilder; -import org.apache.unomi.router.core.route.ProfileImportToUnomiRouteBuilder; +import org.apache.unomi.router.core.route.*; import org.osgi.framework.Bundle; import org.osgi.framework.BundleContext; import org.osgi.framework.BundleEvent; @@ -53,7 +51,8 @@ public class RouterCamelContext implements SynchronousBundleListener { private Logger logger = LoggerFactory.getLogger(RouterCamelContext.class.getName()); private CamelContext camelContext; private UnomiStorageProcessor unomiStorageProcessor; - private RouteCompletionProcessor routeCompletionProcessor; + private ImportRouteCompletionProcessor importRouteCompletionProcessor; + private ExportRouteCompletionProcessor exportRouteCompletionProcessor; private ImportConfigByFileNameProcessor importConfigByFileNameProcessor; private ImportExportConfigurationService<ImportConfiguration> importConfigurationService; private ImportExportConfigurationService<ExportConfiguration> exportConfigurationService; @@ -102,19 +101,26 @@ public class RouterCamelContext implements SynchronousBundleListener { //Unomi sink route ProfileImportToUnomiRouteBuilder builderProcessor = new ProfileImportToUnomiRouteBuilder(kafkaProps, configType); builderProcessor.setUnomiStorageProcessor(unomiStorageProcessor); - builderProcessor.setRouteCompletionProcessor(routeCompletionProcessor); + builderProcessor.setImportRouteCompletionProcessor(importRouteCompletionProcessor); builderProcessor.setJacksonDataFormat(jacksonDataFormat); builderProcessor.setContext(camelContext); camelContext.addRoutes(builderProcessor); //--EXPORT ROUTES - ProfileExportCollectRouteBuilder profileExportCollectRouteBuilder = new ProfileExportCollectRouteBuilder(); + ProfileExportCollectRouteBuilder profileExportCollectRouteBuilder = new ProfileExportCollectRouteBuilder(kafkaProps, configType); profileExportCollectRouteBuilder.setExportConfigurationService(exportConfigurationService); profileExportCollectRouteBuilder.setPersistenceService(persistenceService); profileExportCollectRouteBuilder.setAllowedEndpoints(allowedEndpoints); + profileExportCollectRouteBuilder.setJacksonDataFormat(jacksonDataFormat); profileExportCollectRouteBuilder.setContext(camelContext); camelContext.addRoutes(profileExportCollectRouteBuilder); + ProfileExportProducerRouteBuilder profileExportProducerRouteBuilder = new ProfileExportProducerRouteBuilder(kafkaProps, configType); + profileExportProducerRouteBuilder.setExportRouteCompletionProcessor(exportRouteCompletionProcessor); + profileExportProducerRouteBuilder.setAllowedEndpoints(allowedEndpoints); + profileExportProducerRouteBuilder.setJacksonDataFormat(jacksonDataFormat); + profileExportProducerRouteBuilder.setContext(camelContext); + camelContext.addRoutes(profileExportProducerRouteBuilder); camelContext.start(); @@ -174,10 +180,11 @@ public class RouterCamelContext implements SynchronousBundleListener { killExistingRoute(exportConfiguration.getItemId()); //Handle transforming an import config oneshot <--> recurrent if (RouterConstants.IMPORT_EXPORT_CONFIG_TYPE_RECURRENT.equals(exportConfiguration.getConfigType())) { - ProfileExportCollectRouteBuilder profileExportCollectRouteBuilder = new ProfileExportCollectRouteBuilder(); + ProfileExportCollectRouteBuilder profileExportCollectRouteBuilder = new ProfileExportCollectRouteBuilder(kafkaProps, configType); profileExportCollectRouteBuilder.setExportConfigurationService(exportConfigurationService); profileExportCollectRouteBuilder.setPersistenceService(persistenceService); profileExportCollectRouteBuilder.setAllowedEndpoints(allowedEndpoints); + profileExportCollectRouteBuilder.setJacksonDataFormat(jacksonDataFormat); profileExportCollectRouteBuilder.setContext(camelContext); camelContext.addRoutes(profileExportCollectRouteBuilder); } @@ -191,8 +198,12 @@ public class RouterCamelContext implements SynchronousBundleListener { this.unomiStorageProcessor = unomiStorageProcessor; } - public void setRouteCompletionProcessor(RouteCompletionProcessor routeCompletionProcessor) { - this.routeCompletionProcessor = routeCompletionProcessor; + public void setImportRouteCompletionProcessor(ImportRouteCompletionProcessor importRouteCompletionProcessor) { + this.importRouteCompletionProcessor = importRouteCompletionProcessor; + } + + public void setExportRouteCompletionProcessor(ExportRouteCompletionProcessor exportRouteCompletionProcessor) { + this.exportRouteCompletionProcessor = exportRouteCompletionProcessor; } public void setImportConfigByFileNameProcessor(ImportConfigByFileNameProcessor importConfigByFileNameProcessor) { http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/b7194f47/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ConfigUpdateProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ConfigUpdateProcessor.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ConfigUpdateProcessor.java index 8e6ab36..76bd8a6 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ConfigUpdateProcessor.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ConfigUpdateProcessor.java @@ -19,7 +19,6 @@ package org.apache.unomi.router.core.processor; import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.Processor; -import org.apache.unomi.router.api.ImportConfiguration; import org.apache.unomi.router.core.context.RouterCamelContext; /** http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/b7194f47/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ExportRouteCompletionProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ExportRouteCompletionProcessor.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ExportRouteCompletionProcessor.java new file mode 100644 index 0000000..1b4d1da --- /dev/null +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ExportRouteCompletionProcessor.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.unomi.router.core.processor; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.unomi.router.api.ExportConfiguration; +import org.apache.unomi.router.api.RouterConstants; +import org.apache.unomi.router.api.services.ImportExportConfigurationService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Date; +import java.util.HashMap; +import java.util.Map; + +/** + * Created by amidani on 29/06/2017. + */ +public class ExportRouteCompletionProcessor implements Processor { + + private static final Logger logger = LoggerFactory.getLogger(ExportRouteCompletionProcessor.class.getName()); + private ImportExportConfigurationService<ExportConfiguration> exportConfigurationService; + private int executionsHistorySize; + + @Override + public void process(Exchange exchange) throws Exception { + String importConfigId = null; + ExportConfiguration exportConfig = (ExportConfiguration) exchange.getIn().getHeader(RouterConstants.HEADER_EXPORT_CONFIG); + + Map execution = new HashMap(); + execution.put("date", ((Date) exchange.getProperty("CamelCreatedTimestamp")).getTime()); + execution.put("extractedProfiles", exchange.getProperty("CamelSplitSize")); + + ExportConfiguration exportConfiguration = exportConfigurationService.load(exportConfig.getItemId()); + + if (exportConfiguration.getExecutions().size() >= executionsHistorySize) { + int oldestExecIndex = 0; + long oldestExecDate = (Long) exportConfiguration.getExecutions().get(0).get("date"); + for (int i = 1; i < exportConfiguration.getExecutions().size(); i++) { + if ((Long) exportConfiguration.getExecutions().get(i).get("date") < oldestExecDate) { + oldestExecDate = (Long) exportConfiguration.getExecutions().get(i).get("date"); + oldestExecIndex = i; + } + } + exportConfiguration.getExecutions().remove(oldestExecIndex); + } + + exportConfiguration.getExecutions().add(execution); + exportConfigurationService.save(exportConfiguration); + + logger.info("Processing route {} completed.", exchange.getFromRouteId()); + } + + public void setExportConfigurationService(ImportExportConfigurationService<ExportConfiguration> exportConfigurationService) { + this.exportConfigurationService = exportConfigurationService; + } + + public void setExecutionsHistorySize(int executionsHistorySize) { + this.executionsHistorySize = executionsHistorySize; + } +} http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/b7194f47/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportRouteCompletionProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportRouteCompletionProcessor.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportRouteCompletionProcessor.java new file mode 100644 index 0000000..edb7391 --- /dev/null +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportRouteCompletionProcessor.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.unomi.router.core.processor; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.unomi.router.api.ImportConfiguration; +import org.apache.unomi.router.api.ImportLineError; +import org.apache.unomi.router.api.ProfileToImport; +import org.apache.unomi.router.api.RouterConstants; +import org.apache.unomi.router.api.services.ImportExportConfigurationService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; + +/** + * Created by amidani on 14/06/2017. + */ +public class ImportRouteCompletionProcessor implements Processor { + + private static final Logger logger = LoggerFactory.getLogger(ImportRouteCompletionProcessor.class.getName()); + private ImportExportConfigurationService<ImportConfiguration> importConfigurationService; + private int executionsHistorySize; + + @Override + public void process(Exchange exchange) throws Exception { + String importConfigId = null; + ImportConfiguration importConfigOneShot = (ImportConfiguration) exchange.getIn().getHeader(RouterConstants.HEADER_IMPORT_CONFIG_ONESHOT); + if (importConfigOneShot != null) { + importConfigId = importConfigOneShot.getItemId(); + } else { + importConfigId = exchange.getFromRouteId(); + } + ImportConfiguration importConfiguration = importConfigurationService.load(importConfigId); + long successCount = 0; + long failureCount = 0; + long ignoreCount = 0; + List<ImportLineError> errors = new ArrayList<ImportLineError>(); + + for (Object line : exchange.getIn().getBody(ArrayList.class)) { + if (line instanceof ProfileToImport) { + successCount++; + } else if (line instanceof ImportLineError) { + failureCount++; + errors.add(((ImportLineError) line)); + } else { + ignoreCount++; + } + } + + Map execution = new HashMap(); + execution.put("date", ((Date) exchange.getProperty("CamelCreatedTimestamp")).getTime()); + execution.put("totalLinesNb", exchange.getProperty("CamelSplitSize")); + execution.put("successCount", successCount); + execution.put("failureCount", failureCount); + execution.put("errors", errors); + + if (importConfiguration.getExecutions().size() >= executionsHistorySize) { + int oldestExecIndex = 0; + long oldestExecDate = (Long) importConfiguration.getExecutions().get(0).get("date"); + for (int i = 1; i < importConfiguration.getExecutions().size(); i++) { + if ((Long) importConfiguration.getExecutions().get(i).get("date") < oldestExecDate) { + oldestExecDate = (Long) importConfiguration.getExecutions().get(i).get("date"); + oldestExecIndex = i; + } + } + importConfiguration.getExecutions().remove(oldestExecIndex); + } + + importConfiguration.getExecutions().add(execution); + //Set running to false, route is complete + if (failureCount > 0 && successCount > 0) { + importConfiguration.setStatus(RouterConstants.CONFIG_STATUS_COMPLETE_WITH_ERRORS); + } else if (failureCount > 0 && successCount == 0) { + importConfiguration.setStatus(RouterConstants.CONFIG_STATUS_COMPLETE_ERRORS); + } else if (failureCount == 0 && successCount > 0) { + importConfiguration.setStatus(RouterConstants.CONFIG_STATUS_COMPLETE_SUCCESS); + } + importConfigurationService.save(importConfiguration); + logger.info("Processing route {} completed.", exchange.getFromRouteId()); + } + + public void setImportConfigurationService(ImportExportConfigurationService<ImportConfiguration> importConfigurationService) { + this.importConfigurationService = importConfigurationService; + } + + public void setExecutionsHistorySize(int executionsHistorySize) { + this.executionsHistorySize = executionsHistorySize; + } +} http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/b7194f47/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineBuildProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineBuildProcessor.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineBuildProcessor.java new file mode 100644 index 0000000..6f83741 --- /dev/null +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineBuildProcessor.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.unomi.router.core.processor; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.unomi.api.Profile; +import org.apache.unomi.router.api.ExportConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +/** + * Created by amidani on 28/06/2017. + */ +public class LineBuildProcessor implements Processor { + + private static final Logger logger = LoggerFactory.getLogger(LineBuildProcessor.class); + + @Override + public void process(Exchange exchange) throws Exception { + ExportConfiguration exportConfiguration = (ExportConfiguration) exchange.getIn().getHeader("exportConfig"); + exchange.getIn().setHeader("destination", exportConfiguration.getProperty("destination")); + Profile profile = exchange.getIn().getBody(Profile.class); + + Map<String, String> mapping = (Map<String, String>) exportConfiguration.getProperty("mapping"); + String lineToWrite = ""; + for (int i = 0; i < mapping.size(); i++) { + String propertyName = mapping.get(String.valueOf(i)); + lineToWrite += profile.getProperty(propertyName) != null ? profile.getProperty(propertyName) : ""; + if (i + 1 < mapping.size()) { + lineToWrite += exportConfiguration.getColumnSeparator(); + } + } + + exchange.getIn().setBody(lineToWrite, String.class); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/b7194f47/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/RouteCompletionProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/RouteCompletionProcessor.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/RouteCompletionProcessor.java deleted file mode 100644 index b522426..0000000 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/RouteCompletionProcessor.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.unomi.router.core.processor; - -import org.apache.camel.Exchange; -import org.apache.camel.Processor; -import org.apache.unomi.router.api.ImportConfiguration; -import org.apache.unomi.router.api.ImportLineError; -import org.apache.unomi.router.api.ProfileToImport; -import org.apache.unomi.router.api.services.ImportExportConfigurationService; -import org.apache.unomi.router.api.RouterConstants; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.*; - -/** - * Created by amidani on 14/06/2017. - */ -public class RouteCompletionProcessor implements Processor { - - private static final Logger logger = LoggerFactory.getLogger(RouteCompletionProcessor.class.getName()); - private ImportExportConfigurationService<ImportConfiguration> importConfigurationService; - private int executionsHistorySize; - - @Override - public void process(Exchange exchange) throws Exception { - String importConfigId = null; - ImportConfiguration importConfigOneShot = (ImportConfiguration) exchange.getIn().getHeader(RouterConstants.HEADER_IMPORT_CONFIG_ONESHOT); - if (importConfigOneShot != null) { - importConfigId = importConfigOneShot.getItemId(); - } else { - importConfigId = exchange.getFromRouteId(); - } - ImportConfiguration importConfiguration = importConfigurationService.load(importConfigId); - long successCount = 0; - long failureCount = 0; - long ignoreCount = 0; - List<ImportLineError> errors = new ArrayList<ImportLineError>(); - - for (Object line : exchange.getIn().getBody(ArrayList.class)) { - if (line instanceof ProfileToImport) { - successCount++; - } else if (line instanceof ImportLineError) { - failureCount++; - errors.add(((ImportLineError) line)); - } else { - ignoreCount++; - } - } - - Map execution = new HashMap(); - execution.put("date", ((Date) exchange.getProperty("CamelCreatedTimestamp")).getTime()); - execution.put("totalLinesNb", exchange.getProperty("CamelSplitSize")); - execution.put("successCount", successCount); - execution.put("failureCount", failureCount); - execution.put("errors", errors); - - if (importConfiguration.getExecutions().size() >= executionsHistorySize) { - int oldestExecIndex = 0; - long oldestExecDate = (Long) importConfiguration.getExecutions().get(0).get("date"); - for (int i = 1; i < importConfiguration.getExecutions().size(); i++) { - if ((Long) importConfiguration.getExecutions().get(i).get("date") < oldestExecDate) { - oldestExecDate = (Long) importConfiguration.getExecutions().get(i).get("date"); - oldestExecIndex = i; - } - } - importConfiguration.getExecutions().remove(oldestExecIndex); - } - - importConfiguration.getExecutions().add(execution); - //Set running to false, route is complete - if (failureCount > 0 && successCount > 0) { - importConfiguration.setStatus(RouterConstants.CONFIG_STATUS_COMPLETE_WITH_ERRORS); - } else if (failureCount > 0 && successCount == 0) { - importConfiguration.setStatus(RouterConstants.CONFIG_STATUS_COMPLETE_ERRORS); - } else if (failureCount == 0 && successCount > 0) { - importConfiguration.setStatus(RouterConstants.CONFIG_STATUS_COMPLETE_SUCCESS); - } - importConfigurationService.save(importConfiguration); - logger.info("Processing route {} completed.", exchange.getFromRouteId()); - } - - public void setImportConfigurationService(ImportExportConfigurationService<ImportConfiguration> importConfigurationService) { - this.importConfigurationService = importConfigurationService; - } - - public void setExecutionsHistorySize(int executionsHistorySize) { - this.executionsHistorySize = executionsHistorySize; - } -} http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/b7194f47/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ConfigUpdateRouteBuilder.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ConfigUpdateRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ConfigUpdateRouteBuilder.java index dd70033..885713a 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ConfigUpdateRouteBuilder.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ConfigUpdateRouteBuilder.java @@ -18,7 +18,6 @@ package org.apache.unomi.router.core.route; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.model.rest.RestBindingMode; -import org.apache.unomi.api.services.ConfigSharingService; import org.apache.unomi.router.api.ExportConfiguration; import org.apache.unomi.router.api.ImportConfiguration; import org.apache.unomi.router.core.context.RouterCamelContext; http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/b7194f47/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportCollectRouteBuilder.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportCollectRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportCollectRouteBuilder.java index 5c3015e..b67859a 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportCollectRouteBuilder.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportCollectRouteBuilder.java @@ -16,21 +16,25 @@ */ package org.apache.unomi.router.core.route; -import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.LoggingLevel; +import org.apache.camel.component.kafka.KafkaEndpoint; +import org.apache.camel.model.ProcessorDefinition; import org.apache.commons.lang3.StringUtils; -import org.apache.unomi.api.Profile; import org.apache.unomi.persistence.spi.PersistenceService; import org.apache.unomi.router.api.ExportConfiguration; +import org.apache.unomi.router.api.RouterConstants; import org.apache.unomi.router.api.services.ImportExportConfigurationService; +import org.apache.unomi.router.core.bean.CollectProfileBean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; +import java.util.Map; /** * Created by amidani on 27/06/2017. */ -public class ProfileExportCollectRouteBuilder extends RouteBuilder { +public class ProfileExportCollectRouteBuilder extends RouterAbstractRouteBuilder { private static final Logger logger = LoggerFactory.getLogger(ProfileExportCollectRouteBuilder.class); @@ -38,7 +42,9 @@ public class ProfileExportCollectRouteBuilder extends RouteBuilder { private ImportExportConfigurationService<ExportConfiguration> exportConfigurationService; private PersistenceService persistenceService; - private String allowedEndpoints; + public ProfileExportCollectRouteBuilder(Map<String, String> kafkaProps, String configType) { + super(kafkaProps, configType); + } @Override public void configure() throws Exception { @@ -48,16 +54,37 @@ public class ProfileExportCollectRouteBuilder extends RouteBuilder { exportConfigurationList = exportConfigurationService.getAll(); } + CollectProfileBean collectProfileBean = new CollectProfileBean(); + collectProfileBean.setPersistenceService(persistenceService); + + //Loop on multiple export configuration for (final ExportConfiguration exportConfiguration : exportConfigurationList) { - String endpoint = (String) exportConfiguration.getProperties().get("destination"); - - if (StringUtils.isNotBlank(endpoint) && allowedEndpoints.contains(endpoint.substring(0, endpoint.indexOf(':')))) { - List<Profile> profilesCollected = persistenceService.query("segments", (String) exportConfiguration.getProperties().get("segments"), - null, Profile.class); - logger.info("Collected +++{}+++ profiles.", profilesCollected.size()); + if (exportConfiguration.getProperties() != null && exportConfiguration.getProperties().size() > 0) { + if ((Map<String, String>) exportConfiguration.getProperties().get("mapping") != null) { + String destinationEndpoint = (String) exportConfiguration.getProperties().get("destination"); + if (StringUtils.isNotBlank(destinationEndpoint) && allowedEndpoints.contains(destinationEndpoint.substring(0, destinationEndpoint.indexOf(':')))) { + ProcessorDefinition prDef = from("timer://collectProfile?fixedRate=true&period=" + (String) exportConfiguration.getProperties().get("period")) + .autoStartup(exportConfiguration.isActive()) + .bean(collectProfileBean, "extractProfileBySegment(" + exportConfiguration.getProperties().get("segment") + ")") + .split(body()) + .marshal(jacksonDataFormat) + .convertBodyTo(String.class) + .setHeader(RouterConstants.HEADER_EXPORT_CONFIG, constant(exportConfiguration)) + .log(LoggingLevel.DEBUG, "BODY : ${body}"); + if (RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) { + prDef.to((KafkaEndpoint) getEndpointURI(RouterConstants.DIRECTION_FROM, RouterConstants.DIRECT_EXPORT_DEPOSIT_BUFFER)); + } else { + prDef.to((String) getEndpointURI(RouterConstants.DIRECTION_FROM, RouterConstants.DIRECT_EXPORT_DEPOSIT_BUFFER)); + } + } else { + logger.error("Endpoint scheme {} is not allowed, route {} will be skipped.", destinationEndpoint.substring(0, destinationEndpoint.indexOf(':')), exportConfiguration.getItemId()); + } + } else { + logger.warn("Mapping is null in export configuration, route {} will be skipped!", exportConfiguration.getItemId()); + } } else { - logger.error("Endpoint scheme {} is not allowed, route {} will be skipped.", endpoint.substring(0, endpoint.indexOf(':')), exportConfiguration.getItemId()); + logger.warn("Export configuration incomplete, route {} will be skipped!", exportConfiguration.getItemId()); } } } @@ -74,8 +101,4 @@ public class ProfileExportCollectRouteBuilder extends RouteBuilder { this.persistenceService = persistenceService; } - public void setAllowedEndpoints(String allowedEndpoints) { - this.allowedEndpoints = allowedEndpoints; - } - } http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/b7194f47/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportProducerRouteBuilder.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportProducerRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportProducerRouteBuilder.java new file mode 100644 index 0000000..0b0b60a --- /dev/null +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportProducerRouteBuilder.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.unomi.router.core.route; + +import org.apache.camel.component.kafka.KafkaEndpoint; +import org.apache.camel.model.RouteDefinition; +import org.apache.unomi.router.api.RouterConstants; +import org.apache.unomi.router.core.processor.ExportRouteCompletionProcessor; +import org.apache.unomi.router.core.processor.LineBuildProcessor; +import org.apache.unomi.router.core.strategy.StringLinesAggregationStrategy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +/** + * Created by amidani on 28/06/2017. + */ +public class ProfileExportProducerRouteBuilder extends RouterAbstractRouteBuilder { + + private static final Logger logger = LoggerFactory.getLogger(ProfileExportProducerRouteBuilder.class); + + private ExportRouteCompletionProcessor exportRouteCompletionProcessor; + + public ProfileExportProducerRouteBuilder(Map<String, String> kafkaProps, String configType) { + super(kafkaProps, configType); + } + + @Override + public void configure() throws Exception { + + logger.info("Configure Recurrent Route 'Export :: Data Producer'"); + + RouteDefinition rtDef; + if (RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) { + rtDef = from((KafkaEndpoint) getEndpointURI(RouterConstants.DIRECTION_TO, RouterConstants.DIRECT_EXPORT_DEPOSIT_BUFFER)); + } else { + rtDef = from((String) getEndpointURI(RouterConstants.DIRECTION_TO, RouterConstants.DIRECT_EXPORT_DEPOSIT_BUFFER)); + } + + rtDef.unmarshal(jacksonDataFormat) + .process(new LineBuildProcessor()) + .aggregate(constant(true), new StringLinesAggregationStrategy()) + .completionPredicate(exchangeProperty("CamelSplitSize").isEqualTo(exchangeProperty("CamelAggregatedSize"))) + .eagerCheckCompletion() + .process(exportRouteCompletionProcessor) + .toD("${in.header.exportConfig.getProperty('destination')}"); + + } + + public void setExportRouteCompletionProcessor(ExportRouteCompletionProcessor exportRouteCompletionProcessor) { + this.exportRouteCompletionProcessor = exportRouteCompletionProcessor; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/b7194f47/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportAbstractRouteBuilder.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportAbstractRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportAbstractRouteBuilder.java deleted file mode 100644 index bacc38e..0000000 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportAbstractRouteBuilder.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.unomi.router.core.route; - -import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.component.jackson.JacksonDataFormat; -import org.apache.camel.component.kafka.KafkaComponent; -import org.apache.camel.component.kafka.KafkaConfiguration; -import org.apache.camel.component.kafka.KafkaEndpoint; -import org.apache.commons.lang3.StringUtils; -import org.apache.unomi.router.api.RouterConstants; - -import java.util.Map; - -/** - * Created by amidani on 13/06/2017. - */ -public abstract class ProfileImportAbstractRouteBuilder extends RouteBuilder { - - protected JacksonDataFormat jacksonDataFormat; - - protected String kafkaHost; - protected String kafkaPort; - protected String kafkaImportTopic; - protected String kafkaImportGroupId; - protected String kafkaImportConsumerCount; - protected String kafkaImportAutoCommit; - - protected String configType; - - public ProfileImportAbstractRouteBuilder(Map<String, String> kafkaProps, String configType) { - this.kafkaHost = kafkaProps.get("kafkaHost"); - this.kafkaPort = kafkaProps.get("kafkaPort"); - this.kafkaImportTopic = kafkaProps.get("kafkaImportTopic"); - this.kafkaImportGroupId = kafkaProps.get("kafkaImportGroupId"); - this.kafkaImportConsumerCount = kafkaProps.get("kafkaImportConsumerCount"); - this.kafkaImportAutoCommit = kafkaProps.get("kafkaImportAutoCommit"); - this.configType = configType; - } - - public Object getEndpointURI(String direction) { - Object endpoint; - if (RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) { - //Prepare Kafka Deposit - StringBuilder kafkaUri = new StringBuilder("kafka:"); - kafkaUri.append(kafkaHost).append(":").append(kafkaPort).append("?topic=").append(kafkaImportTopic); - if (StringUtils.isNotBlank(kafkaImportGroupId)) { - kafkaUri.append("&groupId=" + kafkaImportGroupId); - } - if (RouterConstants.DIRECTION_TO.equals(direction)) { - kafkaUri.append("&autoCommitEnable=" + kafkaImportAutoCommit + "&consumersCount=" + kafkaImportConsumerCount); - } - KafkaConfiguration kafkaConfiguration = new KafkaConfiguration(); - kafkaConfiguration.setBrokers(kafkaHost + ":" + kafkaPort); - kafkaConfiguration.setTopic(kafkaImportTopic); - kafkaConfiguration.setGroupId(kafkaImportGroupId); - endpoint = new KafkaEndpoint(kafkaUri.toString(), new KafkaComponent(this.getContext())); - ((KafkaEndpoint) endpoint).setConfiguration(kafkaConfiguration); - } else { - endpoint = RouterConstants.DIRECT_DEPOSIT_BUFFER; - } - - return endpoint; - } - - public void setJacksonDataFormat(JacksonDataFormat jacksonDataFormat) { - this.jacksonDataFormat = jacksonDataFormat; - } -} http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/b7194f47/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java index 7f54884..2dc87f3 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java @@ -23,8 +23,8 @@ import org.apache.camel.component.kafka.KafkaEndpoint; import org.apache.camel.model.ProcessorDefinition; import org.apache.commons.lang3.StringUtils; import org.apache.unomi.router.api.ImportConfiguration; -import org.apache.unomi.router.api.services.ImportExportConfigurationService; import org.apache.unomi.router.api.RouterConstants; +import org.apache.unomi.router.api.services.ImportExportConfigurationService; import org.apache.unomi.router.core.exception.BadProfileDataFormatException; import org.apache.unomi.router.core.processor.LineSplitFailureHandler; import org.apache.unomi.router.core.processor.LineSplitProcessor; @@ -39,15 +39,13 @@ import java.util.Map; * Created by amidani on 26/04/2017. */ -public class ProfileImportFromSourceRouteBuilder extends ProfileImportAbstractRouteBuilder { +public class ProfileImportFromSourceRouteBuilder extends RouterAbstractRouteBuilder { private static final Logger logger = LoggerFactory.getLogger(ProfileImportFromSourceRouteBuilder.class.getName()); private List<ImportConfiguration> importConfigurationList; private ImportExportConfigurationService<ImportConfiguration> importConfigurationService; - private String allowedEndpoints; - public ProfileImportFromSourceRouteBuilder(Map<String, String> kafkaProps, String configType) { super(kafkaProps, configType); } @@ -67,9 +65,9 @@ public class ProfileImportFromSourceRouteBuilder extends ProfileImportAbstractRo .process(new LineSplitFailureHandler()); if (RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) { - prDefErr.to((KafkaEndpoint) getEndpointURI(RouterConstants.DIRECTION_FROM)); + prDefErr.to((KafkaEndpoint) getEndpointURI(RouterConstants.DIRECTION_FROM, RouterConstants.DIRECT_IMPORT_DEPOSIT_BUFFER)); } else { - prDefErr.to((String) getEndpointURI(RouterConstants.DIRECTION_FROM)); + prDefErr.to((String) getEndpointURI(RouterConstants.DIRECTION_FROM, RouterConstants.DIRECT_IMPORT_DEPOSIT_BUFFER)); } //Loop on multiple import configuration @@ -113,9 +111,9 @@ public class ProfileImportFromSourceRouteBuilder extends ProfileImportAbstractRo .convertBodyTo(String.class); if (RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) { - prDef.to((KafkaEndpoint) getEndpointURI(RouterConstants.DIRECTION_FROM)); + prDef.to((KafkaEndpoint) getEndpointURI(RouterConstants.DIRECTION_FROM, RouterConstants.DIRECT_IMPORT_DEPOSIT_BUFFER)); } else { - prDef.to((String) getEndpointURI(RouterConstants.DIRECTION_FROM)); + prDef.to((String) getEndpointURI(RouterConstants.DIRECTION_FROM, RouterConstants.DIRECT_IMPORT_DEPOSIT_BUFFER)); } } else { logger.error("Endpoint scheme {} is not allowed, route {} will be skipped.", endpoint.substring(0, endpoint.indexOf(':')), importConfiguration.getItemId()); @@ -132,8 +130,4 @@ public class ProfileImportFromSourceRouteBuilder extends ProfileImportAbstractRo this.importConfigurationService = importConfigurationService; } - public void setAllowedEndpoints(String allowedEndpoints) { - this.allowedEndpoints = allowedEndpoints; - } - } http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/b7194f47/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java index a94b5ed..0913876 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java @@ -17,7 +17,6 @@ package org.apache.unomi.router.core.route; import org.apache.camel.LoggingLevel; -import org.apache.camel.component.jackson.JacksonDataFormat; import org.apache.camel.component.kafka.KafkaEndpoint; import org.apache.camel.model.ProcessorDefinition; import org.apache.unomi.router.api.RouterConstants; @@ -33,15 +32,12 @@ import java.util.Map; /** * Created by amidani on 22/05/2017. */ -public class ProfileImportOneShotRouteBuilder extends ProfileImportAbstractRouteBuilder { +public class ProfileImportOneShotRouteBuilder extends RouterAbstractRouteBuilder { private Logger logger = LoggerFactory.getLogger(ProfileImportOneShotRouteBuilder.class.getName()); - private ImportConfigByFileNameProcessor importConfigByFileNameProcessor; private String uploadDir; - private final String IMPORT_ONESHOT_ROUTE_ID = "ONE_SHOT_ROUTE"; - public ProfileImportOneShotRouteBuilder(Map<String, String> kafkaProps, String configType) { super(kafkaProps, configType); } @@ -57,27 +53,27 @@ public class ProfileImportOneShotRouteBuilder extends ProfileImportAbstractRoute .process(new LineSplitFailureHandler()); if (RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) { - prDefErr.to((KafkaEndpoint) getEndpointURI(RouterConstants.DIRECTION_FROM)); + prDefErr.to((KafkaEndpoint) getEndpointURI(RouterConstants.DIRECTION_FROM, RouterConstants.DIRECT_IMPORT_DEPOSIT_BUFFER)); } else { - prDefErr.to((String) getEndpointURI(RouterConstants.DIRECTION_FROM)); + prDefErr.to((String) getEndpointURI(RouterConstants.DIRECTION_FROM, RouterConstants.DIRECT_IMPORT_DEPOSIT_BUFFER)); } LineSplitProcessor lineSplitProcessor = new LineSplitProcessor(); - ProcessorDefinition prDef = from("file://"+uploadDir+"?include=.*.csv&consumer.delay=1m") - .routeId(IMPORT_ONESHOT_ROUTE_ID) + ProcessorDefinition prDef = from("file://" + uploadDir + "?include=.*.csv&consumer.delay=1m") + .routeId(RouterConstants.IMPORT_ONESHOT_ROUTE_ID) .autoStartup(true) .process(importConfigByFileNameProcessor) .split(bodyAs(String.class).tokenize("${in.header.importConfigOneShot.getLineSeparator}")) - .setHeader("configType", constant(configType)) + .setHeader(RouterConstants.HEADER_CONFIG_TYPE, constant(configType)) .process(lineSplitProcessor) .to("log:org.apache.unomi.router?level=INFO") .marshal(jacksonDataFormat) .convertBodyTo(String.class); - if(RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)){ - prDef.to((KafkaEndpoint) getEndpointURI(RouterConstants.DIRECTION_FROM)); + if (RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) { + prDef.to((KafkaEndpoint) getEndpointURI(RouterConstants.DIRECTION_FROM, RouterConstants.DIRECT_IMPORT_DEPOSIT_BUFFER)); } else { - prDef.to((String) getEndpointURI(RouterConstants.DIRECTION_FROM)); + prDef.to((String) getEndpointURI(RouterConstants.DIRECTION_FROM, RouterConstants.DIRECT_IMPORT_DEPOSIT_BUFFER)); } } @@ -88,8 +84,4 @@ public class ProfileImportOneShotRouteBuilder extends ProfileImportAbstractRoute public void setUploadDir(String uploadDir) { this.uploadDir = uploadDir; } - - public void setJacksonDataFormat(JacksonDataFormat jacksonDataFormat) { - this.jacksonDataFormat = jacksonDataFormat; - } } http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/b7194f47/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportToUnomiRouteBuilder.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportToUnomiRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportToUnomiRouteBuilder.java index d75977b..759dde4 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportToUnomiRouteBuilder.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportToUnomiRouteBuilder.java @@ -17,11 +17,10 @@ package org.apache.unomi.router.core.route; import org.apache.camel.LoggingLevel; -import org.apache.camel.component.jackson.JacksonDataFormat; import org.apache.camel.component.kafka.KafkaEndpoint; import org.apache.camel.model.RouteDefinition; import org.apache.unomi.router.api.RouterConstants; -import org.apache.unomi.router.core.processor.RouteCompletionProcessor; +import org.apache.unomi.router.core.processor.ImportRouteCompletionProcessor; import org.apache.unomi.router.core.processor.UnomiStorageProcessor; import org.apache.unomi.router.core.strategy.ArrayListAggregationStrategy; import org.slf4j.Logger; @@ -32,12 +31,12 @@ import java.util.Map; /** * Created by amidani on 26/04/2017. */ -public class ProfileImportToUnomiRouteBuilder extends ProfileImportAbstractRouteBuilder { +public class ProfileImportToUnomiRouteBuilder extends RouterAbstractRouteBuilder { private Logger logger = LoggerFactory.getLogger(ProfileImportToUnomiRouteBuilder.class.getName()); private UnomiStorageProcessor unomiStorageProcessor; - private RouteCompletionProcessor routeCompletionProcessor; + private ImportRouteCompletionProcessor importRouteCompletionProcessor; public ProfileImportToUnomiRouteBuilder(Map<String, String> kafkaProps, String configType) { super(kafkaProps, configType); @@ -50,9 +49,9 @@ public class ProfileImportToUnomiRouteBuilder extends ProfileImportAbstractRoute RouteDefinition rtDef; if (RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) { - rtDef = from((KafkaEndpoint) getEndpointURI(RouterConstants.DIRECTION_TO)); + rtDef = from((KafkaEndpoint) getEndpointURI(RouterConstants.DIRECTION_TO, RouterConstants.DIRECT_IMPORT_DEPOSIT_BUFFER)); } else { - rtDef = from((String) getEndpointURI(RouterConstants.DIRECTION_TO)); + rtDef = from((String) getEndpointURI(RouterConstants.DIRECTION_TO, RouterConstants.DIRECT_IMPORT_DEPOSIT_BUFFER)); } rtDef.choice() .when(header(RouterConstants.HEADER_FAILED_MESSAGE).isNull()) @@ -64,7 +63,7 @@ public class ProfileImportToUnomiRouteBuilder extends ProfileImportAbstractRoute .aggregate(constant(true), new ArrayListAggregationStrategy()) .completionPredicate(exchangeProperty("CamelSplitComplete").isEqualTo("true")) .eagerCheckCompletion() - .process(routeCompletionProcessor) + .process(importRouteCompletionProcessor) .to("log:org.apache.unomi.router?level=INFO"); } @@ -72,11 +71,7 @@ public class ProfileImportToUnomiRouteBuilder extends ProfileImportAbstractRoute this.unomiStorageProcessor = unomiStorageProcessor; } - public void setRouteCompletionProcessor(RouteCompletionProcessor routeCompletionProcessor) { - this.routeCompletionProcessor = routeCompletionProcessor; - } - - public void setJacksonDataFormat(JacksonDataFormat jacksonDataFormat) { - this.jacksonDataFormat = jacksonDataFormat; + public void setImportRouteCompletionProcessor(ImportRouteCompletionProcessor importRouteCompletionProcessor) { + this.importRouteCompletionProcessor = importRouteCompletionProcessor; } } http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/b7194f47/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/RouterAbstractRouteBuilder.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/RouterAbstractRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/RouterAbstractRouteBuilder.java new file mode 100644 index 0000000..5db9917 --- /dev/null +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/RouterAbstractRouteBuilder.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.unomi.router.core.route; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.jackson.JacksonDataFormat; +import org.apache.camel.component.kafka.KafkaComponent; +import org.apache.camel.component.kafka.KafkaConfiguration; +import org.apache.camel.component.kafka.KafkaEndpoint; +import org.apache.commons.lang3.StringUtils; +import org.apache.unomi.router.api.RouterConstants; + +import java.util.Map; + +/** + * Created by amidani on 13/06/2017. + */ +public abstract class RouterAbstractRouteBuilder extends RouteBuilder { + + protected JacksonDataFormat jacksonDataFormat; + + protected String kafkaHost; + protected String kafkaPort; + protected String kafkaImportTopic; + protected String kafkaExportTopic; + protected String kafkaImportGroupId; + protected String kafkaExportGroupId; + protected String kafkaConsumerCount; + protected String kafkaAutoCommit; + + protected String configType; + protected String allowedEndpoints; + + public RouterAbstractRouteBuilder(Map<String, String> kafkaProps, String configType) { + this.kafkaHost = kafkaProps.get("kafkaHost"); + this.kafkaPort = kafkaProps.get("kafkaPort"); + this.kafkaImportTopic = kafkaProps.get("kafkaImportTopic"); + this.kafkaExportTopic = kafkaProps.get("kafkaExportTopic"); + this.kafkaImportGroupId = kafkaProps.get("kafkaImportGroupId"); + this.kafkaExportGroupId = kafkaProps.get("kafkaExportGroupId"); + this.kafkaConsumerCount = kafkaProps.get("kafkaConsumerCount"); + this.kafkaAutoCommit = kafkaProps.get("kafkaAutoCommit"); + this.configType = configType; + } + + public Object getEndpointURI(String direction, String operationDepositBuffer) { + Object endpoint; + if (RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) { + String kafkaTopic = kafkaImportTopic; + String kafkaGroupId = kafkaImportGroupId; + if (RouterConstants.DIRECT_EXPORT_DEPOSIT_BUFFER.equals(operationDepositBuffer)) { + kafkaTopic = kafkaExportTopic; + kafkaGroupId = kafkaExportGroupId; + } + //Prepare Kafka Deposit + StringBuilder kafkaUri = new StringBuilder("kafka:"); + kafkaUri.append(kafkaHost).append(":").append(kafkaPort).append("?topic=").append(kafkaTopic); + if (StringUtils.isNotBlank(kafkaGroupId)) { + kafkaUri.append("&groupId=" + kafkaGroupId); + } + if (RouterConstants.DIRECTION_TO.equals(direction)) { + kafkaUri.append("&autoCommitEnable=" + kafkaAutoCommit + "&consumersCount=" + kafkaConsumerCount); + } + KafkaConfiguration kafkaConfiguration = new KafkaConfiguration(); + kafkaConfiguration.setBrokers(kafkaHost + ":" + kafkaPort); + kafkaConfiguration.setTopic(kafkaTopic); + kafkaConfiguration.setGroupId(kafkaGroupId); + endpoint = new KafkaEndpoint(kafkaUri.toString(), new KafkaComponent(this.getContext())); + ((KafkaEndpoint) endpoint).setConfiguration(kafkaConfiguration); + } else { + endpoint = operationDepositBuffer; + } + + return endpoint; + } + + public void setJacksonDataFormat(JacksonDataFormat jacksonDataFormat) { + this.jacksonDataFormat = jacksonDataFormat; + } + + public void setAllowedEndpoints(String allowedEndpoints) { + this.allowedEndpoints = allowedEndpoints; + } +} http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/b7194f47/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/ArrayListAggregationStrategy.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/ArrayListAggregationStrategy.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/ArrayListAggregationStrategy.java index a53e34b..ca87ad3 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/ArrayListAggregationStrategy.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/ArrayListAggregationStrategy.java @@ -18,8 +18,6 @@ package org.apache.unomi.router.core.strategy; import org.apache.camel.Exchange; import org.apache.camel.processor.aggregate.AggregationStrategy; -import org.apache.camel.processor.aggregate.CompletionAwareAggregationStrategy; -import org.apache.unomi.router.core.processor.RouteCompletionProcessor; import java.util.ArrayList; http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/b7194f47/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/StringLinesAggregationStrategy.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/StringLinesAggregationStrategy.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/StringLinesAggregationStrategy.java new file mode 100644 index 0000000..5a69001 --- /dev/null +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/StringLinesAggregationStrategy.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.unomi.router.core.strategy; + +import org.apache.camel.Exchange; +import org.apache.camel.processor.aggregate.AggregationStrategy; +import org.apache.unomi.router.api.ExportConfiguration; + +/** + * Created by amidani on 29/06/2017. + */ +public class StringLinesAggregationStrategy implements AggregationStrategy { + + public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { + Object newBody = newExchange.getIn().getBody(String.class); + String lineSeparator = newExchange.getIn().getHeader("exportConfig", ExportConfiguration.class).getLineSeparator(); + if (oldExchange != null) { + String fileContent = oldExchange.getIn().getBody(String.class); + + fileContent += lineSeparator + newBody; + oldExchange.getIn().setBody(fileContent); + return oldExchange; + } else { + return newExchange; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/b7194f47/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml index 5ae1e9c..3b155b2 100644 --- a/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml +++ b/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml @@ -27,15 +27,16 @@ <cm:default-properties> <cm:property name="config.type" value="nobroker"/> <cm:property name="config.allowedEndpoints" value="file,ftp"/> - <cm:property name="config.internalPort" value="8233"/> <cm:property name="kafka.host" value="localhost"/> <cm:property name="kafka.port" value="9092"/> - <cm:property name="kafka.import.topic" value="camel-deposit"/> + <cm:property name="kafka.import.topic" value="import-deposit"/> + <cm:property name="kafka.export.topic" value="export-deposit"/> <cm:property name="kafka.import.groupId" value="unomi-import-group"/> - <cm:property name="kafka.import.consumerCount" value="10"/> - <cm:property name="kafka.import.autoCommit" value="true"/> + <cm:property name="kafka.export.groupId" value="unomi-export-group"/> + <cm:property name="kafka.consumerCount" value="10"/> + <cm:property name="kafka.autoCommit" value="true"/> <cm:property name="import.oneshot.uploadDir" value="/tmp/oneshot_import_configs/"/> - <cm:property name="import.executionsHistory.size" value="5"/> + <cm:property name="executionsHistory.size" value="5"/> </cm:default-properties> </cm:property-placeholder> @@ -43,9 +44,14 @@ <property name="profileImportService" ref="profileImportService"/> </bean> - <bean id="routeCompletionProcessor" class="org.apache.unomi.router.core.processor.RouteCompletionProcessor"> + <bean id="importRouteCompletionProcessor" class="org.apache.unomi.router.core.processor.ImportRouteCompletionProcessor"> <property name="importConfigurationService" ref="importConfigurationService"/> - <property name="executionsHistorySize" value="${import.executionsHistory.size}"/> + <property name="executionsHistorySize" value="${executionsHistory.size}"/> + </bean> + + <bean id="exportRouteCompletionProcessor" class="org.apache.unomi.router.core.processor.ExportRouteCompletionProcessor"> + <property name="exportConfigurationService" ref="exportConfigurationService"/> + <property name="executionsHistorySize" value="${executionsHistory.size}"/> </bean> <bean id="importConfigByFileNameProcessor" class="org.apache.unomi.router.core.processor.ImportConfigByFileNameProcessor"> @@ -86,26 +92,29 @@ init-method="initCamelContext" destroy-method="preDestroy"> <property name="configType" value="${config.type}"/> <property name="allowedEndpoints" value="${config.allowedEndpoints}"/> + <property name="uploadDir" value="${import.oneshot.uploadDir}"/> + <property name="bundleContext" ref="blueprintBundleContext"/> + <property name="jacksonDataFormat" ref="jacksonDataFormat"/> <property name="kafkaProps"> <map> <entry key="kafkaHost" value="${kafka.host}"/> <entry key="kafkaPort" value="${kafka.port}"/> <entry key="kafkaImportTopic" value="${kafka.import.topic}"/> + <entry key="kafkaExportTopic" value="${kafka.export.topic}"/> <entry key="kafkaImportGroupId" value="${kafka.import.groupId}"/> - <entry key="kafkaImportConsumerCount" value="${kafka.import.consumerCount}"/> - <entry key="kafkaImportAutoCommit" value="${kafka.import.autoCommit}"/> + <entry key="kafkaExportGroupId" value="${kafka.export.groupId}"/> + <entry key="kafkaConsumerCount" value="${kafka.consumerCount}"/> + <entry key="kafkaAutoCommit" value="${kafka.autoCommit}"/> </map> </property> - <property name="uploadDir" value="${import.oneshot.uploadDir}"/> <property name="unomiStorageProcessor" ref="unomiStorageProcessor"/> - <property name="routeCompletionProcessor" ref="routeCompletionProcessor"/> + <property name="importRouteCompletionProcessor" ref="importRouteCompletionProcessor"/> + <property name="exportRouteCompletionProcessor" ref="exportRouteCompletionProcessor"/> <property name="importConfigByFileNameProcessor" ref="importConfigByFileNameProcessor"/> - <property name="importConfigurationService" ref="importConfigurationService"/> + <property name="configSharingService" ref="configSharingService" /> <property name="exportConfigurationService" ref="exportConfigurationService"/> + <property name="importConfigurationService" ref="importConfigurationService"/> <property name="persistenceService" ref="persistenceService"/> - <property name="jacksonDataFormat" ref="jacksonDataFormat"/> - <property name="bundleContext" ref="blueprintBundleContext"/> - <property name="configSharingService" ref="configSharingService" /> </bean> <camel:camelContext id="httpEndpoint" xmlns="http://camel.apache.org/schema/blueprint"> @@ -116,6 +125,10 @@ <property name="routerCamelContext" ref="camelContext"/> </bean> + <bean id="collectProfileBean" class="org.apache.unomi.router.core.bean.CollectProfileBean"> + <property name="persistenceService" ref="persistenceService"/> + </bean> + <reference id="configSharingService" interface="org.apache.unomi.api.services.ConfigSharingService" /> <reference id="httpService" interface="org.osgi.service.http.HttpService"/> <reference id="profileImportService" interface="org.apache.unomi.router.api.services.ProfileImportService"/> http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/b7194f47/extensions/router/router-core/src/main/resources/org.apache.unomi.router.cfg ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/resources/org.apache.unomi.router.cfg b/extensions/router/router-core/src/main/resources/org.apache.unomi.router.cfg index 2aa385f..8f29d65 100644 --- a/extensions/router/router-core/src/main/resources/org.apache.unomi.router.cfg +++ b/extensions/router/router-core/src/main/resources/org.apache.unomi.router.cfg @@ -23,19 +23,18 @@ import.config.type=nobroker #Kafka #kafka.host=localhost #kafka.port=9092 -#kafka.import.topic=camel-deposit +#kafka.import.topic=import-deposit +#kafka.export.topic=export-deposit #kafka.import.groupId=unomi-import-group -#kafka.import.consumerCount=10 -#kafka.import.autoCommit=true +#kafka.export.groupId=unomi-import-group +#kafka.consumerCount=10 +#kafka.autoCommit=true #Import One Shot upload directory import.oneshot.uploadDir=${karaf.data}/tmp/unomi_oneshot_import_configs/ -#Import executions history size -import.executionsHistory.size=5 +#Import/Export executions history size +executionsHistory.size=5 #Allowed source endpoints -config.allowedEndpoints=file,ftp - -#Internal Camel REST services port (Not public - DO NOT OPEN TO PUBLIC) -config.internalPort=8233 \ No newline at end of file +config.allowedEndpoints=file,ftp \ No newline at end of file