This is an automated email from the ASF dual-hosted git repository.
shuber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/unomi.git
The following commit(s) were added to refs/heads/master by this push:
new e7be881e5 UNOMI-877: Remove all reference to cellar and hazelcast.
(#725)
e7be881e5 is described below
commit e7be881e535ab10819c28ed850f5b1bccc3d6c05
Author: Serge Huber <[email protected]>
AuthorDate: Wed Aug 27 07:12:48 2025 +0200
UNOMI-877: Remove all reference to cellar and hazelcast. (#725)
* UNOMI-877: Remove all reference to cellar and hazelcast.
UNOMI-877: Not sending event to cluster anymore.
UNOMI-877: Replace Karaf Cellar and Hazelcast with PersistenceService for
cluster synchronization (code isolated from branch unomi-3-dev made by Serge
Huber)
* Remove unused imports from ClusterNode, RouterCamelContext, and
ClusterService.
* Remove unused `clusterService` property from blueprint.xml.
* Add `bundleWatcher` reference and properties to blueprint.xml.
* Add clusterNode mapping definition for Elasticsearch.
* Reorder `shutdownNow` to ensure proper node removal from
PersistenceService during shutdown.
* Clarify comment on node removal timing in `destroy` method.
* Refactor configuration handling and remove unused OSGi ServiceTracker
references.
* Cache cluster nodes to optimize retrieval and reduce reliance on
PersistenceService.
---
.../java/org/apache/unomi/api/ClusterNode.java | 67 ++-
.../apache/unomi/api/services/ClusterService.java | 8 -
extensions/router/router-core/pom.xml | 10 -
.../router/core/context/RouterCamelContext.java | 23 -
.../router/core/event/UpdateCamelRouteEvent.java | 38 --
.../core/event/UpdateCamelRouteEventHandler.java | 76 ---
.../resources/OSGI-INF/blueprint/blueprint.xml | 17 +-
.../test/java/org/apache/unomi/itests/BaseIT.java | 53 +-
.../org/apache/unomi/itests/ProfileServiceIT.java | 4 +-
.../org/apache/unomi/itests/RuleServiceIT.java | 4 +-
kar/pom.xml | 7 -
kar/src/main/feature/feature.xml | 3 -
.../src/main/asciidoc/building-and-deploying.adoc | 3 +-
manual/src/main/asciidoc/clustering.adoc | 22 +-
package/pom.xml | 19 -
.../main/resources/etc/custom.system.properties | 13 +-
.../etc/org.apache.karaf.cellar.groups.cfg | 81 ---
persistence-elasticsearch/core/pom.xml | 7 -
.../ElasticSearchPersistenceServiceImpl.java | 37 +-
.../META-INF/cxs/mappings/clusterNode.json | 67 +++
.../resources/OSGI-INF/blueprint/blueprint.xml | 6 +-
.../spi/config/ConfigurationUpdateHelper.java | 160 ++++++
pom.xml | 19 -
services/pom.xml | 22 +-
.../services/impl/cluster/ClusterServiceImpl.java | 608 ++++++++++++++-------
.../impl/cluster/ClusterSystemStatisticsEvent.java | 43 --
.../ClusterSystemStatisticsEventHandler.java | 138 -----
.../services/impl/rules/RulesServiceImpl.java | 18 +-
.../resources/OSGI-INF/blueprint/blueprint.xml | 40 +-
services/src/main/resources/hazelcast.xml | 222 --------
.../main/resources/org.apache.unomi.cluster.cfg | 10 +-
31 files changed, 837 insertions(+), 1008 deletions(-)
diff --git a/api/src/main/java/org/apache/unomi/api/ClusterNode.java
b/api/src/main/java/org/apache/unomi/api/ClusterNode.java
index 6c40ca21b..66a4ee7c4 100644
--- a/api/src/main/java/org/apache/unomi/api/ClusterNode.java
+++ b/api/src/main/java/org/apache/unomi/api/ClusterNode.java
@@ -17,15 +17,15 @@
package org.apache.unomi.api;
-import java.io.Serializable;
-
/**
* Information about a cluster node.
*/
-public class ClusterNode implements Serializable {
+public class ClusterNode extends Item {
private static final long serialVersionUID = 1281422346318230514L;
+ public static final String ITEM_TYPE = "clusterNode";
+
private double cpuLoad;
private double[] loadAverage;
private String publicHostAddress;
@@ -33,11 +33,18 @@ public class ClusterNode implements Serializable {
private long uptime;
private boolean master;
private boolean data;
+ private long startTime;
+ private long lastHeartbeat;
+
+ // Server information
+ private ServerInfo serverInfo;
/**
* Instantiates a new Cluster node.
*/
public ClusterNode() {
+ super();
+ setItemType(ITEM_TYPE);
}
/**
@@ -165,4 +172,58 @@ public class ClusterNode implements Serializable {
public void setData(boolean data) {
this.data = data;
}
+
+ /**
+ * Retrieves the node start time in milliseconds.
+ *
+ * @return the start time
+ */
+ public long getStartTime() {
+ return startTime;
+ }
+
+ /**
+ * Sets the node start time in milliseconds.
+ *
+ * @param startTime the start time
+ */
+ public void setStartTime(long startTime) {
+ this.startTime = startTime;
+ }
+
+ /**
+ * Retrieves the last heartbeat time in milliseconds.
+ *
+ * @return the last heartbeat time
+ */
+ public long getLastHeartbeat() {
+ return lastHeartbeat;
+ }
+
+ /**
+ * Sets the last heartbeat time in milliseconds.
+ *
+ * @param lastHeartbeat the last heartbeat time
+ */
+ public void setLastHeartbeat(long lastHeartbeat) {
+ this.lastHeartbeat = lastHeartbeat;
+ }
+
+ /**
+ * Gets the server information.
+ *
+ * @return the server information
+ */
+ public ServerInfo getServerInfo() {
+ return serverInfo;
+ }
+
+ /**
+ * Sets the server information.
+ *
+ * @param serverInfo the server information
+ */
+ public void setServerInfo(ServerInfo serverInfo) {
+ this.serverInfo = serverInfo;
+ }
}
diff --git
a/api/src/main/java/org/apache/unomi/api/services/ClusterService.java
b/api/src/main/java/org/apache/unomi/api/services/ClusterService.java
index 299ac9098..f100be286 100644
--- a/api/src/main/java/org/apache/unomi/api/services/ClusterService.java
+++ b/api/src/main/java/org/apache/unomi/api/services/ClusterService.java
@@ -19,7 +19,6 @@ package org.apache.unomi.api.services;
import org.apache.unomi.api.ClusterNode;
-import java.io.Serializable;
import java.util.Date;
import java.util.List;
@@ -51,11 +50,4 @@ public interface ClusterService {
*/
void purge(final String scope);
- /**
- * This function will send an event to the nodes of the cluster
- * The function takes a Serializable to avoid dependency on any clustering
framework
- *
- * @param event this object will be cast to a
org.apache.karaf.cellar.core.event.Event object
- */
- void sendEvent(Serializable event);
}
diff --git a/extensions/router/router-core/pom.xml
b/extensions/router/router-core/pom.xml
index a8ce97fd4..8b7d46aad 100644
--- a/extensions/router/router-core/pom.xml
+++ b/extensions/router/router-core/pom.xml
@@ -133,16 +133,6 @@
<version>${kafka.client.version}</version>
<scope>provided</scope>
</dependency>
- <dependency>
- <groupId>org.apache.karaf.cellar</groupId>
- <artifactId>org.apache.karaf.cellar.core</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.karaf.cellar</groupId>
- <artifactId>org.apache.karaf.cellar.config</artifactId>
- <scope>provided</scope>
- </dependency>
</dependencies>
<build>
diff --git
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java
index 67219f9c5..87d275456 100644
---
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java
+++
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java
@@ -21,7 +21,6 @@ import org.apache.camel.Route;
import org.apache.camel.component.jackson.JacksonDataFormat;
import org.apache.camel.core.osgi.OsgiDefaultCamelContext;
import org.apache.camel.model.RouteDefinition;
-import org.apache.unomi.api.services.ClusterService;
import org.apache.unomi.api.services.ConfigSharingService;
import org.apache.unomi.api.services.ProfileService;
import org.apache.unomi.persistence.spi.PersistenceService;
@@ -31,7 +30,6 @@ import org.apache.unomi.router.api.ImportConfiguration;
import org.apache.unomi.router.api.RouterConstants;
import org.apache.unomi.router.api.services.ImportExportConfigurationService;
import org.apache.unomi.router.api.services.ProfileExportService;
-import org.apache.unomi.router.core.event.UpdateCamelRouteEvent;
import org.apache.unomi.router.core.processor.ExportRouteCompletionProcessor;
import org.apache.unomi.router.core.processor.ImportConfigByFileNameProcessor;
import org.apache.unomi.router.core.processor.ImportRouteCompletionProcessor;
@@ -75,7 +73,6 @@ public class RouterCamelContext implements
IRouterCamelContext {
private String allowedEndpoints;
private BundleContext bundleContext;
private ConfigSharingService configSharingService;
- private ClusterService clusterService;
// TODO UNOMI-572: when fixing UNOMI-572 please remove the usage of the
custom ScheduledExecutorService and re-introduce the Unomi Scheduler Service
private ScheduledExecutorService scheduler;
@@ -102,10 +99,6 @@ public class RouterCamelContext implements
IRouterCamelContext {
this.configSharingService = configSharingService;
}
- public void setClusterService(ClusterService clusterService) {
- this.clusterService = clusterService;
- }
-
public void setTracing(boolean tracing) {
camelContext.setTracing(true);
}
@@ -240,12 +233,6 @@ public class RouterCamelContext implements
IRouterCamelContext {
camelContext.removeRouteDefinition(routeDefinition);
}
}
-
- if (fireEvent) {
- UpdateCamelRouteEvent event = new
UpdateCamelRouteEvent(EVENT_ID_REMOVE);
- event.setRouteId(routeId);
- clusterService.sendEvent(event);
- }
}
public void updateProfileImportReaderRoute(String configId, boolean
fireEvent) throws Exception {
@@ -266,11 +253,6 @@ public class RouterCamelContext implements
IRouterCamelContext {
builder.setJacksonDataFormat(jacksonDataFormat);
builder.setContext(camelContext);
camelContext.addRoutes(builder);
-
- if (fireEvent) {
- UpdateCamelRouteEvent event = new
UpdateCamelRouteEvent(EVENT_ID_IMPORT);
- clusterService.sendEvent(event);
- }
}
}
@@ -291,11 +273,6 @@ public class RouterCamelContext implements
IRouterCamelContext {
profileExportCollectRouteBuilder.setJacksonDataFormat(jacksonDataFormat);
profileExportCollectRouteBuilder.setContext(camelContext);
camelContext.addRoutes(profileExportCollectRouteBuilder);
-
- if (fireEvent) {
- UpdateCamelRouteEvent event = new
UpdateCamelRouteEvent(EVENT_ID_EXPORT);
- clusterService.sendEvent(event);
- }
}
}
diff --git
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/event/UpdateCamelRouteEvent.java
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/event/UpdateCamelRouteEvent.java
deleted file mode 100644
index 2f3d2cb3f..000000000
---
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/event/UpdateCamelRouteEvent.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.unomi.router.core.event;
-
-import org.apache.karaf.cellar.core.event.Event;
-
-/**
- * @author dgaillard
- */
-public class UpdateCamelRouteEvent extends Event {
- private String routeId;
-
- public UpdateCamelRouteEvent(String id) {
- super(id);
- }
-
- public String getRouteId() {
- return routeId;
- }
-
- public void setRouteId(String routeId) {
- this.routeId = routeId;
- }
-}
diff --git
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/event/UpdateCamelRouteEventHandler.java
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/event/UpdateCamelRouteEventHandler.java
deleted file mode 100644
index f43f2b629..000000000
---
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/event/UpdateCamelRouteEventHandler.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.unomi.router.core.event;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.karaf.cellar.config.Constants;
-import org.apache.karaf.cellar.core.CellarSupport;
-import org.apache.karaf.cellar.core.control.Switch;
-import org.apache.karaf.cellar.core.event.EventHandler;
-import org.apache.karaf.cellar.core.event.EventType;
-import org.apache.unomi.router.core.context.RouterCamelContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author dgaillard
- */
-public class UpdateCamelRouteEventHandler extends CellarSupport implements
EventHandler<UpdateCamelRouteEvent> {
- private static final Logger LLOGGER =
LoggerFactory.getLogger(UpdateCamelRouteEventHandler.class.getName());
-
- private RouterCamelContext routerCamelContext;
-
- @Override
- public void handle(UpdateCamelRouteEvent event) {
- LLOGGER.debug("Handle event");
- if (isAllowed(event.getSourceGroup(), Constants.CATEGORY,
event.getId(), EventType.INBOUND)) {
- LLOGGER.debug("Event is allowed");
- // check if it's not a "local" event
- if (event.getSourceNode() != null &&
event.getSourceNode().getId().equalsIgnoreCase(clusterManager.getNode().getId()))
{
- LLOGGER.debug("Cluster event is local (coming from local
synchronizer or listener)");
- return;
- }
-
- try {
- LLOGGER.debug("Event id is {}", event.getId());
- if (event.getId().equals(RouterCamelContext.EVENT_ID_REMOVE)
&& StringUtils.isNotBlank(event.getRouteId())) {
- routerCamelContext.killExistingRoute(event.getRouteId(),
false);
- } else if
((event.getId().equals(RouterCamelContext.EVENT_ID_IMPORT))) {
-
routerCamelContext.updateProfileImportReaderRoute(event.getRouteId(), false);
- } else if
(event.getId().equals(RouterCamelContext.EVENT_ID_EXPORT)) {
-
routerCamelContext.updateProfileExportReaderRoute(event.getRouteId(), false);
- }
- } catch (Exception e) {
- LLOGGER.error("Error when executing event", e);
- }
- }
- }
-
- @Override
- public Class<UpdateCamelRouteEvent> getType() {
- return UpdateCamelRouteEvent.class;
- }
-
- @Override
- public Switch getSwitch() {
- return null;
- }
-
- public void setRouterCamelContext(RouterCamelContext routerCamelContext) {
- this.routerCamelContext = routerCamelContext;
- }
-}
diff --git
a/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
b/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index d7b7a36c0..281c93f91 100644
---
a/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++
b/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -112,7 +112,6 @@
<property name="persistenceService" ref="persistenceService"/>
<property name="profileExportService" ref="profileExportService"/>
<property name="profileService" ref="profileService"/>
- <property name="clusterService" ref="clusterService" />
</bean>
<service id="camelContextOSGI" ref="camelContext"
interface="org.apache.unomi.router.api.IRouterCamelContext"/>
@@ -120,18 +119,6 @@
<property name="persistenceService" ref="persistenceService"/>
</bean>
- <bean id="updateCamelRouteEventHandler"
class="org.apache.unomi.router.core.event.UpdateCamelRouteEventHandler">
- <property name="configurationAdmin" ref="osgiConfigurationAdmin"/>
- <property name="clusterManager" ref="karafCellarClusterManager"/>
- <property name="groupManager" ref="karafCellarGroupManager"/>
- <property name="routerCamelContext" ref="camelContext"/>
- </bean>
- <service ref="updateCamelRouteEventHandler"
interface="org.apache.karaf.cellar.core.event.EventHandler">
- <service-properties>
- <entry key="managed" value="true"/>
- </service-properties>
- </service>
-
<reference id="configSharingService"
interface="org.apache.unomi.api.services.ConfigSharingService" />
<reference id="profileImportService"
interface="org.apache.unomi.router.api.services.ProfileImportService"/>
<reference id="profileExportService"
interface="org.apache.unomi.router.api.services.ProfileExportService"/>
@@ -141,8 +128,6 @@
<reference id="importConfigurationService"
interface="org.apache.unomi.router.api.services.ImportExportConfigurationService"
filter="(configDiscriminator=IMPORT)"/>
<reference id="exportConfigurationService"
interface="org.apache.unomi.router.api.services.ImportExportConfigurationService"
filter="(configDiscriminator=EXPORT)"/>
<reference id="clusterService"
interface="org.apache.unomi.api.services.ClusterService" />
- <reference id="karafCellarGroupManager"
interface="org.apache.karaf.cellar.core.GroupManager" />
<reference id="osgiConfigurationAdmin"
interface="org.osgi.service.cm.ConfigurationAdmin"/>
- <reference id="karafCellarClusterManager"
interface="org.apache.karaf.cellar.core.ClusterManager" />
-</blueprint>
\ No newline at end of file
+</blueprint>
diff --git a/itests/src/test/java/org/apache/unomi/itests/BaseIT.java
b/itests/src/test/java/org/apache/unomi/itests/BaseIT.java
index 38b7f0c66..a98d90d7d 100644
--- a/itests/src/test/java/org/apache/unomi/itests/BaseIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/BaseIT.java
@@ -256,11 +256,6 @@ public abstract class BaseIT extends KarafTestSupport {
editConfigurationFilePut("etc/custom.system.properties",
"org.apache.unomi.elasticsearch.rollover.maxDocs", "300"),
systemProperty("org.ops4j.pax.exam.rbc.rmi.port").value("1199"),
-
systemProperty("org.apache.unomi.hazelcast.group.name").value("cellar"),
-
systemProperty("org.apache.unomi.hazelcast.group.password").value("pass"),
-
systemProperty("org.apache.unomi.hazelcast.network.port").value("5701"),
-
systemProperty("org.apache.unomi.hazelcast.tcp-ip.members").value("127.0.0.1"),
-
systemProperty("org.apache.unomi.hazelcast.tcp-ip.interface").value("127.0.0.1"),
systemProperty("org.apache.unomi.healthcheck.enabled").value("true"),
logLevel(LogLevel.INFO),
@@ -383,6 +378,17 @@ public abstract class BaseIT extends KarafTestSupport {
segmentService = getService(SegmentService.class);
}
+ /**
+ * Updates an OSGi configuration with a single property value and
optionally waits for the service to be reregistered.
+ * If serviceName is null, the method will not wait for service
re-registration.
+ *
+ * @param serviceName The fully qualified name of the service to wait for,
or null to skip waiting
+ * @param configPid The persistent identifier of the configuration to
update
+ * @param propName The name of the property to update
+ * @param propValue The new value for the property
+ * @throws InterruptedException If the thread is interrupted while waiting
for service reregistration
+ * @throws IOException If an error occurs while updating the
configuration
+ */
public void updateConfiguration(String serviceName, String configPid,
String propName, Object propValue)
throws InterruptedException, IOException {
Map<String, Object> props = new HashMap<>();
@@ -390,20 +396,43 @@ public abstract class BaseIT extends KarafTestSupport {
updateConfiguration(serviceName, configPid, props);
}
+ /**
+ * Updates an OSGi configuration with multiple property values and
optionally waits for the service to be reregistered.
+ * If serviceName is null, the method will not wait for service
re-registration.
+ *
+ * @param serviceName The fully qualified name of the service to wait for,
or null to skip waiting
+ * @param configPid The persistent identifier of the configuration to
update
+ * @param propsToSet A map of property names to their new values
+ * @throws InterruptedException If the thread is interrupted while waiting
for service reregistration
+ * @throws IOException If an error occurs while updating the
configuration
+ */
public void updateConfiguration(String serviceName, String configPid,
Map<String, Object> propsToSet)
throws InterruptedException, IOException {
org.osgi.service.cm.Configuration cfg =
configurationAdmin.getConfiguration(configPid);
Dictionary<String, Object> props = cfg.getProperties();
+
+ // Handle case where properties haven't been initialized yet
+ final Dictionary<String, Object> finalProps = (props != null) ? props
: new Hashtable<>();
+
+ // Add new properties to the dictionary
for (Map.Entry<String, Object> propToSet : propsToSet.entrySet()) {
- props.put(propToSet.getKey(), propToSet.getValue());
+ finalProps.put(propToSet.getKey(), propToSet.getValue());
}
- waitForReRegistration(serviceName, () -> {
- try {
- cfg.update(props);
- } catch (IOException ignored) {
- }
- });
+ // If serviceName is null, don't wait for service re-registration
+ if (serviceName == null) {
+ LOGGER.info("Updating configuration {} without waiting for service
restart", configPid);
+ cfg.update(finalProps);
+ // Give the configuration change handler time to process
+ Thread.sleep(1000);
+ } else {
+ waitForReRegistration(serviceName, () -> {
+ try {
+ cfg.update(finalProps);
+ } catch (IOException ignored) {
+ }
+ });
+ }
waitForStartup();
diff --git a/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
index 623904938..479ba5657 100644
--- a/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/ProfileServiceIT.java
@@ -168,7 +168,7 @@ public class ProfileServiceIT extends BaseIT {
}
}
- updateConfiguration(PersistenceService.class.getName(),
"org.apache.unomi.persistence.elasticsearch", "throwExceptions", true);
+ updateConfiguration(null,
"org.apache.unomi.persistence.elasticsearch", "throwExceptions", true);
Query query = new Query();
query.setLimit(2);
@@ -181,7 +181,7 @@ public class ProfileServiceIT extends BaseIT {
} catch (RuntimeException ex) {
// Should get here since this scenario should throw exception
} finally {
- updateConfiguration(PersistenceService.class.getName(),
"org.apache.unomi.persistence.elasticsearch", "throwExceptions",
+ updateConfiguration(null,
"org.apache.unomi.persistence.elasticsearch", "throwExceptions",
throwExceptionCurrent);
}
}
diff --git a/itests/src/test/java/org/apache/unomi/itests/RuleServiceIT.java
b/itests/src/test/java/org/apache/unomi/itests/RuleServiceIT.java
index 1d5a23d9d..04ef4904a 100644
--- a/itests/src/test/java/org/apache/unomi/itests/RuleServiceIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/RuleServiceIT.java
@@ -151,14 +151,14 @@ public class RuleServiceIT extends BaseIT {
Profile profile = new Profile(UUID.randomUUID().toString());
Session session = new Session(UUID.randomUUID().toString(), profile,
new Date(), TEST_SCOPE);
- updateConfiguration(RulesService.class.getName(),
"org.apache.unomi.services", "rules.optimizationActivated", "false");
+ updateConfiguration(null, "org.apache.unomi.services",
"rules.optimizationActivated", "false");
rulesService = getService(RulesService.class);
eventService = getService(EventService.class);
LOGGER.info("Running unoptimized rules performance test...");
long unoptimizedRunTime = runEventTest(profile, session);
- updateConfiguration(RulesService.class.getName(),
"org.apache.unomi.services", "rules.optimizationActivated", "true");
+ updateConfiguration(null, "org.apache.unomi.services",
"rules.optimizationActivated", "true");
rulesService = getService(RulesService.class);
eventService = getService(EventService.class);
diff --git a/kar/pom.xml b/kar/pom.xml
index 226e3c85c..9a38f9b78 100644
--- a/kar/pom.xml
+++ b/kar/pom.xml
@@ -143,13 +143,6 @@
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</dependency>
- <dependency>
- <groupId>org.apache.karaf.cellar</groupId>
- <artifactId>apache-karaf-cellar</artifactId>
- <classifier>features</classifier>
- <type>xml</type>
- <scope>runtime</scope>
- </dependency>
<dependency>
<groupId>org.apache.unomi</groupId>
<artifactId>unomi-web-tracker-wab</artifactId>
diff --git a/kar/src/main/feature/feature.xml b/kar/src/main/feature/feature.xml
index 9de01623a..6355a69e4 100644
--- a/kar/src/main/feature/feature.xml
+++ b/kar/src/main/feature/feature.xml
@@ -19,7 +19,6 @@
<features name="unomi-kar"
xmlns="http://karaf.apache.org/xmlns/features/v1.3.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://karaf.apache.org/xmlns/features/v1.3.0
http://karaf.apache.org/xmlns/features/v1.3.0">
<repository>mvn:org.apache.cxf.karaf/apache-cxf/${cxf.version}/xml/features</repository>
-
<repository>mvn:org.apache.karaf.cellar/apache-karaf-cellar/${version.karaf.cellar}/xml/features</repository>
<feature description="unomi-kar" version="${project.version}"
name="unomi-kar" start-level="70">
<feature prerequisite="true">wrap</feature>
@@ -29,7 +28,6 @@
<feature>cxf-features-metrics</feature>
<feature>cxf-rs-security-cors</feature>
<feature>cxf-rs-description-openapi-v3</feature>
- <feature>cellar</feature>
<feature>eventadmin</feature>
<feature>feature</feature>
<feature>shell-compat</feature>
@@ -40,7 +38,6 @@
<configfile
finalname="/etc/org.apache.unomi.services.cfg">mvn:org.apache.unomi/unomi-services/${project.version}/cfg/servicescfg</configfile>
<configfile
finalname="/etc/org.apache.unomi.thirdparty.cfg">mvn:org.apache.unomi/unomi-services/${project.version}/cfg/thirdpartycfg</configfile>
<configfile
finalname="/etc/org.apache.unomi.cluster.cfg">mvn:org.apache.unomi/unomi-services/${project.version}/cfg/clustercfg</configfile>
- <configfile
finalname="/etc/hazelcast.xml">mvn:org.apache.unomi/unomi-services/${project.version}/xml/hazelcastconfig</configfile>
<configfile
finalname="/etc/org.apache.unomi.geonames.cfg">mvn:org.apache.unomi/cxs-geonames-services/${project.version}/cfg/geonamescfg</configfile>
<configfile
finalname="/etc/org.apache.unomi.groovy.actions.cfg">mvn:org.apache.unomi/unomi-groovy-actions-services/${project.version}/cfg/groovyactionscfg</configfile>
<configfile
finalname="/etc/org.apache.unomi.schema.cfg">mvn:org.apache.unomi/unomi-json-schema-services/${project.version}/cfg/schemacfg</configfile>
diff --git a/manual/src/main/asciidoc/building-and-deploying.adoc
b/manual/src/main/asciidoc/building-and-deploying.adoc
index 840639e6e..3de47041f 100644
--- a/manual/src/main/asciidoc/building-and-deploying.adoc
+++ b/manual/src/main/asciidoc/building-and-deploying.adoc
@@ -144,12 +144,11 @@ files (at the end of the file):
export KARAF_OPTS="$KARAF_OPTS -Xmx3G"
----
-Install the WAR support, CXF and Karaf Cellar into Karaf by doing the
following in the Karaf command line:
+Install the WAR support and CXF into Karaf by doing the following in the Karaf
command line:
[source]
----
feature:repo-add cxf-jaxrs 3.3.4
- feature:repo-add cellar 4.1.3
feature:repo-add mvn:org.apache.unomi/unomi-kar/VERSION/xml/features
feature:install unomi-kar
----
diff --git a/manual/src/main/asciidoc/clustering.adoc
b/manual/src/main/asciidoc/clustering.adoc
index be27ccfd9..9d31aa6a2 100644
--- a/manual/src/main/asciidoc/clustering.adoc
+++ b/manual/src/main/asciidoc/clustering.adoc
@@ -13,7 +13,7 @@
//
=== Cluster setup
-Apache Karaf relies on Apache Karaf Cellar, which in turn uses Hazelcast to
discover and configure its cluster.
+Apache Karaf relies on Persistence to register nodes and manage cluster.
You can control most of the important clustering settings through the
centralized configuration file at
@@ -21,19 +21,9 @@ You can control most of the important clustering settings
through the centralize
And notably using the following properties:
-
org.apache.unomi.hazelcast.group.name=${env:UNOMI_HAZELCAST_GROUP_NAME:-cellar}
-
org.apache.unomi.hazelcast.group.password=${env:UNOMI_HAZELCAST_GROUP_PASSWORD:-pass}
- # This list can be comma separated and use ranges such as
192.168.1.0-7,192.168.1.21
-
org.apache.unomi.hazelcast.tcp-ip.members=${env:UNOMI_HAZELCAST_TCPIP_MEMBERS:-127.0.0.1}
-
org.apache.unomi.hazelcast.tcp-ip.interface=${env:UNOMI_HAZELCAST_TCPIP_INTERFACE:-127.0.0.1}
-
org.apache.unomi.hazelcast.network.port=${env:UNOMI_HAZELCAST_NETWORK_PORT:-5701}
+
org.apache.unomi.cluster.public.address=${env:UNOMI_CLUSTER_PUBLIC_ADDRESS:-http://localhost:8181}
+
org.apache.unomi.cluster.internal.address=${env:UNOMI_CLUSTER_INTERNAL_ADDRESS:-https://localhost:9443}
+ org.apache.unomi.cluster.nodeId=${env:UNOMI_CLUSTER_NODEID:-unomi-node-1}
+
org.apache.unomi.cluster.nodeStatisticsUpdateFrequency=${env:UNOMI_CLUSTER_NODESTATISTICS_UPDATEFREQUENCY:-10000}
-If you need more fine-grained control over the Hazelcast configuration you
could also edit the following file:
-
-[source]
-----
-etc/hazelcast.xml
-----
-
-Note that it would be best to keep all configuration in the centralized custom
configuration, for example by adding
-placeholders in the hazelcast.xml file if need be and adding the properties to
the centralized configuration file.
+Note that it is mandatory to set a different `org.apache.unomi.cluster.nodeId`
for each node in the cluster.
diff --git a/package/pom.xml b/package/pom.xml
index 13af2a5be..265e91973 100644
--- a/package/pom.xml
+++ b/package/pom.xml
@@ -80,13 +80,6 @@
<type>xml</type>
<scope>runtime</scope>
</dependency>
- <dependency>
- <groupId>org.apache.karaf.cellar</groupId>
- <artifactId>apache-karaf-cellar</artifactId>
- <classifier>features</classifier>
- <type>xml</type>
- <scope>runtime</scope>
- </dependency>
<dependency>
<groupId>org.apache.unomi</groupId>
@@ -192,17 +185,6 @@
</outputDirectory>
<destFileName>org.apache.unomi.persistence.elasticsearch.cfg</destFileName>
</artifactItem>
- <artifactItem>
- <groupId>org.apache.unomi</groupId>
- <artifactId>unomi-services</artifactId>
- <version>${project.version}</version>
- <classifier>hazelcastconfig</classifier>
- <type>xml</type>
- <outputDirectory>
- ${project.build.directory}/assembly/etc
- </outputDirectory>
- <destFileName>hazelcast.xml</destFileName>
- </artifactItem>
<artifactItem>
<groupId>org.apache.unomi</groupId>
<artifactId>unomi-services</artifactId>
@@ -362,7 +344,6 @@
<feature>system</feature>
<feature>war</feature>
<feature>cxf-jaxrs</feature>
- <feature>cellar</feature>
<feature>aries-blueprint</feature>
<feature>shell-compat</feature>
<feature>unomi-kar</feature>
diff --git a/package/src/main/resources/etc/custom.system.properties
b/package/src/main/resources/etc/custom.system.properties
index 5e97437c3..7b1b576eb 100644
--- a/package/src/main/resources/etc/custom.system.properties
+++ b/package/src/main/resources/etc/custom.system.properties
@@ -18,14 +18,6 @@
################################################################################
# This following file is used to customize system properties for the Apache
Unomi application running in Apache Karaf.
${optionals}=unomi.custom.system.properties
-#######################################################################################################################
-## Hazelcast clustering settings
##
-#######################################################################################################################
-org.apache.unomi.hazelcast.group.name=${env:UNOMI_HAZELCAST_GROUP_NAME:-cellar}
-org.apache.unomi.hazelcast.group.password=${env:UNOMI_HAZELCAST_GROUP_PASSWORD:-pass}
-# This list can be comma separated and use ranges such as
192.168.1.0-7,192.168.1.21
-org.apache.unomi.hazelcast.tcp-ip.members=${env:UNOMI_HAZELCAST_TCPIP_MEMBERS:-127.0.0.1}
-org.apache.unomi.hazelcast.network.port=${env:UNOMI_HAZELCAST_NETWORK_PORT:-5701}
#######################################################################################################################
## Security settings
##
@@ -81,11 +73,14 @@
org.apache.unomi.admin.servlet.context=${env:UNOMI_ADMIN_CONTEXT:-/cxs}
#######################################################################################################################
## Cluster Settings
##
#######################################################################################################################
-org.apache.unomi.cluster.group=${env:UNOMI_CLUSTER_GROUP:-default}
# To simplify testing we set the public address to use HTTP, but for
production environments it is highly recommended
# to switch to using HTTPS with a proper SSL certificate installed.
org.apache.unomi.cluster.public.address=${env:UNOMI_CLUSTER_PUBLIC_ADDRESS:-http://localhost:8181}
org.apache.unomi.cluster.internal.address=${env:UNOMI_CLUSTER_INTERNAL_ADDRESS:-https://localhost:9443}
+# The nodeId is a required setting that uniquely identifies this node in the
cluster.
+# It must be set to a unique value for each node in the cluster.
+# Example: nodeId=node1
+org.apache.unomi.cluster.nodeId=${env:UNOMI_CLUSTER_NODEID:-unomi-node-1}
# The nodeStatisticsUpdateFrequency controls the frequency of the update of
system statistics such as CPU load,
# system load average and uptime. This value is set in milliseconds and is set
to 10 seconds by default. Each node
# will retrieve the local values and broadcast them through a cluster event to
all the other nodes to update
diff --git a/package/src/main/resources/etc/org.apache.karaf.cellar.groups.cfg
b/package/src/main/resources/etc/org.apache.karaf.cellar.groups.cfg
deleted file mode 100644
index 9542b735e..000000000
--- a/package/src/main/resources/etc/org.apache.karaf.cellar.groups.cfg
+++ /dev/null
@@ -1,81 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-#
-# This property stores the cluster groups for which the local node is member
-#
-groups = default
-
-#
-# Filtering of the bundles in the default cluster group
-#
-default.bundle.whitelist.inbound = *
-default.bundle.whitelist.outbound = *
-default.bundle.blacklist.inbound = *.xml
-default.bundle.blacklist.outbound = *.xml
-
-#
-# Filtering of the configurations in the default cluster group
-#
-default.config.whitelist.inbound = *
-default.config.whitelist.outbound = *
-default.config.blacklist.inbound = org.apache.felix.fileinstall*, \
- org.apache.karaf.cellar*, \
- org.apache.karaf.management, \
- org.apache.karaf.shell, \
- org.ops4j.pax.logging, \
- org.ops4j.pax.web, \
- org.apache.aries.transaction, \
- org.apache.unomi.cluster, \
- org.apache.unomi.geonames, \
- org.apache.unomi.persistence.elasticsearch,
\
- org.apache.unomi.router, \
- org.apache.unomi.plugins.request
-default.config.blacklist.outbound = org.apache.felix.fileinstall*, \
- org.apache.karaf.cellar*, \
- org.apache.karaf.management, \
- org.apache.karaf.shell, \
- org.ops4j.pax.logging, \
- org.ops4j.pax.web, \
- org.apache.aries.transaction, \
- org.apache.unomi.cluster, \
- org.apache.unomi.geonames, \
-
org.apache.unomi.persistence.elasticsearch, \
- org.apache.unomi.router, \
- org.apache.unomi.plugins.request
-
-#
-# Filtering of the features in the default cluster group
-#
-default.feature.whitelist.inbound = *
-default.feature.whitelist.outbound = *
-default.feature.blacklist.inbound = none
-default.feature.blacklist.outbound = none
-
-#
-# The following properties define the behavior to use when the node joins the
cluster (the usage of the bootstrap
-# synchronizer), per cluster group and per resource.
-# The following values are accepted:
-# disabled: means that the synchronizer is not used, meaning the node or the
cluster are not updated at all
-# cluster: if the node is the first one in the cluster, it pushes its local
state to the cluster, else it's not the
-# first node of the cluster, the node will update its local state with
the cluster one (meaning that the cluster
-# is the master)
-# node: in this case, the node is the master, it means that the cluster state
will be overwritten by the node state.
-#
-default.bundle.sync = disabled
-default.config.sync = disabled
-default.feature.sync = disabled
-default.obr.urls.sync = disabled
diff --git a/persistence-elasticsearch/core/pom.xml
b/persistence-elasticsearch/core/pom.xml
index ee5afc77f..810e38807 100644
--- a/persistence-elasticsearch/core/pom.xml
+++ b/persistence-elasticsearch/core/pom.xml
@@ -192,12 +192,6 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>com.hazelcast</groupId>
- <artifactId>hazelcast-all</artifactId>
- <version>3.12.8</version>
- <scope>provided</scope>
- </dependency>
<dependency>
<groupId>org.apache.unomi</groupId>
@@ -226,7 +220,6 @@
com.google.apphosting.api;resolution:=optional,
com.google.common.geometry;resolution:=optional,
com.google.errorprone.annotations.concurrent;resolution:=optional,
-
com.hazelcast.core;version="[3.12,4)";resolution:=optional,
com.lmax.disruptor;resolution:=optional,
com.lmax.disruptor.dsl;resolution:=optional,
com.sun.management;resolution:=optional,
diff --git
a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
index 27e874e52..372fb6950 100644
---
a/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
+++
b/persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java
@@ -40,6 +40,7 @@ import org.apache.unomi.metrics.MetricsService;
import org.apache.unomi.persistence.elasticsearch.conditions.*;
import org.apache.unomi.persistence.spi.PersistenceService;
import org.apache.unomi.persistence.spi.aggregate.*;
+import org.apache.unomi.persistence.spi.config.ConfigurationUpdateHelper;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.Version;
import org.elasticsearch.action.DocWriteRequest;
@@ -112,6 +113,7 @@ import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.tasks.TaskId;
import org.osgi.framework.*;
+import org.osgi.service.cm.ManagedService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -134,7 +136,7 @@ import java.util.concurrent.TimeUnit;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
@SuppressWarnings("rawtypes")
-public class ElasticSearchPersistenceServiceImpl implements
PersistenceService, SynchronousBundleListener {
+public class ElasticSearchPersistenceServiceImpl implements
PersistenceService, SynchronousBundleListener, ManagedService {
public static final String BULK_PROCESSOR_BULK_SIZE =
"bulkProcessor.bulkSize";
public static final String BULK_PROCESSOR_FLUSH_INTERVAL =
"bulkProcessor.flushInterval";
@@ -717,6 +719,39 @@ public class ElasticSearchPersistenceServiceImpl
implements PersistenceService,
}
}
+ @Override
+ public void updated(Dictionary<String, ?> properties) {
+ Map<String, ConfigurationUpdateHelper.PropertyMapping>
propertyMappings = new HashMap<>();
+
+ // Boolean properties
+ propertyMappings.put("throwExceptions",
ConfigurationUpdateHelper.booleanProperty(this::setThrowExceptions));
+ propertyMappings.put("alwaysOverwrite",
ConfigurationUpdateHelper.booleanProperty(this::setAlwaysOverwrite));
+ propertyMappings.put("useBatchingForSave",
ConfigurationUpdateHelper.booleanProperty(this::setUseBatchingForSave));
+ propertyMappings.put("useBatchingForUpdate",
ConfigurationUpdateHelper.booleanProperty(this::setUseBatchingForUpdate));
+ propertyMappings.put("aggQueryThrowOnMissingDocs",
ConfigurationUpdateHelper.booleanProperty(this::setAggQueryThrowOnMissingDocs));
+
+ // String properties
+ propertyMappings.put("logLevelRestClient",
ConfigurationUpdateHelper.stringProperty(this::setLogLevelRestClient));
+ propertyMappings.put("clientSocketTimeout",
ConfigurationUpdateHelper.stringProperty(this::setClientSocketTimeout));
+ propertyMappings.put("taskWaitingTimeout",
ConfigurationUpdateHelper.stringProperty(this::setTaskWaitingTimeout));
+ propertyMappings.put("taskWaitingPollingInterval",
ConfigurationUpdateHelper.stringProperty(this::setTaskWaitingPollingInterval));
+ propertyMappings.put("aggQueryMaxResponseSizeHttp",
ConfigurationUpdateHelper.stringProperty(this::setAggQueryMaxResponseSizeHttp));
+
+ // Integer properties
+ propertyMappings.put("aggregateQueryBucketSize",
ConfigurationUpdateHelper.integerProperty(this::setAggregateQueryBucketSize));
+
+ // Custom property for itemTypeToRefreshPolicy with IOException
handling
+ propertyMappings.put("itemTypeToRefreshPolicy",
ConfigurationUpdateHelper.customProperty((value, logger) -> {
+ try {
+ setItemTypeToRefreshPolicy(value.toString());
+ } catch (IOException e) {
+ logger.warn("Error setting itemTypeToRefreshPolicy: {}",
e.getMessage());
+ }
+ }));
+
+ ConfigurationUpdateHelper.processConfigurationUpdates(properties,
LOGGER, "ElasticSearch persistence", propertyMappings);
+ }
+
private void loadPredefinedMappings(BundleContext bundleContext, boolean
forceUpdateMapping) {
Enumeration<URL> predefinedMappings =
bundleContext.getBundle().findEntries("META-INF/cxs/mappings", "*.json", true);
if (predefinedMappings == null) {
diff --git
a/persistence-elasticsearch/core/src/main/resources/META-INF/cxs/mappings/clusterNode.json
b/persistence-elasticsearch/core/src/main/resources/META-INF/cxs/mappings/clusterNode.json
new file mode 100644
index 000000000..e3cd2f76d
--- /dev/null
+++
b/persistence-elasticsearch/core/src/main/resources/META-INF/cxs/mappings/clusterNode.json
@@ -0,0 +1,67 @@
+{
+ "dynamic_templates": [
+ {
+ "all": {
+ "match": "*",
+ "match_mapping_type": "string",
+ "mapping": {
+ "type": "text",
+ "analyzer": "folding",
+ "fields": {
+ "keyword": {
+ "type": "keyword",
+ "ignore_above": 256
+ }
+ }
+ }
+ }
+ }
+ ],
+ "properties": {
+ "creationDate": {
+ "type": "date"
+ },
+ "lastModificationDate": {
+ "type": "date"
+ },
+ "lastSyncDate": {
+ "type": "date"
+ },
+ "cpuLoad": {
+ "type": "double"
+ },
+ "loadAverage": {
+ "type": "double"
+ },
+ "uptime": {
+ "type": "long"
+ },
+ "master": {
+ "type": "boolean"
+ },
+ "data": {
+ "type": "boolean"
+ },
+ "startTime": {
+ "type": "long"
+ },
+ "lastHeartbeat": {
+ "type": "long"
+ },
+ "serverInfo": {
+ "properties": {
+ "serverBuildDate": {
+ "type": "date"
+ },
+ "eventTypes": {
+ "type": "object",
+ "enabled": false
+ },
+ "capabilities": {
+ "type": "object",
+ "enabled": false
+ }
+ }
+ }
+ }
+}
diff --git
a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index d14235171..e376be29f 100644
---
a/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++
b/persistence-elasticsearch/core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -23,7 +23,7 @@
http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.1.0
http://aries.apache.org/schemas/blueprint-cm/blueprint-cm-1.1.0.xsd">
<cm:property-placeholder
persistent-id="org.apache.unomi.persistence.elasticsearch"
- update-strategy="reload"
placeholder-prefix="${es.">
+ update-strategy="none" placeholder-prefix="${es.">
<cm:default-properties>
<cm:property name="cluster.name" value="contextElasticSearch"/>
<cm:property name="elasticSearchAddresses" value="localhost:9200"/>
@@ -85,7 +85,11 @@
<interfaces>
<value>org.apache.unomi.persistence.spi.PersistenceService</value>
<value>org.osgi.framework.SynchronousBundleListener</value>
+ <value>org.osgi.service.cm.ManagedService</value>
</interfaces>
+ <service-properties>
+ <entry key="service.pid"
value="org.apache.unomi.persistence.elasticsearch"/>
+ </service-properties>
</service>
<bean id="conditionESQueryBuilderDispatcher"
diff --git
a/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/config/ConfigurationUpdateHelper.java
b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/config/ConfigurationUpdateHelper.java
new file mode 100644
index 000000000..7b5154d45
--- /dev/null
+++
b/persistence-spi/src/main/java/org/apache/unomi/persistence/spi/config/ConfigurationUpdateHelper.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.persistence.spi.config;
+
+import org.slf4j.Logger;
+
+import java.util.Dictionary;
+import java.util.Map;
+import java.util.function.Consumer;
+
+/**
+ * Utility class to handle configuration updates in ManagedService
implementations.
+ * This class provides generic methods to process configuration property
changes
+ * without hardcoding specific property names.
+ */
+public class ConfigurationUpdateHelper {
+
+ /**
+ * Processes configuration updates using a property mapping.
+ *
+ * @param properties The configuration properties dictionary
+ * @param logger The logger to use for debug messages
+ * @param serviceName The name of the service for logging purposes
+ * @param propertyMappings Map of property names to their setters and types
+ */
+ public static void processConfigurationUpdates(Dictionary<String, ?>
properties, Logger logger,
+ String serviceName,
+ Map<String, PropertyMapping>
propertyMappings) {
+ if (properties == null) {
+ return;
+ }
+
+ logger.info("{} configuration updated, applying changes without
restart", serviceName);
+
+ try {
+ for (Map.Entry<String, PropertyMapping> entry :
propertyMappings.entrySet()) {
+ String propertyName = entry.getKey();
+ PropertyMapping mapping = entry.getValue();
+
+ Object value = properties.get(propertyName);
+ if (value != null) {
+ try {
+ mapping.apply(value, logger);
+ logger.debug("Updated {} to: {}", propertyName, value);
+ } catch (Exception e) {
+ logger.warn("Error setting property {}: {}",
propertyName, e.getMessage());
+ }
+ }
+ }
+ } catch (Exception e) {
+ logger.error("Error applying configuration updates", e);
+ }
+ }
+
+ /**
+ * Creates a boolean property mapping.
+ *
+ * @param setter The setter function to call with the boolean value
+ * @return PropertyMapping for boolean properties
+ */
+ public static PropertyMapping booleanProperty(Consumer<Boolean> setter) {
+ return (value, logger) -> {
+ boolean boolValue = Boolean.parseBoolean(value.toString());
+ setter.accept(boolValue);
+ };
+ }
+
+ /**
+ * Creates a string property mapping.
+ *
+ * @param setter The setter function to call with the string value
+ * @return PropertyMapping for string properties
+ */
+ public static PropertyMapping stringProperty(Consumer<String> setter) {
+ return (value, logger) -> {
+ String stringValue = value.toString();
+ setter.accept(stringValue);
+ };
+ }
+
+ /**
+ * Creates an integer property mapping.
+ *
+ * @param setter The setter function to call with the integer value
+ * @return PropertyMapping for integer properties
+ */
+ public static PropertyMapping integerProperty(Consumer<Integer> setter) {
+ return (value, logger) -> {
+ int intValue = Integer.parseInt(value.toString());
+ setter.accept(intValue);
+ };
+ }
+
+ /**
+ * Creates a long property mapping.
+ *
+ * @param setter The setter function to call with the long value
+ * @return PropertyMapping for long properties
+ */
+ public static PropertyMapping longProperty(Consumer<Long> setter) {
+ return (value, logger) -> {
+ long longValue = Long.parseLong(value.toString());
+ setter.accept(longValue);
+ };
+ }
+
+ /**
+ * Creates a custom property mapping for special cases.
+ *
+ * @param processor The custom processor function
+ * @return PropertyMapping for custom properties
+ */
+ public static PropertyMapping customProperty(PropertyProcessor processor) {
+ return processor::process;
+ }
+
+ /**
+ * Functional interface for property processing.
+ */
+ @FunctionalInterface
+ public interface PropertyMapping {
+ /**
+ * Applies the property value using the appropriate setter.
+ *
+ * @param value The property value
+ * @param logger The logger to use
+ * @throws Exception if there's an error processing the property
+ */
+ void apply(Object value, Logger logger) throws Exception;
+ }
+
+ /**
+ * Functional interface for custom property processing.
+ */
+ @FunctionalInterface
+ public interface PropertyProcessor {
+ /**
+ * Processes the property value.
+ *
+ * @param value The property value
+ * @param logger The logger to use
+ * @throws Exception if there's an error processing the property
+ */
+ void process(Object value, Logger logger) throws Exception;
+ }
+}
diff --git a/pom.xml b/pom.xml
index c537486e1..bcef0de13 100644
--- a/pom.xml
+++ b/pom.xml
@@ -65,7 +65,6 @@
<version.jackson.databind>2.11.4</version.jackson.databind>
<version.jackson.jaxb>2.11.4</version.jackson.jaxb>
<version.karaf>4.2.15</version.karaf>
- <version.karaf.cellar>4.2.1</version.karaf.cellar>
<version.pax.exam>4.13.5</version.pax.exam>
<elasticsearch.version>7.4.2</elasticsearch.version>
<elasticsearch.test.version>7.11.0</elasticsearch.test.version>
@@ -830,24 +829,6 @@
<type>xml</type>
<scope>runtime</scope>
</dependency>
- <dependency>
- <groupId>org.apache.karaf.cellar</groupId>
- <artifactId>apache-karaf-cellar</artifactId>
- <classifier>features</classifier>
- <version>${version.karaf.cellar}</version>
- <type>xml</type>
- <scope>runtime</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.karaf.cellar</groupId>
- <artifactId>org.apache.karaf.cellar.core</artifactId>
- <version>${version.karaf.cellar}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.karaf.cellar</groupId>
- <artifactId>org.apache.karaf.cellar.config</artifactId>
- <version>${version.karaf.cellar}</version>
- </dependency>
<!-- End of Apache Karaf dependencies -->
diff --git a/services/pom.xml b/services/pom.xml
index c4cf7e4da..c9fb46411 100644
--- a/services/pom.xml
+++ b/services/pom.xml
@@ -56,6 +56,12 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.unomi</groupId>
+ <artifactId>unomi-lifecycle-watcher</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>javax.servlet</groupId>
@@ -111,16 +117,6 @@
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId>
</dependency>
- <dependency>
- <groupId>org.apache.karaf.cellar</groupId>
- <artifactId>org.apache.karaf.cellar.core</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.karaf.cellar</groupId>
- <artifactId>org.apache.karaf.cellar.config</artifactId>
- <scope>provided</scope>
- </dependency>
<dependency>
<groupId>org.apache.unomi</groupId>
@@ -175,6 +171,7 @@
<Embed-Dependency>*;scope=compile|runtime</Embed-Dependency>
<Import-Package>
sun.misc;resolution:=optional,
+ com.sun.management;resolution:=optional,
*
</Import-Package>
</instructions>
@@ -213,11 +210,6 @@
<type>cfg</type>
<classifier>clustercfg</classifier>
</artifact>
- <artifact>
-
<file>src/main/resources/hazelcast.xml</file>
- <type>xml</type>
- <classifier>hazelcastconfig</classifier>
- </artifact>
</artifacts>
</configuration>
</execution>
diff --git
a/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterServiceImpl.java
b/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterServiceImpl.java
index ec4cfe523..e71973ce3 100644
---
a/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterServiceImpl.java
+++
b/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterServiceImpl.java
@@ -18,75 +18,114 @@
package org.apache.unomi.services.impl.cluster;
import org.apache.commons.lang3.ArrayUtils;
-import org.apache.karaf.cellar.config.ClusterConfigurationEvent;
-import org.apache.karaf.cellar.config.Constants;
-import org.apache.karaf.cellar.core.*;
-import org.apache.karaf.cellar.core.control.SwitchStatus;
-import org.apache.karaf.cellar.core.event.Event;
-import org.apache.karaf.cellar.core.event.EventProducer;
-import org.apache.karaf.cellar.core.event.EventType;
+import org.apache.commons.lang3.StringUtils;
import org.apache.unomi.api.ClusterNode;
+import org.apache.unomi.api.PartialList;
+import org.apache.unomi.api.ServerInfo;
+import org.apache.unomi.api.conditions.Condition;
+import org.apache.unomi.api.conditions.ConditionType;
import org.apache.unomi.api.services.ClusterService;
-import org.apache.unomi.api.services.SchedulerService;
+import org.apache.unomi.lifecycle.BundleWatcher;
import org.apache.unomi.persistence.spi.PersistenceService;
-import org.osgi.service.cm.ConfigurationAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.management.*;
import java.io.Serializable;
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.lang.management.RuntimeMXBean;
import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
/**
* Implementation of the persistence service interface
*/
public class ClusterServiceImpl implements ClusterService {
- public static final String KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION =
"org.apache.unomi.nodes";
- public static final String KARAF_CLUSTER_CONFIGURATION_PUBLIC_ENDPOINTS =
"publicEndpoints";
- public static final String KARAF_CLUSTER_CONFIGURATION_INTERNAL_ENDPOINTS
= "internalEndpoints";
private static final Logger LOGGER =
LoggerFactory.getLogger(ClusterServiceImpl.class.getName());
- PersistenceService persistenceService;
- private ClusterManager karafCellarClusterManager;
- private EventProducer karafCellarEventProducer;
- private GroupManager karafCellarGroupManager;
- private String karafCellarGroupName = Configurations.DEFAULT_GROUP_NAME;
- private ConfigurationAdmin osgiConfigurationAdmin;
+
+ private PersistenceService persistenceService;
+
private String publicAddress;
private String internalAddress;
- private Map<String, Map<String,Serializable>> nodeSystemStatistics = new
ConcurrentHashMap<>();
- private Group group = null;
- private SchedulerService schedulerService;
-
+ //private SchedulerService schedulerService; /* Wait for PR UNOMI-878 to
reactivate that code
+ private ScheduledExecutorService scheduledExecutorService =
Executors.newScheduledThreadPool(3);
+ private String nodeId;
+ private long nodeStartTime;
private long nodeStatisticsUpdateFrequency = 10000;
+ private Map<String, Map<String, Serializable>> nodeSystemStatistics = new
ConcurrentHashMap<>();
+ private volatile boolean shutdownNow = false;
+ private volatile List<ClusterNode> cachedClusterNodes =
Collections.emptyList();
- public void setPersistenceService(PersistenceService persistenceService) {
- this.persistenceService = persistenceService;
- }
+ private BundleWatcher bundleWatcher;
+ private ScheduledFuture<?> updateSystemStatsFuture;
+ private ScheduledFuture<?> cleanupStaleNodesFuture;
- public void setKarafCellarClusterManager(ClusterManager
karafCellarClusterManager) {
- this.karafCellarClusterManager = karafCellarClusterManager;
- }
+ /**
+ * Max time to wait for persistence service (in milliseconds)
+ */
+ private static final long MAX_WAIT_TIME = 60000; // 60 seconds
- public void setKarafCellarEventProducer(EventProducer
karafCellarEventProducer) {
- this.karafCellarEventProducer = karafCellarEventProducer;
+ /**
+ * Sets the bundle watcher used to retrieve server information
+ *
+ * @param bundleWatcher the bundle watcher
+ */
+ public void setBundleWatcher(BundleWatcher bundleWatcher) {
+ this.bundleWatcher = bundleWatcher;
+ LOGGER.info("BundleWatcher service set");
}
- public void setKarafCellarGroupManager(GroupManager
karafCellarGroupManager) {
- this.karafCellarGroupManager = karafCellarGroupManager;
- }
+ /**
+ * Waits for the persistence service to become available.
+ * This method will retry getting the persistence service with exponential
backoff
+ * until it's available or until the maximum wait time is reached.
+ *
+ * @throws IllegalStateException if the persistence service is not
available after the maximum wait time
+ */
+ private void waitForPersistenceService() {
+ if (shutdownNow) {
+ return;
+ }
+
+ // If persistence service is directly set (e.g., in unit tests), no
need to wait
+ if (persistenceService != null) {
+ LOGGER.debug("Persistence service is already available, no need to
wait");
+ return;
+ }
- public void setKarafCellarGroupName(String karafCellarGroupName) {
- this.karafCellarGroupName = karafCellarGroupName;
+ // Try to get the service with retries
+ long startTime = System.currentTimeMillis();
+ long waitTime = 50; // Start with 50ms wait time
+
+ while (System.currentTimeMillis() - startTime < MAX_WAIT_TIME) {
+ if (persistenceService != null) {
+ LOGGER.info("Persistence service is now available");
+ return;
+ }
+
+ try {
+ LOGGER.debug("Waiting for persistence service... ({}ms
elapsed)", System.currentTimeMillis() - startTime);
+ Thread.sleep(waitTime);
+ // Exponential backoff with a maximum of 5 seconds
+ waitTime = Math.min(waitTime * 2, 5000);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOGGER.error("Interrupted while waiting for persistence
service", e);
+ break;
+ }
+ }
+
+ throw new IllegalStateException("PersistenceService not available
after waiting " + MAX_WAIT_TIME + "ms");
}
- public void setOsgiConfigurationAdmin(ConfigurationAdmin
osgiConfigurationAdmin) {
- this.osgiConfigurationAdmin = osgiConfigurationAdmin;
+ /**
+ * For unit tests and backward compatibility - directly sets the
persistence service
+ * @param persistenceService the persistence service to set
+ */
+ public void setPersistenceService(PersistenceService persistenceService) {
+ this.persistenceService = persistenceService;
+ LOGGER.info("PersistenceService set directly");
}
public void setPublicAddress(String publicAddress) {
@@ -101,8 +140,35 @@ public class ClusterServiceImpl implements ClusterService {
this.nodeStatisticsUpdateFrequency = nodeStatisticsUpdateFrequency;
}
+ /* Wait for PR UNOMI-878 to reactivate that code
public void setSchedulerService(SchedulerService schedulerService) {
this.schedulerService = schedulerService;
+
+ // If we're already initialized, initialize scheduled tasks now
+ // This handles the case when ClusterService was initialized before
SchedulerService was set
+ if (schedulerService != null && System.currentTimeMillis() >
nodeStartTime && nodeStartTime > 0) {
+ LOGGER.info("SchedulerService was set after ClusterService
initialization, initializing scheduled tasks now");
+ initializeScheduledTasks();
+ }
+ }
+ */
+
+ /* Wait for PR UNOMI-878 to reactivate that code
+ /**
+ * Unbind method for the scheduler service, called by the OSGi framework
when the service is unregistered
+ * @param schedulerService The scheduler service being unregistered
+ */
+ /*
+ public void unsetSchedulerService(SchedulerService schedulerService) {
+ if (this.schedulerService == schedulerService) {
+ LOGGER.info("SchedulerService was unset");
+ this.schedulerService = null;
+ }
+ }
+ */
+
+ public void setNodeId(String nodeId) {
+ this.nodeId = nodeId;
}
public Map<String, Map<String, Serializable>> getNodeSystemStatistics() {
@@ -110,211 +176,345 @@ public class ClusterServiceImpl implements
ClusterService {
}
public void init() {
- if (karafCellarEventProducer != null && karafCellarClusterManager !=
null) {
-
- boolean setupConfigOk = true;
- group =
karafCellarGroupManager.findGroupByName(karafCellarGroupName);
- if (setupConfigOk && group == null) {
- LOGGER.error("Cluster group {} doesn't exist, creating it...",
karafCellarGroupName);
- group =
karafCellarGroupManager.createGroup(karafCellarGroupName);
- if (group != null) {
- setupConfigOk = true;
- } else {
- setupConfigOk = false;
- }
- }
+ // Validate that nodeId is provided
+ if (StringUtils.isBlank(nodeId)) {
+ String errorMessage = "CRITICAL: nodeId is not set. This is a
required setting for cluster operation.";
+ LOGGER.error(errorMessage);
+ throw new IllegalStateException(errorMessage);
+ }
- // check if the producer is ON
- if (setupConfigOk &&
karafCellarEventProducer.getSwitch().getStatus().equals(SwitchStatus.OFF)) {
- LOGGER.error("Cluster event producer is OFF");
- setupConfigOk = false;
- }
+ // Wait for persistence service to be available
+ try {
+ waitForPersistenceService();
+ } catch (IllegalStateException e) {
+ LOGGER.error("Failed to initialize cluster service: {}",
e.getMessage());
+ return;
+ }
- // check if the config pid is allowed
- if (setupConfigOk && !isClusterConfigPIDAllowed(group,
Constants.CATEGORY, KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION,
EventType.OUTBOUND)) {
- LOGGER.error("Configuration PID " +
KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION + " is blocked outbound for cluster
group {}",
- karafCellarGroupName);
- setupConfigOk = false;
- }
+ nodeStartTime = System.currentTimeMillis();
- if (setupConfigOk) {
- Map<String, Properties> configurations =
karafCellarClusterManager.getMap(Constants.CONFIGURATION_MAP +
Configurations.SEPARATOR + karafCellarGroupName);
- org.apache.karaf.cellar.core.Node thisKarafNode =
karafCellarClusterManager.getNode();
- Properties karafCellarClusterNodeConfiguration =
configurations.get(KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION);
- if (karafCellarClusterNodeConfiguration == null) {
- karafCellarClusterNodeConfiguration = new Properties();
- }
- Map<String, String> publicEndpoints =
getMapProperty(karafCellarClusterNodeConfiguration,
KARAF_CLUSTER_CONFIGURATION_PUBLIC_ENDPOINTS, thisKarafNode.getId() + "=" +
publicAddress);
- publicEndpoints.put(thisKarafNode.getId(), publicAddress);
- setMapProperty(karafCellarClusterNodeConfiguration,
KARAF_CLUSTER_CONFIGURATION_PUBLIC_ENDPOINTS, publicEndpoints);
+ // Register this node in the persistence service
+ registerNodeInPersistence();
- Map<String, String> internalEndpoints =
getMapProperty(karafCellarClusterNodeConfiguration,
KARAF_CLUSTER_CONFIGURATION_INTERNAL_ENDPOINTS, thisKarafNode.getId() + "=" +
internalAddress);
- internalEndpoints.put(thisKarafNode.getId(), internalAddress);
- setMapProperty(karafCellarClusterNodeConfiguration,
KARAF_CLUSTER_CONFIGURATION_INTERNAL_ENDPOINTS, internalEndpoints);
+ /* Wait for PR UNOMI-878 to reactivate that code
+ /*
+ // Only initialize scheduled tasks if scheduler service is available
+ if (schedulerService != null) {
+ initializeScheduledTasks();
+ } else {
+ LOGGER.warn("SchedulerService not available during ClusterService
initialization. Scheduled tasks will not be registered. They will be registered
when SchedulerService becomes available.");
+ }
+ */
+ initializeScheduledTasks();
- configurations.put(KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION,
karafCellarClusterNodeConfiguration);
- ClusterConfigurationEvent clusterConfigurationEvent = new
ClusterConfigurationEvent(KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION);
- sendEvent(clusterConfigurationEvent);
- }
+ LOGGER.info("Cluster service initialized with node ID: {}", nodeId);
+ }
- TimerTask statisticsTask = new TimerTask() {
- @Override
- public void run() {
- try {
- updateSystemStats();
- } catch (Throwable t) {
- LOGGER.error("Error updating system statistics", t);
- }
+ /**
+ * Initializes scheduled tasks for cluster management.
+ * This method can be called later if schedulerService wasn't available
during init.
+ */
+ public void initializeScheduledTasks() {
+ /* Wait for PR UNOMI-878 to reactivate that code
+ if (schedulerService == null) {
+ LOGGER.error("Cannot initialize scheduled tasks: SchedulerService
is not set");
+ return;
+ }
+ */
+
+ // Schedule regular updates of the node statistics
+ TimerTask statisticsTask = new TimerTask() {
+ @Override
+ public void run() {
+ try {
+ updateSystemStats();
+ } catch (Throwable t) {
+ LOGGER.error("Error updating system statistics", t);
}
- };
-
schedulerService.getScheduleExecutorService().scheduleWithFixedDelay(statisticsTask,
0, nodeStatisticsUpdateFrequency, TimeUnit.MILLISECONDS);
+ }
+ };
+ /* Wait for PR UNOMI-878 to reactivate that code
+ schedulerService.createRecurringTask("clusterNodeStatisticsUpdate",
nodeStatisticsUpdateFrequency, TimeUnit.MILLISECONDS, statisticsTask, false);
+ */
+ updateSystemStatsFuture =
scheduledExecutorService.scheduleAtFixedRate(statisticsTask, 100,
nodeStatisticsUpdateFrequency, TimeUnit.MILLISECONDS);
+
+ // Schedule cleanup of stale nodes
+ TimerTask cleanupTask = new TimerTask() {
+ @Override
+ public void run() {
+ try {
+ cleanupStaleNodes();
+ } catch (Throwable t) {
+ LOGGER.error("Error cleaning up stale nodes", t);
+ }
+ }
+ };
+ /* Wait for PR UNOMI-878 to reactivate that code
+ schedulerService.createRecurringTask("clusterStaleNodesCleanup",
60000, TimeUnit.MILLISECONDS, cleanupTask, false);
+ */
+ cleanupStaleNodesFuture =
scheduledExecutorService.scheduleAtFixedRate(cleanupTask, 100, 60000,
TimeUnit.MILLISECONDS);
- }
- LOGGER.info("Cluster service initialized.");
+ LOGGER.info("Cluster service scheduled tasks initialized");
}
public void destroy() {
- LOGGER.info("Cluster service shutdown.");
- }
-
- @Override
- public List<ClusterNode> getClusterNodes() {
- Map<String, ClusterNode> clusterNodes = new LinkedHashMap<String,
ClusterNode>();
-
- Set<org.apache.karaf.cellar.core.Node> karafCellarNodes =
karafCellarClusterManager.listNodes();
- org.apache.karaf.cellar.core.Node thisKarafNode =
karafCellarClusterManager.getNode();
- Map<String, Properties> clusterConfigurations =
karafCellarClusterManager.getMap(Constants.CONFIGURATION_MAP +
Configurations.SEPARATOR + karafCellarGroupName);
- Properties karafCellarClusterNodeConfiguration =
clusterConfigurations.get(KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION);
- Map<String, String> publicNodeEndpoints = new TreeMap<>();
- Map<String, String> internalNodeEndpoints = new TreeMap<>();
- if (karafCellarClusterNodeConfiguration != null) {
- publicNodeEndpoints =
getMapProperty(karafCellarClusterNodeConfiguration,
KARAF_CLUSTER_CONFIGURATION_PUBLIC_ENDPOINTS, thisKarafNode.getId() + "=" +
publicAddress);
- internalNodeEndpoints =
getMapProperty(karafCellarClusterNodeConfiguration,
KARAF_CLUSTER_CONFIGURATION_INTERNAL_ENDPOINTS, thisKarafNode.getId() + "=" +
internalAddress);
- }
- for (org.apache.karaf.cellar.core.Node karafCellarNode :
karafCellarNodes) {
- ClusterNode clusterNode = new ClusterNode();
- String publicEndpoint =
publicNodeEndpoints.get(karafCellarNode.getId());
- if (publicEndpoint != null) {
- clusterNode.setPublicHostAddress(publicEndpoint);
+ LOGGER.info("Cluster service shutting down...");
+ shutdownNow = true;
+
+ // Cancel scheduled tasks
+ if (updateSystemStatsFuture != null) {
+ boolean successfullyCancelled =
updateSystemStatsFuture.cancel(false);
+ if (!successfullyCancelled) {
+ LOGGER.warn("Failed to cancel scheduled task:
clusterNodeStatisticsUpdate");
+ } else {
+ LOGGER.info("Scheduled task: clusterNodeStatisticsUpdate
cancelled");
}
- String internalEndpoint =
internalNodeEndpoints.get(karafCellarNode.getId());
- if (internalEndpoint != null) {
- clusterNode.setInternalHostAddress(internalEndpoint);
+ }
+ if (cleanupStaleNodesFuture != null) {
+ boolean successfullyCancelled =
cleanupStaleNodesFuture.cancel(false);
+ if (!successfullyCancelled) {
+ LOGGER.warn("Failed to cancel scheduled task:
cleanupStaleNodesFuture");
+ } else {
+ LOGGER.info("Scheduled task: cleanupStaleNodesFuture
cancelled");
}
- Map<String,Serializable> nodeStatistics =
nodeSystemStatistics.get(karafCellarNode.getId());
- if (nodeStatistics != null) {
- Long uptime = (Long) nodeStatistics.get("uptime");
- if (uptime != null) {
- clusterNode.setUptime(uptime);
- }
- Double systemCpuLoad = (Double)
nodeStatistics.get("systemCpuLoad");
- if (systemCpuLoad != null) {
- clusterNode.setCpuLoad(systemCpuLoad);
- }
- List<Double> loadAverage = (List<Double>)
nodeStatistics.get("systemLoadAverage");
- if (loadAverage != null) {
- Double[] loadAverageArray = loadAverage.toArray(new
Double[loadAverage.size()]);
- ArrayUtils.toPrimitive(loadAverageArray);
-
clusterNode.setLoadAverage(ArrayUtils.toPrimitive(loadAverageArray));
+ }
+ if (scheduledExecutorService != null) {
+ scheduledExecutorService.shutdownNow();
+ try {
+ boolean successfullyTerminated =
scheduledExecutorService.awaitTermination(10, TimeUnit.SECONDS);
+ if (!successfullyTerminated) {
+ LOGGER.warn("Failed to terminate scheduled tasks after 10
seconds...");
+ } else {
+ LOGGER.info("Scheduled tasks terminated");
}
+ } catch (InterruptedException e) {
+ LOGGER.error("Error waiting for scheduled tasks to terminate",
e);
}
- clusterNodes.put(karafCellarNode.getId(), clusterNode);
}
- return new ArrayList<ClusterNode>(clusterNodes.values());
- }
-
- @Override
- public void purge(Date date) {
- persistenceService.purge(date);
- }
+ // Remove node from persistence service
+ if (persistenceService != null) {
+ try {
+ persistenceService.remove(nodeId, ClusterNode.class);
+ LOGGER.info("Node {} removed from cluster", nodeId);
+ } catch (Exception e) {
+ LOGGER.error("Error removing node from cluster", e);
+ }
+ }
- @Override
- public void purge(String scope) {
- persistenceService.purge(scope);
- }
+ // Clear references
+ persistenceService = null;
+ bundleWatcher = null;
- @Override
- public void sendEvent(Serializable eventObject) {
- Event event = (Event) eventObject;
- event.setSourceGroup(group);
- event.setSourceNode(karafCellarClusterManager.getNode());
- karafCellarEventProducer.produce(event);
+ LOGGER.info("Cluster service shutdown.");
}
/**
- * Check if a configuration is allowed.
- *
- * @param group the cluster group.
- * @param category the configuration category constant.
- * @param pid the configuration PID.
- * @param type the cluster event type.
- * @return true if the cluster event type is allowed, false else.
+ * Register this node in the persistence service
*/
- public boolean isClusterConfigPIDAllowed(Group group, String category,
String pid, EventType type) {
- CellarSupport support = new CellarSupport();
- support.setClusterManager(this.karafCellarClusterManager);
- support.setGroupManager(this.karafCellarGroupManager);
- support.setConfigurationAdmin(this.osgiConfigurationAdmin);
- return support.isAllowed(group, category, pid, type);
- }
-
- private Map<String, String> getMapProperty(Properties properties, String
propertyName, String defaultValue) {
- String propertyValue = properties.getProperty(propertyName,
defaultValue);
- return getMapProperty(propertyValue);
- }
-
- private Map<String, String> getMapProperty(String propertyValue) {
- String[] propertyValueArray = propertyValue.split(",");
- Map<String, String> propertyMapValue = new LinkedHashMap<>();
- for (String propertyValueElement : propertyValueArray) {
- String[] propertyValueElementPrats =
propertyValueElement.split("=");
- propertyMapValue.put(propertyValueElementPrats[0],
propertyValueElementPrats[1]);
+ private void registerNodeInPersistence() {
+ if (persistenceService == null) {
+ LOGGER.error("Cannot register node: PersistenceService not
available");
+ return;
}
- return propertyMapValue;
- }
- private Map<String, String> setMapProperty(Properties properties, String
propertyName, Map<String, String> propertyMapValue) {
- StringBuilder propertyValueBuilder = new StringBuilder();
- int entryCount = 0;
- for (Map.Entry<String, String> propertyMapValueEntry :
propertyMapValue.entrySet()) {
- propertyValueBuilder.append(propertyMapValueEntry.getKey());
- propertyValueBuilder.append("=");
- propertyValueBuilder.append(propertyMapValueEntry.getValue());
- if (entryCount < propertyMapValue.size() - 1) {
- propertyValueBuilder.append(",");
- }
+ ClusterNode clusterNode = new ClusterNode();
+ clusterNode.setItemId(nodeId);
+ clusterNode.setPublicHostAddress(publicAddress);
+ clusterNode.setInternalHostAddress(internalAddress);
+ clusterNode.setStartTime(nodeStartTime);
+ clusterNode.setLastHeartbeat(System.currentTimeMillis());
+
+ // Set server information if BundleWatcher is available
+ if (bundleWatcher != null &&
!bundleWatcher.getServerInfos().isEmpty()) {
+ ServerInfo serverInfo = bundleWatcher.getServerInfos().get(0);
+ clusterNode.setServerInfo(serverInfo);
+ LOGGER.info("Added server info to node: version={}, build={}",
+ serverInfo.getServerVersion(),
serverInfo.getServerBuildNumber());
+ } else {
+ LOGGER.warn("BundleWatcher not available at registration time,
server info will not be available");
}
- String oldPropertyValue = (String)
properties.setProperty(propertyName, propertyValueBuilder.toString());
- if (oldPropertyValue == null) {
- return null;
+
+ updateSystemStatsForNode(clusterNode);
+
+ boolean success = persistenceService.save(clusterNode);
+ if (success) {
+ LOGGER.info("Node {} registered in cluster", nodeId);
+ } else {
+ LOGGER.error("Failed to register node {} in cluster", nodeId);
}
- return getMapProperty(oldPropertyValue);
}
- private void updateSystemStats() {
+ /**
+ * Updates system stats for the given cluster node
+ */
+ private void updateSystemStatsForNode(ClusterNode node) {
final RuntimeMXBean remoteRuntime =
ManagementFactory.getRuntimeMXBean();
long uptime = remoteRuntime.getUptime();
- ObjectName operatingSystemMXBeanName =
ManagementFactory.getOperatingSystemMXBean().getObjectName();
- Double systemCpuLoad = null;
+
+ double systemCpuLoad = 0.0;
try {
- systemCpuLoad = (Double)
ManagementFactory.getPlatformMBeanServer().getAttribute(operatingSystemMXBeanName,
"SystemCpuLoad");
- } catch (MBeanException | AttributeNotFoundException |
InstanceNotFoundException | ReflectionException e) {
- LOGGER.error("Error retrieving system CPU load", e);
+ systemCpuLoad = ((com.sun.management.OperatingSystemMXBean)
ManagementFactory.getOperatingSystemMXBean()).getSystemCpuLoad();
+ // Check for NaN value which Elasticsearch and OpenSearch don't
support for float fields
+ if (Double.isNaN(systemCpuLoad)) {
+ LOGGER.debug("System CPU load is NaN, setting to 0.0");
+ systemCpuLoad = 0.0;
+ }
+ } catch (Exception e) {
+ LOGGER.debug("Error retrieving system CPU load", e);
}
+
final OperatingSystemMXBean operatingSystemMXBean =
ManagementFactory.getOperatingSystemMXBean();
double systemLoadAverage =
operatingSystemMXBean.getSystemLoadAverage();
+        // Check for NaN value which Elasticsearch/OpenSearch don't support
for float fields
+ if (Double.isNaN(systemLoadAverage)) {
+ LOGGER.debug("System load average is NaN, setting to 0.0");
+ systemLoadAverage = 0.0;
+ }
+
+ node.setCpuLoad(systemCpuLoad);
+ node.setUptime(uptime);
- ClusterSystemStatisticsEvent clusterSystemStatisticsEvent = new
ClusterSystemStatisticsEvent("org.apache.unomi.cluster.system.statistics");
- Map<String,Serializable> systemStatistics = new TreeMap<>();
ArrayList<Double> systemLoadAverageArray = new ArrayList<>();
systemLoadAverageArray.add(systemLoadAverage);
+
node.setLoadAverage(ArrayUtils.toPrimitive(systemLoadAverageArray.toArray(new
Double[0])));
+
+ // Store system statistics in memory as well
+ Map<String, Serializable> systemStatistics = new TreeMap<>();
systemStatistics.put("systemLoadAverage", systemLoadAverageArray);
systemStatistics.put("systemCpuLoad", systemCpuLoad);
systemStatistics.put("uptime", uptime);
- clusterSystemStatisticsEvent.setStatistics(systemStatistics);
- nodeSystemStatistics.put(karafCellarClusterManager.getNode().getId(),
systemStatistics);
- sendEvent(clusterSystemStatisticsEvent);
+ nodeSystemStatistics.put(nodeId, systemStatistics);
+ }
+
+ /**
+ * Updates the system statistics for this node and stores them in the
persistence service
+ */
+ private void updateSystemStats() {
+ if (shutdownNow) {
+ return;
+ }
+
+ if (persistenceService == null) {
+ LOGGER.warn("Cannot update system stats: PersistenceService not
available");
+ return;
+ }
+
+ // Load node from persistence
+ ClusterNode node = persistenceService.load(nodeId, ClusterNode.class);
+ if (node == null) {
+ LOGGER.warn("Node {} not found in persistence, re-registering",
nodeId);
+ registerNodeInPersistence();
+ return;
+ }
+
+ try {
+ // Update its stats
+ updateSystemStatsForNode(node);
+
+ // Update server info if needed
+ if (bundleWatcher != null &&
!bundleWatcher.getServerInfos().isEmpty()) {
+ ServerInfo currentInfo = bundleWatcher.getServerInfos().get(0);
+ // Check if server info needs updating
+ if (node.getServerInfo() == null ||
+
!currentInfo.getServerVersion().equals(node.getServerInfo().getServerVersion()))
{
+
+ node.setServerInfo(currentInfo);
+ LOGGER.info("Updated server info for node {}: version={},
build={}",
+ nodeId, currentInfo.getServerVersion(),
currentInfo.getServerBuildNumber());
+ }
+ }
+
+ node.setLastHeartbeat(System.currentTimeMillis());
+
+ // Save back to persistence
+ boolean success = persistenceService.save(node);
+ if (!success) {
+ LOGGER.error("Failed to update node {} statistics", nodeId);
+ }
+
+ // Always refresh cluster nodes cache after attempting stats update
+ try {
+ List<ClusterNode> nodes =
persistenceService.getAllItems(ClusterNode.class, 0, -1, null).getList();
+ cachedClusterNodes = nodes;
+ } catch (Exception e) {
+ LOGGER.warn("Failed to refresh cluster nodes cache during
stats update", e);
+ }
+ } catch (Exception e) {
+ LOGGER.error("Error updating system statistics for node {}: {}",
nodeId, e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Removes stale nodes from the cluster
+ */
+ private void cleanupStaleNodes() {
+ if (shutdownNow) {
+ return;
+ }
+
+ if (persistenceService == null) {
+ LOGGER.warn("Cannot cleanup stale nodes: PersistenceService not
available");
+ return;
+ }
+
+ long cutoffTime = System.currentTimeMillis() -
(nodeStatisticsUpdateFrequency * 3); // Node is stale if no heartbeat for 3x
the update frequency
+
+ Condition staleNodesCondition = new Condition();
+ ConditionType propertyConditionType = new ConditionType();
+ propertyConditionType.setItemId("propertyCondition");
+ propertyConditionType.setItemType(ConditionType.ITEM_TYPE);
+
propertyConditionType.setConditionEvaluator("propertyConditionEvaluator");
+
propertyConditionType.setQueryBuilder("propertyConditionESQueryBuilder");
+ staleNodesCondition.setConditionType(propertyConditionType);
+ staleNodesCondition.setConditionTypeId("propertyCondition");
+ staleNodesCondition.setParameter("propertyName", "lastHeartbeat");
+ staleNodesCondition.setParameter("comparisonOperator", "lessThan");
+ staleNodesCondition.setParameter("propertyValueInteger", cutoffTime);
+
+ PartialList<ClusterNode> staleNodes =
persistenceService.query(staleNodesCondition, null, ClusterNode.class, 0, -1);
+
+ for (ClusterNode staleNode : staleNodes.getList()) {
+ LOGGER.info("Removing stale node: {}", staleNode.getItemId());
+ persistenceService.remove(staleNode.getItemId(),
ClusterNode.class);
+ nodeSystemStatistics.remove(staleNode.getItemId());
+ }
+ }
+
+ @Override
+ public List<ClusterNode> getClusterNodes() {
+ // Return cached cluster nodes, creating a defensive copy
+ return cachedClusterNodes.isEmpty() ? Collections.emptyList() : new
ArrayList<>(cachedClusterNodes);
}
+ @Override
+ public void purge(Date date) {
+ if (persistenceService == null) {
+ LOGGER.warn("Cannot purge by date: PersistenceService not
available");
+ return;
+ }
+
+ persistenceService.purge(date);
+ }
+
+ @Override
+ public void purge(String scope) {
+ if (persistenceService == null) {
+ LOGGER.warn("Cannot purge by scope: PersistenceService not
available");
+ return;
+ }
+
+ persistenceService.purge(scope);
+ }
+
+ /**
+ * Check if a persistence service is available.
+ * This can be used to quickly check before performing operations.
+ *
+     * @return true if a persistence service is available
+ */
+ public boolean isPersistenceServiceAvailable() {
+ return persistenceService != null;
+ }
}
+
diff --git
a/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterSystemStatisticsEvent.java
b/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterSystemStatisticsEvent.java
deleted file mode 100644
index 3c4ec5ad7..000000000
---
a/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterSystemStatisticsEvent.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.unomi.services.impl.cluster;
-
-import org.apache.karaf.cellar.core.event.Event;
-
-import java.io.Serializable;
-import java.util.Map;
-import java.util.TreeMap;
-
-/**
- * The cluster event used to transmit update to node system statistics.
- */
-public class ClusterSystemStatisticsEvent extends Event {
-
- Map<String,Serializable> statistics = new TreeMap<>();
-
- public ClusterSystemStatisticsEvent(String id) {
- super(id);
- }
-
- public Map<String, Serializable> getStatistics() {
- return statistics;
- }
-
- public void setStatistics(Map<String, Serializable> statistics) {
- this.statistics = statistics;
- }
-}
diff --git
a/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterSystemStatisticsEventHandler.java
b/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterSystemStatisticsEventHandler.java
deleted file mode 100644
index 9eecaf4ba..000000000
---
a/services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterSystemStatisticsEventHandler.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.unomi.services.impl.cluster;
-
-import org.apache.karaf.cellar.config.Constants;
-import org.apache.karaf.cellar.core.CellarSupport;
-import org.apache.karaf.cellar.core.Configurations;
-import org.apache.karaf.cellar.core.Group;
-import org.apache.karaf.cellar.core.control.BasicSwitch;
-import org.apache.karaf.cellar.core.control.Switch;
-import org.apache.karaf.cellar.core.control.SwitchStatus;
-import org.apache.karaf.cellar.core.event.EventHandler;
-import org.apache.karaf.cellar.core.event.EventType;
-import org.osgi.service.cm.Configuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * A Karaf Cellar event handler to process incoming events that contain system
statistics updates from nodes.
- */
-public class ClusterSystemStatisticsEventHandler extends CellarSupport
implements EventHandler<ClusterSystemStatisticsEvent> {
-
- public static final String SWITCH_ID =
"org.apache.unomi.cluster.system.statistics.handler";
- private static final Logger LLOGGER =
LoggerFactory.getLogger(ClusterSystemStatisticsEventHandler.class.getName());
- private final Switch eventSwitch = new BasicSwitch(SWITCH_ID);
- private ClusterServiceImpl clusterServiceImpl;
-
- public void setClusterServiceImpl(ClusterServiceImpl clusterServiceImpl) {
- this.clusterServiceImpl = clusterServiceImpl;
- }
-
- public void init() {
- // nothing to do
- }
-
- public void destroy() {
- // nothing to do
- }
-
- @Override
- public void handle(ClusterSystemStatisticsEvent event) {
- // check if the handler is ON
- if (this.getSwitch().getStatus().equals(SwitchStatus.OFF)) {
- LLOGGER.debug("CELLAR SYSTEM STATS: {} switch is OFF, cluster
event not handled", SWITCH_ID);
- return;
- }
-
- if (groupManager == null) {
- //in rare cases for example right after installation this happens!
- LLOGGER.error("CELLAR SYSTEM STATS: retrieved event {} while
groupManager is not available yet!", event);
- return;
- }
-
- // check if the group is local
- if (!groupManager.isLocalGroup(event.getSourceGroup().getName())) {
- LLOGGER.info("CELLAR SYSTEM STATS: node is not part of the event
cluster group {}",event.getSourceGroup().getName());
- return;
- }
-
- Group group = event.getSourceGroup();
- String groupName = group.getName();
-
- String pid = event.getId();
-
- if (isAllowed(event.getSourceGroup(), Constants.CATEGORY, pid,
EventType.INBOUND)) {
-
- // check if it's not a "local" event
- if (event.getSourceNode() != null &&
event.getSourceNode().getId().equalsIgnoreCase(clusterManager.getNode().getId()))
{
- LLOGGER.trace("CELLAR SYSTEM STATS: cluster event is local
(coming from local synchronizer or listener)");
- return;
- }
-
-
- Map<String, Serializable> nodeSystemStatistics =
clusterServiceImpl.getNodeSystemStatistics().get(event.getSourceNode().getId());
- if (nodeSystemStatistics == null) {
- nodeSystemStatistics = new ConcurrentHashMap<>();
- }
- nodeSystemStatistics.putAll(event.getStatistics());
-
clusterServiceImpl.getNodeSystemStatistics().put(event.getSourceNode().getId(),
nodeSystemStatistics);
- }
-
- }
-
- @Override
- public Class<ClusterSystemStatisticsEvent> getType() {
- return ClusterSystemStatisticsEvent.class;
- }
-
- /**
- * Get the cluster configuration event handler switch.
- *
- * @return the cluster configuration event handler switch.
- */
- @Override
- public Switch getSwitch() {
- // load the switch status from the config
- try {
- Configuration configuration =
configurationAdmin.getConfiguration(Configurations.NODE, null);
- if (configuration != null) {
- String handlerStatus = (String)
configuration.getProperties().get(Configurations.HANDLER + "." +
this.getClass().getName());
- if (handlerStatus == null) {
- // default value is on.
- eventSwitch.turnOn();
- } else {
- Boolean status = new Boolean(handlerStatus);
- if (status) {
- eventSwitch.turnOn();
- } else {
- eventSwitch.turnOff();
- }
- }
- }
- } catch (Exception e) {
- // nothing to do
- }
- return eventSwitch;
- }
-
-
-}
diff --git
a/services/src/main/java/org/apache/unomi/services/impl/rules/RulesServiceImpl.java
b/services/src/main/java/org/apache/unomi/services/impl/rules/RulesServiceImpl.java
index 6702f5689..fe3fe8fb0 100644
---
a/services/src/main/java/org/apache/unomi/services/impl/rules/RulesServiceImpl.java
+++
b/services/src/main/java/org/apache/unomi/services/impl/rules/RulesServiceImpl.java
@@ -30,9 +30,11 @@ import org.apache.unomi.api.rules.RuleStatistics;
import org.apache.unomi.api.services.*;
import org.apache.unomi.persistence.spi.CustomObjectMapper;
import org.apache.unomi.persistence.spi.PersistenceService;
+import org.apache.unomi.persistence.spi.config.ConfigurationUpdateHelper;
import org.apache.unomi.services.actions.ActionExecutorDispatcher;
import org.apache.unomi.api.utils.ParserHelper;
import org.osgi.framework.*;
+import org.osgi.service.cm.ManagedService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,7 +45,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
-public class RulesServiceImpl implements RulesService, EventListenerService,
SynchronousBundleListener {
+public class RulesServiceImpl implements RulesService, EventListenerService,
SynchronousBundleListener, ManagedService {
public static final String TRACKED_PARAMETER =
"trackedConditionParameters";
private static final Logger LOGGER =
LoggerFactory.getLogger(RulesServiceImpl.class.getName());
@@ -105,6 +107,20 @@ public class RulesServiceImpl implements RulesService,
EventListenerService, Syn
this.optimizedRulesActivated = optimizedRulesActivated;
}
+ @Override
+ public void updated(Dictionary<String, ?> properties) {
+ Map<String, ConfigurationUpdateHelper.PropertyMapping>
propertyMappings = new HashMap<>();
+
+ // Boolean properties
+ propertyMappings.put("rules.optimizationActivated",
ConfigurationUpdateHelper.booleanProperty(this::setOptimizedRulesActivated));
+
+ // Integer properties
+ propertyMappings.put("rules.refresh.interval",
ConfigurationUpdateHelper.integerProperty(this::setRulesRefreshInterval));
+ propertyMappings.put("rules.statistics.refresh.interval",
ConfigurationUpdateHelper.integerProperty(this::setRulesStatisticsRefreshInterval));
+
+ ConfigurationUpdateHelper.processConfigurationUpdates(properties,
LOGGER, "Rules service", propertyMappings);
+ }
+
public void postConstruct() {
LOGGER.debug("postConstruct {{}}", bundleContext.getBundle());
diff --git a/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
b/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index f49f3c849..bf64a77d7 100644
--- a/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ b/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -23,7 +23,7 @@
http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.1.0
http://aries.apache.org/schemas/blueprint-cm/blueprint-cm-1.1.0.xsd">
<cm:property-placeholder persistent-id="org.apache.unomi.services"
- update-strategy="reload"
placeholder-prefix="${services.">
+ update-strategy="none"
placeholder-prefix="${services.">
<cm:default-properties>
<cm:property name="profile.purge.interval" value="1"/>
<cm:property name="profile.purge.inactiveTime" value="180"/>
@@ -53,7 +53,7 @@
<cm:property-placeholder persistent-id="org.apache.unomi.cluster"
update-strategy="reload"
placeholder-prefix="${cluster.">
<cm:default-properties>
- <cm:property name="group" value="default"/>
+ <cm:property name="nodeId" value="unomi-node-1"/>
<cm:property name="contextserver.publicAddress"
value="https://localhost:9443"/>
<cm:property name="contextserver.internalAddress"
value="http://127.0.0.1:8181"/>
<cm:property name="nodeStatisticsUpdateFrequency" value="10000"/>
@@ -61,7 +61,7 @@
</cm:property-placeholder>
<cm:property-placeholder
persistent-id="org.apache.unomi.persistence.elasticsearch"
- update-strategy="reload"
placeholder-prefix="${es.">
+ update-strategy="none" placeholder-prefix="${es.">
<cm:default-properties>
<cm:property name="aggregateQueryBucketSize" value="5000"/>
</cm:default-properties>
@@ -70,12 +70,10 @@
<reference id="persistenceService"
interface="org.apache.unomi.persistence.spi.PersistenceService"/>
<reference id="httpService" interface="org.osgi.service.http.HttpService"/>
- <reference id="karafCellarClusterManager"
interface="org.apache.karaf.cellar.core.ClusterManager"/>
- <reference id="karafCellarEventProducer"
interface="org.apache.karaf.cellar.core.event.EventProducer"/>
- <reference id="karafCellarGroupManager"
interface="org.apache.karaf.cellar.core.GroupManager"/>
<reference id="osgiConfigurationAdmin"
interface="org.osgi.service.cm.ConfigurationAdmin"/>
<reference id="metricsService"
interface="org.apache.unomi.metrics.MetricsService"/>
<reference id="scriptExecutor"
interface="org.apache.unomi.scripting.ScriptExecutor"/>
+ <reference id="bundleWatcher"
interface="org.apache.unomi.lifecycle.BundleWatcher"/>
<!-- Service definitions -->
@@ -183,7 +181,11 @@
<value>org.apache.unomi.api.services.RulesService</value>
<value>org.apache.unomi.api.services.EventListenerService</value>
<value>org.osgi.framework.SynchronousBundleListener</value>
+ <value>org.osgi.service.cm.ManagedService</value>
</interfaces>
+ <service-properties>
+ <entry key="service.pid" value="org.apache.unomi.services"/>
+ </service-properties>
</service>
<bean id="segmentServiceImpl"
class="org.apache.unomi.services.impl.segments.SegmentServiceImpl"
@@ -259,16 +261,16 @@
<bean id="clusterServiceImpl"
class="org.apache.unomi.services.impl.cluster.ClusterServiceImpl"
init-method="init" destroy-method="destroy">
+ <property name="persistenceService" ref="persistenceService" />
<property name="publicAddress"
value="${cluster.contextserver.publicAddress}"/>
<property name="internalAddress"
value="${cluster.contextserver.internalAddress}"/>
<property name="persistenceService" ref="persistenceService"/>
- <property name="karafCellarClusterManager"
ref="karafCellarClusterManager"/>
- <property name="karafCellarEventProducer"
ref="karafCellarEventProducer"/>
- <property name="karafCellarGroupManager"
ref="karafCellarGroupManager"/>
- <property name="karafCellarGroupName" value="${cluster.group}"/>
- <property name="osgiConfigurationAdmin" ref="osgiConfigurationAdmin"/>
+ <property name="nodeId" value="${cluster.nodeId}"/>
<property name="nodeStatisticsUpdateFrequency"
value="${cluster.nodeStatisticsUpdateFrequency}"/>
+ <property name="bundleWatcher" ref="bundleWatcher"/>
+ <!-- Wait for UNOMI-878 to be available to activate that
<property name="schedulerService" ref="schedulerServiceImpl"/>
+ -->
</bean>
<service id="clusterService" ref="clusterServiceImpl"
interface="org.apache.unomi.api.services.ClusterService"/>
@@ -411,7 +413,6 @@
</bean>
</service>
-
<bean id="configSharingServiceImpl"
class="org.apache.unomi.services.impl.configsharing.ConfigSharingServiceImpl"
destroy-method="preDestroy">
<property name="configProperties">
@@ -429,19 +430,4 @@
</interfaces>
</service>
- <!-- Cluster System Statistics Event Handler -->
- <bean id="clusterSystemStatisticsEventHandler"
-
class="org.apache.unomi.services.impl.cluster.ClusterSystemStatisticsEventHandler"
- init-method="init" destroy-method="destroy">
- <property name="configurationAdmin" ref="osgiConfigurationAdmin"/>
- <property name="clusterManager" ref="karafCellarClusterManager"/>
- <property name="groupManager" ref="karafCellarGroupManager"/>
- <property name="clusterServiceImpl" ref="clusterServiceImpl"/>
- </bean>
- <service ref="clusterSystemStatisticsEventHandler"
interface="org.apache.karaf.cellar.core.event.EventHandler">
- <service-properties>
- <entry key="managed" value="true"/>
- </service-properties>
- </service>
-
</blueprint>
diff --git a/services/src/main/resources/hazelcast.xml
b/services/src/main/resources/hazelcast.xml
deleted file mode 100644
index 2179c92c6..000000000
--- a/services/src/main/resources/hazelcast.xml
+++ /dev/null
@@ -1,222 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-<hazelcast xsi:schemaLocation="http://www.hazelcast.com/schema/config
hazelcast-config-3.4.xsd"
- xmlns="http://www.hazelcast.com/schema/config"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
- <group>
- <name>${org.apache.unomi.hazelcast.group.name}</name>
- <password>${org.apache.unomi.hazelcast.group.password}</password>
- </group>
- <management-center
enabled="false">http://localhost:8080/mancenter</management-center>
- <properties>
- <property name="hazelcast.jmx">true</property>
- </properties>
- <network>
- <port auto-increment="true"
port-count="100">${org.apache.unomi.hazelcast.network.port}</port>
- <outbound-ports>
- <!--
- Allowed port range when connecting to other nodes.
- 0 or * means use system provided port.
- -->
- <ports>0</ports>
- </outbound-ports>
- <join>
- <multicast enabled="false">
- <multicast-group>224.2.2.3</multicast-group>
- <multicast-port>54327</multicast-port>
- </multicast>
- <tcp-ip enabled="true">
- <members>${org.apache.unomi.hazelcast.tcp-ip.members}</members>
- </tcp-ip>
- <aws enabled="false">
- <access-key>my-access-key</access-key>
- <secret-key>my-secret-key</secret-key>
- <!--optional, default is us-east-1 -->
- <region>us-west-1</region>
- <!--optional, default is ec2.amazonaws.com. If set, region
shouldn't be set as it will override this property -->
- <host-header>ec2.amazonaws.com</host-header>
- <!-- optional, only instances belonging to this group will be
discovered, default will try all running instances -->
- <security-group-name>hazelcast-sg</security-group-name>
- <tag-key>type</tag-key>
- <tag-value>hz-nodes</tag-value>
- </aws>
- </join>
- <interfaces enabled="false">
- <interface>10.10.1.*</interface>
- </interfaces>
- <ssl enabled="false"/>
- <socket-interceptor enabled="false"/>
- <symmetric-encryption enabled="false">
- <!--
- encryption algorithm such as
- DES/ECB/PKCS5Padding,
- PBEWithMD5AndDES,
- AES/CBC/PKCS5Padding,
- Blowfish,
- DESede
- -->
- <algorithm>PBEWithMD5AndDES</algorithm>
- <!-- salt value to use when generating the secret key -->
- <salt>thesalt</salt>
- <!-- pass phrase to use when generating the secret key -->
- <password>thepass</password>
- <!-- iteration count to use when generating the secret key -->
- <iteration-count>19</iteration-count>
- </symmetric-encryption>
- </network>
- <partition-group enabled="false"/>
- <executor-service>
- <pool-size>16</pool-size>
- <!-- Queue capacity. 0 means Integer.MAX_VALUE -->
- <queue-capacity>0</queue-capacity>
- </executor-service>
- <queue name="default">
- <!--
- Maximum size of the queue. When a JVM's local queue size reaches
the maximum,
- all put/offer operations will get blocked until the queue size
- of the JVM goes down below the maximum.
- Any integer between 0 and Integer.MAX_VALUE. 0 means
- Integer.MAX_VALUE. Default is 0.
- -->
- <max-size>0</max-size>
- <!--
- Number of backups. If 1 is set as the backup-count for example,
- then all entries of the map will be copied to another JVM for
- fail-safety. 0 means no backup.
- -->
- <backup-count>1</backup-count>
- <!--
- Number of async backups. 0 means no backup.
- -->
- <async-backup-count>0</async-backup-count>
- <empty-queue-ttl>-1</empty-queue-ttl>
- </queue>
-
- <map name="default">
- <!--
- Data type that will be used for storing recordMap.
- Possible values:
- BINARY (default): keys and values will be stored as binary data
- OBJECT : values will be stored in their object forms
- OFFHEAP : values will be stored in non-heap region of JVM
- -->
- <in-memory-format>BINARY</in-memory-format>
- <!--
- Number of backups. If 1 is set as the backup-count for example,
- then all entries of the map will be copied to another JVM for
- fail-safety. 0 means no backup.
- -->
- <backup-count>1</backup-count>
- <!--
- Number of async backups. 0 means no backup.
- -->
- <async-backup-count>0</async-backup-count>
- <!--
- Maximum number of seconds for each entry to stay in the map.
Entries that are
- older than <time-to-live-seconds> and not updated for
<time-to-live-seconds>
- will get automatically evicted from the map.
- Any integer between 0 and Integer.MAX_VALUE. 0 means infinite.
Default is 0.
- -->
- <time-to-live-seconds>0</time-to-live-seconds>
- <!--
- Maximum number of seconds for each entry to stay idle in the map.
Entries that are
- idle(not touched) for more than <max-idle-seconds> will get
- automatically evicted from the map. Entry is touched if get, put
or containsKey is called.
- Any integer between 0 and Integer.MAX_VALUE. 0 means infinite.
Default is 0.
- -->
- <max-idle-seconds>0</max-idle-seconds>
- <!--
- Valid values are:
- NONE (no eviction),
- LRU (Least Recently Used),
- LFU (Least Frequently Used).
- NONE is the default.
- -->
- <eviction-policy>NONE</eviction-policy>
- <!--
- Maximum size of the map. When max size is reached,
- map is evicted based on the policy defined.
- Any integer between 0 and Integer.MAX_VALUE. 0 means
- Integer.MAX_VALUE. Default is 0.
- -->
- <max-size policy="PER_NODE">0</max-size>
- <!--
- When max. size is reached, specified percentage of
- the map will be evicted. Any integer between 0 and 100.
- If 25 is set for example, 25% of the entries will
- get evicted.
- -->
- <eviction-percentage>25</eviction-percentage>
- <!--
- While recovering from split-brain (network partitioning),
- map entries in the small cluster will merge into the bigger cluster
- based on the policy set here. When an entry merge into the
- cluster, there might an existing entry with the same key already.
- Values of these entries might be different for that same key.
- Which value should be set for the key? Conflict is resolved by
- the policy set here. Default policy is PutIfAbsentMapMergePolicy
-
- There are built-in merge policies such as
- com.hazelcast.map.merge.PassThroughMergePolicy; entry will be
added if there is no existing entry for the key.
- com.hazelcast.map.merge.PutIfAbsentMapMergePolicy ; entry will be
added if the merging entry doesn't exist in the cluster.
- com.hazelcast.map.merge.HigherHitsMapMergePolicy ; entry with the
higher hits wins.
- com.hazelcast.map.merge.LatestUpdateMapMergePolicy ; entry with
the latest update wins.
- -->
-
<merge-policy>com.hazelcast.map.merge.PassThroughMergePolicy</merge-policy>
- </map>
-
- <multimap name="default">
- <backup-count>1</backup-count>
- <value-collection-type>SET</value-collection-type>
- </multimap>
-
- <multimap name="default">
- <backup-count>1</backup-count>
- <value-collection-type>SET</value-collection-type>
- </multimap>
-
- <list name="default">
- <backup-count>1</backup-count>
- </list>
-
- <set name="default">
- <backup-count>1</backup-count>
- </set>
-
- <jobtracker name="default">
- <max-thread-size>0</max-thread-size>
- <!-- Queue size 0 means number of partitions * 2 -->
- <queue-size>0</queue-size>
- <retry-count>0</retry-count>
- <chunk-size>1000</chunk-size>
- <communicate-stats>true</communicate-stats>
-
<topology-changed-strategy>CANCEL_RUNNING_OPERATION</topology-changed-strategy>
- </jobtracker>
-
- <semaphore name="default">
- <initial-permits>0</initial-permits>
- <backup-count>1</backup-count>
- <async-backup-count>0</async-backup-count>
- </semaphore>
-
- <serialization>
- <portable-version>0</portable-version>
- </serialization>
-
- <services enable-defaults="true" />
-</hazelcast>
\ No newline at end of file
diff --git a/services/src/main/resources/org.apache.unomi.cluster.cfg
b/services/src/main/resources/org.apache.unomi.cluster.cfg
index bfbb189d9..eecb7e1de 100644
--- a/services/src/main/resources/org.apache.unomi.cluster.cfg
+++ b/services/src/main/resources/org.apache.unomi.cluster.cfg
@@ -14,14 +14,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-group=${org.apache.unomi.cluster.group:-default}
# To simplify testing we set the public address to use HTTP, but for
production environments it is highly recommended
# to switch to using HTTPS with a proper SSL certificate installed.
contextserver.publicAddress=${org.apache.unomi.cluster.public.address:-http://localhost:8181}
contextserver.internalAddress=${org.apache.unomi.cluster.internal.address:-https://localhost:9443}
#
-# The nodeStatisticsUpdateFrequency controls the frequency of the update of
system statistics such as CPU load,
+# The nodeId is a required setting that uniquely identifies this node in the
cluster.
+# It must be set to a unique value for each node in the cluster.
+# Example: nodeId=node1
+nodeId=${org.apache.unomi.cluster.nodeId:-unomi-node-1}
+#
+# The nodeStatisticsUpdateFrequency controls the frequency of the update of
system statistics such as CPU load,
# system load average and uptime. This value is set in milliseconds and is set
to 10 seconds by default. Each node
# will retrieve the local values and broadcast them through a cluster event to
all the other nodes to update
# the global cluster statistics.
-nodeStatisticsUpdateFrequency=${org.apache.unomi.cluster.nodeStatisticsUpdateFrequency:-10000}
\ No newline at end of file
+nodeStatisticsUpdateFrequency=${org.apache.unomi.cluster.nodeStatisticsUpdateFrequency:-10000}