Repository: incubator-unomi Updated Branches: refs/heads/master f84401a7a -> bd34ae9e0
UNOMI-153 add event when updating camel route (remove, add or update) to make sure routes are synchronized in the cluster Project: http://git-wip-us.apache.org/repos/asf/incubator-unomi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-unomi/commit/bd34ae9e Tree: http://git-wip-us.apache.org/repos/asf/incubator-unomi/tree/bd34ae9e Diff: http://git-wip-us.apache.org/repos/asf/incubator-unomi/diff/bd34ae9e Branch: refs/heads/master Commit: bd34ae9e0d3e6101a9aeb5a0988b3c0c099bbf97 Parents: f84401a Author: dgaillard <[email protected]> Authored: Mon Feb 26 17:48:46 2018 +0100 Committer: dgaillard <[email protected]> Committed: Mon Feb 26 17:48:46 2018 +0100 ---------------------------------------------------------------------- .../unomi/api/services/ClusterService.java | 8 +++ .../unomi/router/api/IRouterCamelContext.java | 4 +- extensions/router/router-core/pom.xml | 10 +++ .../router/core/context/RouterCamelContext.java | 42 ++++++++--- .../core/event/UpdateCamelRouteEvent.java | 47 +++++++++++++ .../event/UpdateCamelRouteEventHandler.java | 74 ++++++++++++++++++++ .../resources/OSGI-INF/blueprint/blueprint.xml | 17 +++++ .../ExportConfigurationServiceImpl.java | 4 +- .../ImportConfigurationServiceImpl.java | 4 +- .../services/services/ClusterServiceImpl.java | 9 +++ 10 files changed, 204 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/bd34ae9e/api/src/main/java/org/apache/unomi/api/services/ClusterService.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/org/apache/unomi/api/services/ClusterService.java b/api/src/main/java/org/apache/unomi/api/services/ClusterService.java index b851b78..9a0fdfa 100644 --- a/api/src/main/java/org/apache/unomi/api/services/ClusterService.java +++ b/api/src/main/java/org/apache/unomi/api/services/ClusterService.java @@ -19,6 +19,7 @@ package 
org.apache.unomi.api.services; import org.apache.unomi.api.ClusterNode; +import java.io.Serializable; import java.util.Date; import java.util.List; @@ -49,4 +50,11 @@ public interface ClusterService { */ void purge(final String scope); + /** + * This function will send an event to the nodes of the cluster + * The function takes a Serializable to avoid dependency on any clustering framework + * + * @param event this object will be cast to {@link org.apache.karaf.cellar.core.event.Event} + */ + void sendEvent(Serializable event); } http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/bd34ae9e/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/IRouterCamelContext.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/IRouterCamelContext.java b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/IRouterCamelContext.java index d2d3249..8775b43 100644 --- a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/IRouterCamelContext.java +++ b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/IRouterCamelContext.java @@ -21,7 +21,7 @@ package org.apache.unomi.router.api; */ public interface IRouterCamelContext { - void killExistingRoute(String routeId) throws Exception; + void killExistingRoute(String routeId, boolean fireEvent) throws Exception; - void updateProfileReaderRoute(Object configuration) throws Exception; + void updateProfileReaderRoute(Object configuration, boolean fireEvent) throws Exception; } http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/bd34ae9e/extensions/router/router-core/pom.xml ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/pom.xml b/extensions/router/router-core/pom.xml index 42e10dc..d393e3b 100644 --- a/extensions/router/router-core/pom.xml +++ b/extensions/router/router-core/pom.xml 
@@ -137,6 +137,16 @@ <version>0.10.1.0</version> <scope>provided</scope> </dependency> + <dependency> + <groupId>org.apache.karaf.cellar</groupId> + <artifactId>org.apache.karaf.cellar.core</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.karaf.cellar</groupId> + <artifactId>org.apache.karaf.cellar.config</artifactId> + <scope>provided</scope> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/bd34ae9e/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java index 3b18803..03b2e04 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java @@ -21,6 +21,7 @@ import org.apache.camel.Route; import org.apache.camel.component.jackson.JacksonDataFormat; import org.apache.camel.core.osgi.OsgiDefaultCamelContext; import org.apache.camel.model.RouteDefinition; +import org.apache.unomi.api.services.ClusterService; import org.apache.unomi.api.services.ConfigSharingService; import org.apache.unomi.api.services.ProfileService; import org.apache.unomi.persistence.spi.PersistenceService; @@ -30,6 +31,7 @@ import org.apache.unomi.router.api.ImportConfiguration; import org.apache.unomi.router.api.RouterConstants; import org.apache.unomi.router.api.services.ImportExportConfigurationService; import org.apache.unomi.router.api.services.ProfileExportService; +import org.apache.unomi.router.core.event.UpdateCamelRouteEvent; import 
org.apache.unomi.router.core.processor.ExportRouteCompletionProcessor; import org.apache.unomi.router.core.processor.ImportConfigByFileNameProcessor; import org.apache.unomi.router.core.processor.ImportRouteCompletionProcessor; @@ -70,6 +72,7 @@ public class RouterCamelContext implements SynchronousBundleListener, IRouterCam private String allowedEndpoints; private BundleContext bundleContext; private ConfigSharingService configSharingService; + private ClusterService clusterService; public void setExecHistorySize(String execHistorySize) { this.execHistorySize = execHistorySize; @@ -87,6 +90,10 @@ public class RouterCamelContext implements SynchronousBundleListener, IRouterCam this.configSharingService = configSharingService; } + public void setClusterService(ClusterService clusterService) { + this.clusterService = clusterService; + } + public void initCamelContext() throws Exception { logger.info("Initialize Camel Context..."); @@ -160,10 +167,9 @@ public class RouterCamelContext implements SynchronousBundleListener, IRouterCam exportConfigurationService.setRouterCamelContext(this); logger.info("Camel Context {} initialized successfully."); - } - public void killExistingRoute(String routeId) throws Exception { + public void killExistingRoute(String routeId, boolean fireEvent) throws Exception { //Active routes Route route = camelContext.getRoute(routeId); if (route != null) { @@ -172,18 +178,24 @@ public class RouterCamelContext implements SynchronousBundleListener, IRouterCam camelContext.removeRouteDefinition(routeDefinition); } } + + if (fireEvent) { + UpdateCamelRouteEvent event = new UpdateCamelRouteEvent("org.apache.unomi.router.event.remove"); + event.setRouteId(routeId); + clusterService.sendEvent(event); + } } - public void updateProfileReaderRoute(Object configuration) throws Exception { + public void updateProfileReaderRoute(Object configuration, boolean fireEvent) throws Exception { if (configuration instanceof ImportConfiguration) { - 
updateProfileImportReaderRoute((ImportConfiguration) configuration); + updateProfileImportReaderRoute((ImportConfiguration) configuration, fireEvent); } else { - updateProfileExportReaderRoute((ExportConfiguration) configuration); + updateProfileExportReaderRoute((ExportConfiguration) configuration, fireEvent); } } - private void updateProfileImportReaderRoute(ImportConfiguration importConfiguration) throws Exception { - killExistingRoute(importConfiguration.getItemId()); + private void updateProfileImportReaderRoute(ImportConfiguration importConfiguration, boolean fireEvent) throws Exception { + killExistingRoute(importConfiguration.getItemId(), false); //Handle transforming an import config oneshot <--> recurrent if (RouterConstants.IMPORT_EXPORT_CONFIG_TYPE_RECURRENT.equals(importConfiguration.getConfigType())) { ProfileImportFromSourceRouteBuilder builder = new ProfileImportFromSourceRouteBuilder(kafkaProps, configType); @@ -194,11 +206,17 @@ public class RouterCamelContext implements SynchronousBundleListener, IRouterCam builder.setJacksonDataFormat(jacksonDataFormat); builder.setContext(camelContext); camelContext.addRoutes(builder); + + if (fireEvent) { + UpdateCamelRouteEvent event = new UpdateCamelRouteEvent("org.apache.unomi.router.event.import"); + event.setConfiguration(importConfiguration); + clusterService.sendEvent(event); + } } } - private void updateProfileExportReaderRoute(ExportConfiguration exportConfiguration) throws Exception { - killExistingRoute(exportConfiguration.getItemId()); + private void updateProfileExportReaderRoute(ExportConfiguration exportConfiguration, boolean fireEvent) throws Exception { + killExistingRoute(exportConfiguration.getItemId(), false); //Handle transforming an import config oneshot <--> recurrent if (RouterConstants.IMPORT_EXPORT_CONFIG_TYPE_RECURRENT.equals(exportConfiguration.getConfigType())) { ProfileExportCollectRouteBuilder profileExportCollectRouteBuilder = new ProfileExportCollectRouteBuilder(kafkaProps, 
configType); @@ -209,6 +227,12 @@ public class RouterCamelContext implements SynchronousBundleListener, IRouterCam profileExportCollectRouteBuilder.setJacksonDataFormat(jacksonDataFormat); profileExportCollectRouteBuilder.setContext(camelContext); camelContext.addRoutes(profileExportCollectRouteBuilder); + + if (fireEvent) { + UpdateCamelRouteEvent event = new UpdateCamelRouteEvent("org.apache.unomi.router.event.export"); + event.setConfiguration(exportConfiguration); + clusterService.sendEvent(event); + } } } http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/bd34ae9e/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/event/UpdateCamelRouteEvent.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/event/UpdateCamelRouteEvent.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/event/UpdateCamelRouteEvent.java new file mode 100644 index 0000000..7e1dc81 --- /dev/null +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/event/UpdateCamelRouteEvent.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.unomi.router.core.event; + +import org.apache.karaf.cellar.core.event.Event; + +/** + * @author dgaillard + */ +public class UpdateCamelRouteEvent extends Event { + private String routeId; + private Object configuration; + + public UpdateCamelRouteEvent(String id) { + super(id); + } + + public String getRouteId() { + return routeId; + } + + public void setRouteId(String routeId) { + this.routeId = routeId; + } + + public Object getConfiguration() { + return configuration; + } + + public void setConfiguration(Object configuration) { + this.configuration = configuration; + } +} http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/bd34ae9e/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/event/UpdateCamelRouteEventHandler.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/event/UpdateCamelRouteEventHandler.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/event/UpdateCamelRouteEventHandler.java new file mode 100644 index 0000000..6760f4c --- /dev/null +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/event/UpdateCamelRouteEventHandler.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.unomi.router.core.event; + +import org.apache.commons.lang3.StringUtils; +import org.apache.karaf.cellar.config.Constants; +import org.apache.karaf.cellar.core.CellarSupport; +import org.apache.karaf.cellar.core.control.Switch; +import org.apache.karaf.cellar.core.event.EventHandler; +import org.apache.karaf.cellar.core.event.EventType; +import org.apache.unomi.router.core.context.RouterCamelContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author dgaillard + */ +public class UpdateCamelRouteEventHandler extends CellarSupport implements EventHandler<UpdateCamelRouteEvent> { + private static final Logger logger = LoggerFactory.getLogger(UpdateCamelRouteEventHandler.class.getName()); + + private RouterCamelContext routerCamelContext; + + @Override + public void handle(UpdateCamelRouteEvent event) { + logger.debug("Handle event"); + if (isAllowed(event.getSourceGroup(), Constants.CATEGORY, event.getId(), EventType.INBOUND)) { + logger.debug("Event is allowed"); + // check if it's not a "local" event + if (event.getSourceNode() != null && event.getSourceNode().getId().equalsIgnoreCase(clusterManager.getNode().getId())) { + logger.debug("Cluster event is local (coming from local synchronizer or listener)"); + return; + } + + try { + logger.debug("Event id is {}", event.getId()); + if (event.getId().equals("org.apache.unomi.router.event.remove") && StringUtils.isNotBlank(event.getRouteId())) { + routerCamelContext.killExistingRoute(event.getRouteId(), false); + } else if ((event.getId().equals("org.apache.unomi.router.event.import") || event.getId().equals("org.apache.unomi.router.event.export")) && event.getConfiguration() != null) { + routerCamelContext.updateProfileReaderRoute(event.getConfiguration(), false); + } + } catch (Exception e) { + logger.error("Error when executing event", e); + } + } + } + + 
@Override + public Class<UpdateCamelRouteEvent> getType() { + return UpdateCamelRouteEvent.class; + } + + @Override + public Switch getSwitch() { + return null; + } + + public void setRouterCamelContext(RouterCamelContext routerCamelContext) { + this.routerCamelContext = routerCamelContext; + } +} http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/bd34ae9e/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml index 3f81b5a..d61d64f 100644 --- a/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml +++ b/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml @@ -112,12 +112,25 @@ <property name="persistenceService" ref="persistenceService"/> <property name="profileExportService" ref="profileExportService"/> <property name="profileService" ref="profileService"/> + <property name="clusterService" ref="clusterService" /> </bean> <bean id="collectProfileBean" class="org.apache.unomi.router.core.bean.CollectProfileBean"> <property name="persistenceService" ref="persistenceService"/> </bean> + <bean id="updateCamelRouteEventHandler" class="org.apache.unomi.router.core.event.UpdateCamelRouteEventHandler"> + <property name="configurationAdmin" ref="osgiConfigurationAdmin"/> + <property name="clusterManager" ref="karafCellarClusterManager"/> + <property name="groupManager" ref="karafCellarGroupManager"/> + <property name="routerCamelContext" ref="camelContext"/> + </bean> + <service ref="updateCamelRouteEventHandler" interface="org.apache.karaf.cellar.core.event.EventHandler"> + <service-properties> + <entry key="managed" value="true"/> + </service-properties> + </service> + <reference id="configSharingService" 
interface="org.apache.unomi.api.services.ConfigSharingService" /> <reference id="profileImportService" interface="org.apache.unomi.router.api.services.ProfileImportService"/> <reference id="profileExportService" interface="org.apache.unomi.router.api.services.ProfileExportService"/> @@ -126,5 +139,9 @@ <reference id="segmentService" interface="org.apache.unomi.api.services.SegmentService"/> <reference id="importConfigurationService" interface="org.apache.unomi.router.api.services.ImportExportConfigurationService" filter="(configDiscriminator=IMPORT)"/> <reference id="exportConfigurationService" interface="org.apache.unomi.router.api.services.ImportExportConfigurationService" filter="(configDiscriminator=EXPORT)"/> + <reference id="clusterService" interface="org.apache.unomi.api.services.ClusterService" /> + <reference id="karafCellarGroupManager" interface="org.apache.karaf.cellar.core.GroupManager" /> + <reference id="osgiConfigurationAdmin" interface="org.osgi.service.cm.ConfigurationAdmin"/> + <reference id="karafCellarClusterManager" interface="org.apache.karaf.cellar.core.ClusterManager" /> </blueprint> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/bd34ae9e/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ExportConfigurationServiceImpl.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ExportConfigurationServiceImpl.java b/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ExportConfigurationServiceImpl.java index 101c2f3..2378717 100644 --- a/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ExportConfigurationServiceImpl.java +++ b/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ExportConfigurationServiceImpl.java @@ -53,7 +53,7 @@ public class ExportConfigurationServiceImpl 
extends AbstractConfigurationService } if(updateRunningRoute) { try { - routerCamelContext.updateProfileReaderRoute(exportConfiguration); + routerCamelContext.updateProfileReaderRoute(exportConfiguration, true); } catch (Exception e) { logger.error("Error when trying to save/update running Apache Camel Route: {}", exportConfiguration.getItemId()); } @@ -65,7 +65,7 @@ public class ExportConfigurationServiceImpl extends AbstractConfigurationService @Override public void delete(String configId) { try { - routerCamelContext.killExistingRoute(configId); + routerCamelContext.killExistingRoute(configId, true); } catch (Exception e) { logger.error("Error when trying to delete running Apache Camel Route: {}", configId); } http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/bd34ae9e/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ImportConfigurationServiceImpl.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ImportConfigurationServiceImpl.java b/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ImportConfigurationServiceImpl.java index 0813f05..364ea73 100644 --- a/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ImportConfigurationServiceImpl.java +++ b/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ImportConfigurationServiceImpl.java @@ -53,7 +53,7 @@ public class ImportConfigurationServiceImpl extends AbstractConfigurationService } if(updateRunningRoute) { try { - routerCamelContext.updateProfileReaderRoute(importConfiguration); + routerCamelContext.updateProfileReaderRoute(importConfiguration, true); } catch (Exception e) { logger.error("Error when trying to save/update running Apache Camel Route: {}", importConfiguration.getItemId()); } @@ -65,7 +65,7 @@ public class ImportConfigurationServiceImpl extends 
AbstractConfigurationService @Override public void delete(String configId) { try { - routerCamelContext.killExistingRoute(configId); + routerCamelContext.killExistingRoute(configId, true); } catch (Exception e) { logger.error("Error when trying to delete running Apache Camel Route: {}", configId); } http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/bd34ae9e/services/src/main/java/org/apache/unomi/services/services/ClusterServiceImpl.java ---------------------------------------------------------------------- diff --git a/services/src/main/java/org/apache/unomi/services/services/ClusterServiceImpl.java b/services/src/main/java/org/apache/unomi/services/services/ClusterServiceImpl.java index 6bc0cdc..163812d 100644 --- a/services/src/main/java/org/apache/unomi/services/services/ClusterServiceImpl.java +++ b/services/src/main/java/org/apache/unomi/services/services/ClusterServiceImpl.java @@ -22,6 +22,7 @@ import org.apache.karaf.cellar.config.ClusterConfigurationEvent; import org.apache.karaf.cellar.config.Constants; import org.apache.karaf.cellar.core.*; import org.apache.karaf.cellar.core.control.SwitchStatus; +import org.apache.karaf.cellar.core.event.Event; import org.apache.karaf.cellar.core.event.EventProducer; import org.apache.karaf.cellar.core.event.EventType; import org.apache.unomi.api.ClusterNode; @@ -224,6 +225,14 @@ public class ClusterServiceImpl implements ClusterService { persistenceService.purge(scope); } + @Override + public void sendEvent(Serializable eventObject) { + Event event = (Event) eventObject; + event.setSourceGroup(group); + event.setSourceNode(karafCellarClusterManager.getNode()); + karafCellarEventProducer.produce(event); + } + /** * Check if a configuration is allowed. *
