Modified: sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/spi/impl/TestOakSyncTokenConsistencyService.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/spi/impl/TestOakSyncTokenConsistencyService.java?rev=1709601&r1=1709600&r2=1709601&view=diff ============================================================================== --- sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/spi/impl/TestOakSyncTokenConsistencyService.java (original) +++ sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/spi/impl/TestOakSyncTokenConsistencyService.java Tue Oct 20 14:12:31 2015 @@ -19,25 +19,17 @@ package org.apache.sling.discovery.commons.providers.spi.impl; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; +import static org.junit.Assert.assertTrue; -import java.lang.reflect.Method; import java.util.UUID; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import javax.jcr.Repository; -import javax.jcr.Session; - -import org.apache.jackrabbit.commons.SimpleValueFactory; import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeStore; -import org.apache.jackrabbit.oak.util.GenericDescriptors; -import org.apache.sling.api.resource.ResourceResolver; import org.apache.sling.api.resource.ResourceResolverFactory; -import org.apache.sling.commons.json.JSONException; +import org.apache.sling.discovery.commons.providers.DummyTopologyView; import org.apache.sling.discovery.commons.providers.ViewStateManager; -import org.apache.sling.discovery.commons.providers.impl.Listener; -import org.apache.sling.discovery.commons.providers.impl.SimpleTopologyView; +import org.apache.sling.discovery.commons.providers.impl.DummyListener; import org.apache.sling.discovery.commons.providers.impl.TestHelper; import org.apache.sling.discovery.commons.providers.impl.ViewStateManagerFactory; import org.apache.sling.jcr.api.SlingRepository; @@ -47,46 +39,86 @@ import org.junit.Test; public class TestOakSyncTokenConsistencyService { + private static final String SYNCTOKEN_PATH = "/var/discovery/commons/synctokens"; + + private static final String IDMAP_PATH = "/var/discovery/commons/idmap"; + + public final class SimpleCommonsConfig implements DiscoveryLiteConfig { + + private long bgIntervalMillis; + private long bgTimeoutMillis; + + SimpleCommonsConfig() { + this(1000, -1); // defaults + } + + SimpleCommonsConfig(long bgIntervalMillis, long bgTimeoutMillis) { + this.bgIntervalMillis = bgIntervalMillis; + this.bgTimeoutMillis = bgTimeoutMillis; + } + + @Override + public String getSyncTokenPath() { + return SYNCTOKEN_PATH; + } + + @Override + public String getIdMapPath() { + return IDMAP_PATH; + } + + @Override + public long getBgTimeoutMillis() { + return bgTimeoutMillis; + } + + @Override + public long getBgIntervalMillis() { + return bgIntervalMillis; + } + + } + ResourceResolverFactory factory1; ResourceResolverFactory factory2; private SlingRepository repository1; private SlingRepository repository2; private MemoryNodeStore memoryNS; + private IdMapService idMapService1; + private String slingId1; @Before public void setup() throws Exception { - MockFactory.resetRepo(); + RepositoryTestHelper.resetRepo(); memoryNS = new MemoryNodeStore(); - repository1 = RepositoryHelper.newOakRepository(memoryNS); -// repository1 = MultipleRepositoriesSupport.newRepository("target/repo1"); - RepositoryHelper.initSlingNodeTypes(repository1); - repository2 = RepositoryHelper.newOakRepository(memoryNS); -// repository2 = MultipleRepositoriesSupport.newRepository("target/repo2"); -// MultipleRepositoriesSupport.initSlingNodeTypes(repository2); - factory1 = MockFactory.mockResourceResolverFactory(repository1); - factory2 = MockFactory.mockResourceResolverFactory(repository2); + repository1 = RepositoryTestHelper.newOakRepository(memoryNS); + RepositoryTestHelper.initSlingNodeTypes(repository1); + repository2 = RepositoryTestHelper.newOakRepository(memoryNS); + factory1 = RepositoryTestHelper.mockResourceResolverFactory(repository1); + factory2 = RepositoryTestHelper.mockResourceResolverFactory(repository2); + slingId1 = UUID.randomUUID().toString(); + idMapService1 = IdMapService.testConstructor(new SimpleCommonsConfig(), new DummySlingSettingsService(slingId1), factory1); } @After public void tearDown() throws Exception { if (repository1!=null) { - RepositoryHelper.stopRepository(repository1); + RepositoryTestHelper.stopRepository(repository1); repository1 = null; } if (repository2!=null) { - RepositoryHelper.stopRepository(repository2); + RepositoryTestHelper.stopRepository(repository2); repository2 = null; } } @Test public void testOneNode() throws Exception { - String slingId1 = UUID.randomUUID().toString(); - SimpleTopologyView one = TestHelper.newView(true, slingId1, slingId1, slingId1); + DummyTopologyView one = TestHelper.newView(true, slingId1, slingId1, slingId1); Lock lock = new ReentrantLock(); - OakSyncTokenConsistencyService cs = new OakSyncTokenConsistencyService(factory1, slingId1, -1, -1); + OakSyncTokenConsistencyService cs = OakSyncTokenConsistencyService.testConstructorAndActivate(new SimpleCommonsConfig(), idMapService1, new DummySlingSettingsService(slingId1), factory1); ViewStateManager vsm = ViewStateManagerFactory.newViewStateManager(lock, cs); - Listener l = new Listener(); + DummyListener l = new DummyListener(); assertEquals(0, l.countEvents()); vsm.bind(l); cs.triggerBackgroundCheck(); @@ -98,106 +130,67 @@ public class TestOakSyncTokenConsistency cs.triggerBackgroundCheck(); assertEquals(0, l.countEvents()); cs.triggerBackgroundCheck(); - setDiscoveryLiteDescriptor(factory1, new DiscoLite().me(1).seq(1).activeIds(1)); + DescriptorHelper.setDiscoveryLiteDescriptor(factory1, new DiscoveryLiteDescriptorBuilder().me(1).seq(1).activeIds(1).setFinal(true)); + assertTrue(idMapService1.waitForInit(2000)); cs.triggerBackgroundCheck(); + assertTrue(vsm.waitForAsyncEvents(1000)); assertEquals(1, l.countEvents()); } @Test public void testTwoNodesOneLeaving() throws Exception { - String slingId1 = UUID.randomUUID().toString(); String slingId2 = UUID.randomUUID().toString(); - SimpleTopologyView two1 = TestHelper.newView(true, slingId1, slingId1, slingId1, slingId2); + DummyTopologyView two1 = TestHelper.newView(true, slingId1, slingId1, slingId1, slingId2); Lock lock1 = new ReentrantLock(); - OakSyncTokenConsistencyService cs1 = new OakSyncTokenConsistencyService(factory1, slingId1, -1, -1); + OakSyncTokenConsistencyService cs1 = OakSyncTokenConsistencyService.testConstructorAndActivate(new SimpleCommonsConfig(), idMapService1, new DummySlingSettingsService(slingId1), factory1); ViewStateManager vsm1 = ViewStateManagerFactory.newViewStateManager(lock1, cs1); - Listener l = new Listener(); + DummyListener l = new DummyListener(); vsm1.bind(l); vsm1.handleActivated(); vsm1.handleNewView(two1); cs1.triggerBackgroundCheck(); assertEquals(0, l.countEvents()); - setDiscoveryLiteDescriptor(factory1, new DiscoLite().me(1).seq(1).activeIds(1).deactivatingIds(2)); + DescriptorHelper.setDiscoveryLiteDescriptor(factory1, new DiscoveryLiteDescriptorBuilder().setFinal(true).me(1).seq(1).activeIds(1).deactivatingIds(2)); cs1.triggerBackgroundCheck(); assertEquals(0, l.countEvents()); - setDiscoveryLiteDescriptor(factory1, new DiscoLite().me(1).seq(2).activeIds(1)); + DescriptorHelper.setDiscoveryLiteDescriptor(factory1, new DiscoveryLiteDescriptorBuilder().setFinal(true).me(1).seq(2).activeIds(1)); cs1.triggerBackgroundCheck(); Lock lock2 = new ReentrantLock(); - OakSyncTokenConsistencyService cs2 = new OakSyncTokenConsistencyService(factory2, slingId2, -1, -1); + IdMapService idMapService2 = IdMapService.testConstructor( + new SimpleCommonsConfig(), new DummySlingSettingsService(slingId2), factory2); + OakSyncTokenConsistencyService cs2 = OakSyncTokenConsistencyService.testConstructorAndActivate(new SimpleCommonsConfig(), idMapService2, new DummySlingSettingsService(slingId2), factory2); ViewStateManager vsm2 = ViewStateManagerFactory.newViewStateManager(lock2, cs2); cs1.triggerBackgroundCheck(); cs2.triggerBackgroundCheck(); assertEquals(0, l.countEvents()); - setDiscoveryLiteDescriptor(factory2, new DiscoLite().me(2).seq(3).activeIds(1, 2)); + DescriptorHelper.setDiscoveryLiteDescriptor(factory2, new DiscoveryLiteDescriptorBuilder().setFinal(true).me(2).seq(3).activeIds(1, 2)); cs1.triggerBackgroundCheck(); cs2.triggerBackgroundCheck(); assertEquals(0, l.countEvents()); - setDiscoveryLiteDescriptor(factory1, new DiscoLite().me(1).seq(3).activeIds(1, 2)); + DescriptorHelper.setDiscoveryLiteDescriptor(factory1, new DiscoveryLiteDescriptorBuilder().setFinal(true).me(1).seq(3).activeIds(1, 2)); cs1.triggerBackgroundCheck(); cs2.triggerBackgroundCheck(); assertEquals(0, l.countEvents()); vsm2.handleActivated(); - SimpleTopologyView two2 = TestHelper.newView(two1.getLocalClusterSyncTokenId(), two1.getLocalInstance().getClusterView().getId(), true, slingId1, slingId1, slingId1, slingId2); + assertTrue(idMapService1.waitForInit(2000)); + assertTrue(idMapService2.waitForInit(2000)); + DummyTopologyView two2 = TestHelper.newView(two1.getLocalClusterSyncTokenId(), two1.getLocalInstance().getClusterView().getId(), true, slingId1, slingId1, slingId1, slingId2); vsm2.handleNewView(two2); cs1.triggerBackgroundCheck(); cs2.triggerBackgroundCheck(); assertEquals(1, l.countEvents()); - SimpleTopologyView oneLeaving = two1.clone(); + DummyTopologyView oneLeaving = two1.clone(); oneLeaving.removeInstance(slingId2); - setDiscoveryLiteDescriptor(factory1, new DiscoLite().me(1).seq(1).activeIds(1).deactivatingIds(2)); + DescriptorHelper.setDiscoveryLiteDescriptor(factory1, new DiscoveryLiteDescriptorBuilder().setFinal(true).me(1).seq(1).activeIds(1).deactivatingIds(2)); vsm1.handleNewView(oneLeaving); cs1.triggerBackgroundCheck(); cs2.triggerBackgroundCheck(); assertEquals(2, l.countEvents()); - setDiscoveryLiteDescriptor(factory1, new DiscoLite().me(1).seq(2).activeIds(1).inactiveIds(2)); + DescriptorHelper.setDiscoveryLiteDescriptor(factory1, new DiscoveryLiteDescriptorBuilder().setFinal(true).me(1).seq(2).activeIds(1).inactiveIds(2)); cs1.triggerBackgroundCheck(); cs2.triggerBackgroundCheck(); - RepositoryHelper.dumpRepo(factory1); + RepositoryTestHelper.dumpRepo(factory1); assertEquals(3, l.countEvents()); } - private void setDiscoveryLiteDescriptor(ResourceResolverFactory factory, DiscoLite builder) throws JSONException, Exception { - setDescriptor(factory, OakSyncTokenConsistencyService.OAK_DISCOVERYLITE_CLUSTERVIEW, builder.asJson()); - } - - private void setDescriptor(ResourceResolverFactory factory, String key, - String value) throws Exception { - ResourceResolver resourceResolver = factory.getAdministrativeResourceResolver(null); - try{ - Session session = resourceResolver.adaptTo(Session.class); - if (session == null) { - return; - } - Repository repo = session.getRepository(); - - //<hack> -// Method setDescriptorMethod = repo.getClass(). -// getDeclaredMethod("setDescriptor", String.class, String.class); -// if (setDescriptorMethod!=null) { -// setDescriptorMethod.setAccessible(true); -// setDescriptorMethod.invoke(repo, key, value); -// } else { -// fail("could not get 'setDescriptor' method"); -// } - Method getDescriptorsMethod = repo.getClass().getDeclaredMethod("getDescriptors"); - if (getDescriptorsMethod==null) { - fail("could not get 'getDescriptors' method"); - } else { - getDescriptorsMethod.setAccessible(true); - GenericDescriptors descriptors = (GenericDescriptors) getDescriptorsMethod.invoke(repo); - SimpleValueFactory valueFactory = new SimpleValueFactory(); - descriptors.put(key, valueFactory.createValue(value), true, true); - } - //</hack> - - //<verify-hack> - assertEquals(value, repo.getDescriptor(key)); - //</verify-hack> - } finally { - if (resourceResolver!=null) { - resourceResolver.close(); - } - } - } - }
Modified: sling/trunk/bundles/extensions/discovery/impl/pom.xml URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/impl/pom.xml?rev=1709601&r1=1709600&r2=1709601&view=diff ============================================================================== --- sling/trunk/bundles/extensions/discovery/impl/pom.xml (original) +++ sling/trunk/bundles/extensions/discovery/impl/pom.xml Tue Oct 20 14:12:31 2015 @@ -132,6 +132,32 @@ <version>1.0.0-SNAPSHOT</version> <scope>provided</scope> </dependency> + <!-- besides including discovery.commons' normal jar above, + for testing a few test helper classes are also reused. + in order to achieve that, also adding a test/test-jar dependency: --> + <dependency> + <groupId>org.apache.sling</groupId> + <artifactId>org.apache.sling.discovery.commons</artifactId> + <version>1.0.0-SNAPSHOT</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.apache.sling</groupId> + <artifactId>org.apache.sling.discovery.base</artifactId> + <version>1.0.0-SNAPSHOT</version> + <scope>provided</scope> + </dependency> + <!-- besides including discovery.base' normal jar above, + for testing a few test helper classes are also reused. + in order to achieve that, also adding a test/test-jar dependency: --> + <dependency> + <groupId>org.apache.sling</groupId> + <artifactId>org.apache.sling.discovery.base</artifactId> + <version>1.0.0-SNAPSHOT</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> <dependency> <groupId>org.apache.sling</groupId> <artifactId>org.apache.sling.api</artifactId> Modified: sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/Config.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/Config.java?rev=1709601&r1=1709600&r2=1709601&view=diff ============================================================================== --- sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/Config.java (original) +++ sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/Config.java Tue Oct 20 14:12:31 2015 @@ -29,6 +29,7 @@ import org.apache.felix.scr.annotations. import org.apache.felix.scr.annotations.Property; import org.apache.felix.scr.annotations.Service; import org.apache.sling.commons.osgi.PropertiesUtil; +import org.apache.sling.discovery.base.connectors.BaseConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,7 +41,7 @@ import org.slf4j.LoggerFactory; */ @Component(metatype = true, label="%config.name", description="%config.description") @Service(value = { Config.class }) -public class Config { +public class Config implements BaseConfig { private final Logger logger = LoggerFactory.getLogger(this.getClass()); @@ -339,7 +340,7 @@ public class Config { * Returns the socket connect() timeout used by the topology connector, 0 disables the timeout * @return the socket connect() timeout used by the topology connector, 0 disables the timeout */ - public int getConnectionTimeout() { + public int getSocketConnectionTimeout() { return connectionTimeout; } @@ -387,12 +388,16 @@ public class Config { return topologyConnectorWhitelist; } + protected String getDiscoveryResourcePath() { + return discoveryResourcePath; + } + /** * Returns the resource path where cluster instance informations are stored. * @return the resource path where cluster instance informations are stored */ public String getClusterInstancesPath() { - return discoveryResourcePath + CLUSTERINSTANCES_RESOURCE; + return getDiscoveryResourcePath() + CLUSTERINSTANCES_RESOURCE; } /** @@ -400,7 +405,7 @@ public class Config { * @return the resource path where the established view is stored */ public String getEstablishedViewPath() { - return discoveryResourcePath + ESTABLISHED_VIEW_RESOURCE; + return getDiscoveryResourcePath() + ESTABLISHED_VIEW_RESOURCE; } /** @@ -408,7 +413,7 @@ public class Config { * @return the resource path where ongoing votings are stored */ public String getOngoingVotingsPath() { - return discoveryResourcePath + ONGOING_VOTING_RESOURCE; + return getDiscoveryResourcePath() + ONGOING_VOTING_RESOURCE; } /** @@ -416,7 +421,7 @@ public class Config { * @return the resource path where the previous view is stored */ public String getPreviousViewPath() { - return discoveryResourcePath + PREVIOUS_VIEW_RESOURCE; + return getDiscoveryResourcePath() + PREVIOUS_VIEW_RESOURCE; } /** @@ -515,4 +520,15 @@ public class Config { return factor * getHeartbeatInterval(); } } + + @Override + public long getConnectorPingInterval() { + return getHeartbeatInterval(); + } + + @Override + public long getConnectorPingTimeout() { + return getHeartbeatTimeout(); + } + } Modified: sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/DiscoveryServiceImpl.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/DiscoveryServiceImpl.java?rev=1709601&r1=1709600&r2=1709601&view=diff ============================================================================== --- sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/DiscoveryServiceImpl.java (original) +++ sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/DiscoveryServiceImpl.java Tue Oct 20 14:12:31 2015 @@ -47,26 +47,25 @@ import org.apache.sling.api.resource.Res import org.apache.sling.api.resource.ResourceResolver; import org.apache.sling.api.resource.ResourceResolverFactory; import org.apache.sling.commons.scheduler.Scheduler; -import org.apache.sling.discovery.ClusterView; import org.apache.sling.discovery.DiscoveryService; import org.apache.sling.discovery.InstanceDescription; import org.apache.sling.discovery.PropertyProvider; import org.apache.sling.discovery.TopologyEventListener; -import org.apache.sling.discovery.TopologyView; +import org.apache.sling.discovery.base.commons.BaseDiscoveryService; +import org.apache.sling.discovery.base.commons.ClusterViewService; +import org.apache.sling.discovery.base.commons.DefaultTopologyView; +import org.apache.sling.discovery.base.connectors.announcement.AnnouncementRegistry; +import org.apache.sling.discovery.base.connectors.ping.ConnectorRegistry; import org.apache.sling.discovery.commons.providers.BaseTopologyView; +import org.apache.sling.discovery.commons.providers.DefaultClusterView; +import org.apache.sling.discovery.commons.providers.DefaultInstanceDescription; import org.apache.sling.discovery.commons.providers.ViewStateManager; import org.apache.sling.discovery.commons.providers.impl.ViewStateManagerFactory; import org.apache.sling.discovery.commons.providers.spi.ConsistencyService; -import org.apache.sling.discovery.impl.cluster.ClusterViewService; -import org.apache.sling.discovery.impl.cluster.UndefinedClusterViewException; -import org.apache.sling.discovery.impl.cluster.UndefinedClusterViewException.Reason; -import org.apache.sling.discovery.impl.common.DefaultClusterViewImpl; -import org.apache.sling.discovery.impl.common.DefaultInstanceDescriptionImpl; +import org.apache.sling.discovery.commons.providers.spi.impl.IdMapService; +import org.apache.sling.discovery.commons.providers.util.PropertyNameHelper; +import org.apache.sling.discovery.commons.providers.util.ResourceHelper; import org.apache.sling.discovery.impl.common.heartbeat.HeartbeatHandler; -import org.apache.sling.discovery.impl.common.resource.ResourceHelper; -import org.apache.sling.discovery.impl.topology.TopologyViewImpl; -import org.apache.sling.discovery.impl.topology.announcement.AnnouncementRegistry; -import org.apache.sling.discovery.impl.topology.connector.ConnectorRegistry; import org.apache.sling.settings.SlingSettingsService; import org.osgi.framework.BundleContext; import org.osgi.framework.Constants; @@ -81,7 +80,7 @@ import org.slf4j.LoggerFactory; */ @Component(immediate = true) @Service(value = { DiscoveryService.class, DiscoveryServiceImpl.class }) -public class DiscoveryServiceImpl implements DiscoveryService { +public class DiscoveryServiceImpl extends BaseDiscoveryService { private final static Logger logger = LoggerFactory.getLogger(DiscoveryServiceImpl.class); @@ -126,19 +125,40 @@ public class DiscoveryServiceImpl implem @Reference private Config config; + + @Reference + private IdMapService idMapService; /** the slingId of the local instance **/ private String slingId; - /** the old view previously valid and sent to the TopologyEventListeners **/ - private TopologyViewImpl oldView; - private ServiceRegistration mbeanRegistration; private ViewStateManager viewStateManager; private ReentrantLock viewStateManagerLock; + /** for testing only **/ + public static BaseDiscoveryService testConstructor(ResourceResolverFactory resourceResolverFactory, + AnnouncementRegistry announcementRegistry, + ConnectorRegistry connectorRegistry, + ClusterViewService clusterViewService, + HeartbeatHandler heartbeatHandler, + SlingSettingsService settingsService, + Scheduler scheduler, + Config config) { + DiscoveryServiceImpl discoService = new DiscoveryServiceImpl(); + discoService.resourceResolverFactory = resourceResolverFactory; + discoService.announcementRegistry = announcementRegistry; + discoService.connectorRegistry = connectorRegistry; + discoService.clusterViewService = clusterViewService; + discoService.heartbeatHandler = heartbeatHandler; + discoService.settingsService = settingsService; + discoService.scheduler = scheduler; + discoService.config = config; + return discoService; + } + public DiscoveryServiceImpl() { viewStateManagerLock = new ReentrantLock(); final ConsistencyService consistencyService = new ConsistencyService() { @@ -174,12 +194,23 @@ public class DiscoveryServiceImpl implem } } - private void setOldView(TopologyViewImpl view) { - if (view==null) { - throw new IllegalArgumentException("view must not be null"); + protected void handleIsolatedFromTopology() { + if (heartbeatHandler!=null) { + // SLING-5030 part 2: when we detect being isolated we should + // step at the end of the leader-election queue and + // that can be achieved by resetting the leaderElectionId + // (which will in turn take effect on the next round of + // voting, or also double-checked when the local instance votes) + // + //TODO: + // Note that when the local instance doesn't notice + // an 'ISOLATED_FROM_TOPOLOGY' case, then the leaderElectionId + // will not be reset. Which means that it then could potentially + // regain leadership. + if (heartbeatHandler.resetLeaderElectionId()) { + logger.info("getTopology: reset leaderElectionId to force this instance to the end of the instance order (thus incl not to remain leader)"); + } } - logger.debug("setOldView: oldView is now: {}", oldView); - oldView = view; } /** @@ -209,19 +240,19 @@ public class DiscoveryServiceImpl implem // this way for the single-instance case the clusterId can // remain the same between a getTopology() that is invoked before // the first TOPOLOGY_INIT and afterwards - DefaultClusterViewImpl isolatedCluster = new DefaultClusterViewImpl(isolatedClusterId); + DefaultClusterView isolatedCluster = new DefaultClusterView(isolatedClusterId); Map<String, String> emptyProperties = new HashMap<String, String>(); - DefaultInstanceDescriptionImpl isolatedInstance = - new DefaultInstanceDescriptionImpl(isolatedCluster, true, true, slingId, emptyProperties); + DefaultInstanceDescription isolatedInstance = + new DefaultInstanceDescription(isolatedCluster, true, true, slingId, emptyProperties); Collection<InstanceDescription> col = new ArrayList<InstanceDescription>(); col.add(isolatedInstance); - final TopologyViewImpl topology = new TopologyViewImpl(); + final DefaultTopologyView topology = new DefaultTopologyView(); topology.addInstances(col); topology.setNotCurrent(); setOldView(topology); } - setOldView((TopologyViewImpl) getTopology()); - oldView.setNotCurrent(); + setOldView((DefaultTopologyView) getTopology()); + getOldView().setNotCurrent(); // make sure the first heartbeat is issued as soon as possible - which // is right after this service starts. since the two (discoveryservice @@ -235,7 +266,7 @@ public class DiscoveryServiceImpl implem doUpdateProperties(); - TopologyViewImpl newView = (TopologyViewImpl) getTopology(); + DefaultTopologyView newView = (DefaultTopologyView) getTopology(); if (newView.isCurrent()) { viewStateManager.handleNewView(newView); } else { @@ -468,58 +499,6 @@ public class DiscoveryServiceImpl implem } /** - * @see DiscoveryService#getTopology() - */ - public TopologyView getTopology() { - if (clusterViewService == null) { - throw new IllegalStateException( - "DiscoveryService not yet initialized with IClusterViewService"); - } - // create a new topology view - final TopologyViewImpl topology = new TopologyViewImpl(); - - ClusterView localClusterView = null; - try { - localClusterView = clusterViewService.getClusterView(); - } catch (UndefinedClusterViewException e) { - // SLING-5030 : when we're cut off from the local cluster we also - // treat it as being cut off from the entire topology, ie we don't - // update the announcements but just return - // the previous oldView marked as !current - logger.info("getTopology: undefined cluster view: "+e.getReason()+"] "+e); - oldView.setNotCurrent(); - if (e.getReason()==Reason.ISOLATED_FROM_TOPOLOGY) { - if (heartbeatHandler!=null) { - // SLING-5030 part 2: when we detect being isolated we should - // step at the end of the leader-election queue and - // that can be achieved by resetting the leaderElectionId - // (which will in turn take effect on the next round of - // voting, or also double-checked when the local instance votes) - // - //TODO: - // Note that when the local instance doesn't notice - // an 'ISOLATED_FROM_TOPOLOGY' case, then the leaderElectionId - // will not be reset. Which means that it then could potentially - // regain leadership. - if (heartbeatHandler.resetLeaderElectionId()) { - logger.info("getTopology: reset leaderElectionId to force this instance to the end of the instance order (thus incl not to remain leader)"); - } - } - } - return oldView; - } - - final List<InstanceDescription> localInstances = localClusterView.getInstances(); - topology.addInstances(localInstances); - - Collection<InstanceDescription> attachedInstances = announcementRegistry - .listInstances(localClusterView); - topology.addInstances(attachedInstances); - - return topology; - } - - /** * Update the properties and sent a topology event if applicable */ public void updateProperties() { @@ -576,7 +555,7 @@ public class DiscoveryServiceImpl implem /** SLING-2883 : put property only if valid **/ private void putPropertyIfValid(final String name, final String val) { - if (ResourceHelper.isValidPropertyName(name)) { + if (PropertyNameHelper.isValidPropertyName(name)) { this.properties.put(name, val); } } @@ -640,18 +619,26 @@ public class DiscoveryServiceImpl implem logger.error("forcedShutdown: ignoring forced shutdown. Service is not activated."); return; } - if (oldView == null) { + if (getOldView() == null) { logger.error("forcedShutdown: ignoring forced shutdown. No oldView available."); return; } logger.error("forcedShutdown: sending TOPOLOGY_CHANGING to all listeners"); // SLING-4638: make sure the oldView is really marked as old: - oldView.setNotCurrent(); + getOldView().setNotCurrent(); viewStateManager.handleChanging(); logger.error("forcedShutdown: deactivating DiscoveryService."); // to make sure no further event is sent after this, flag this service as deactivated activated = false; } } + + protected ClusterViewService getClusterViewService() { + return clusterViewService; + } + + protected AnnouncementRegistry getAnnouncementRegistry() { + return announcementRegistry; + } } Modified: sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/TopologyWebConsolePlugin.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/TopologyWebConsolePlugin.java?rev=1709601&r1=1709600&r2=1709601&view=diff ============================================================================== --- sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/TopologyWebConsolePlugin.java (original) +++ sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/TopologyWebConsolePlugin.java Tue Oct 20 14:12:31 2015 @@ -52,14 +52,14 @@ import org.apache.sling.discovery.Instan import org.apache.sling.discovery.InstanceFilter; import org.apache.sling.discovery.TopologyEvent; import org.apache.sling.discovery.TopologyEvent.Type; +import org.apache.sling.discovery.base.commons.ClusterViewService; +import org.apache.sling.discovery.base.connectors.announcement.Announcement; +import org.apache.sling.discovery.base.connectors.announcement.AnnouncementRegistry; +import org.apache.sling.discovery.base.connectors.announcement.CachedAnnouncement; +import org.apache.sling.discovery.base.connectors.ping.ConnectorRegistry; +import org.apache.sling.discovery.base.connectors.ping.TopologyConnectorClientInformation; import org.apache.sling.discovery.TopologyEventListener; import org.apache.sling.discovery.TopologyView; -import org.apache.sling.discovery.impl.cluster.ClusterViewService; -import org.apache.sling.discovery.impl.topology.announcement.Announcement; -import org.apache.sling.discovery.impl.topology.announcement.AnnouncementRegistry; -import org.apache.sling.discovery.impl.topology.announcement.CachedAnnouncement; -import org.apache.sling.discovery.impl.topology.connector.ConnectorRegistry; -import org.apache.sling.discovery.impl.topology.connector.TopologyConnectorClientInformation; import org.osgi.framework.BundleContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -430,8 +430,8 @@ public class TopologyWebConsolePlugin ex pw.println("<td><b>not connected</b></td>"); pw.println("<td"+tooltip+"><b>not ok (HTTP Status-Code: "+statusCode+", "+statusDetails+")</b></td>"); } - pw.println("<td>"+beautifiedTimeDiff(topologyConnectorClient.getLastHeartbeatSent())+"</td>"); - pw.println("<td>"+beautifiedDueTime(topologyConnectorClient.getNextHeartbeatDue())+"</td>"); + pw.println("<td>"+beautifiedTimeDiff(topologyConnectorClient.getLastPingSent())+"</td>"); + pw.println("<td>"+beautifiedDueTime(topologyConnectorClient.getNextPingDue())+"</td>"); pw.println("<td>"+topologyConnectorClient.getLastRequestEncoding()+"</td>"); pw.println("<td>"+topologyConnectorClient.getLastResponseEncoding()+"</td>"); // //TODO fallback urls are not yet implemented! @@ -525,7 +525,7 @@ public class TopologyWebConsolePlugin ex } else { pw.println("<td><i>n/a</i></td>"); } - pw.println("<td>"+beautifiedTimeDiff(incomingCachedAnnouncement.getLastHeartbeat())+"</td>"); + pw.println("<td>"+beautifiedTimeDiff(incomingCachedAnnouncement.getLastPing())+"</td>"); pw.println("<td>"+beautifiedDueTime(incomingCachedAnnouncement.getSecondsUntilTimeout())+"</td>"); pw.println("</tr>"); @@ -784,7 +784,7 @@ public class TopologyWebConsolePlugin ex pw.print(incomingAnnouncement.getServerInfo()); pw.println(); } - pw.println("Last heartbeat received : "+beautifiedTimeDiff(incomingCachedAnnouncement.getLastHeartbeat())); + pw.println("Last heartbeat received : "+beautifiedTimeDiff(incomingCachedAnnouncement.getLastPing())); pw.println("Timeout : "+beautifiedDueTime(incomingCachedAnnouncement.getSecondsUntilTimeout())); pw.println(); @@ -845,8 +845,8 @@ public class TopologyWebConsolePlugin ex } pw.print(" (HTTP StatusCode: "+statusCode+", "+statusDetails+")"); pw.println(); - pw.println("Last heartbeat sent : "+beautifiedTimeDiff(topologyConnectorClient.getLastHeartbeatSent())); - pw.println("Next heartbeat due : "+beautifiedDueTime(topologyConnectorClient.getNextHeartbeatDue())); + pw.println("Last heartbeat sent : "+beautifiedTimeDiff(topologyConnectorClient.getLastPingSent())); + pw.println("Next heartbeat due : "+beautifiedDueTime(topologyConnectorClient.getNextPingDue())); } pw.println(); } Modified: sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/cluster/ClusterViewServiceImpl.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/cluster/ClusterViewServiceImpl.java?rev=1709601&r1=1709600&r2=1709601&view=diff ============================================================================== --- sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/cluster/ClusterViewServiceImpl.java (original) +++ sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/cluster/ClusterViewServiceImpl.java Tue Oct 20 14:12:31 2015 @@ -18,20 +18,18 @@ */ package org.apache.sling.discovery.impl.cluster; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; - import org.apache.felix.scr.annotations.Component; import org.apache.felix.scr.annotations.Reference; import org.apache.felix.scr.annotations.Service; import org.apache.sling.api.resource.LoginException; import org.apache.sling.api.resource.ResourceResolver; import org.apache.sling.api.resource.ResourceResolverFactory; -import org.apache.sling.discovery.ClusterView; import org.apache.sling.discovery.InstanceDescription; +import org.apache.sling.discovery.base.commons.ClusterViewService; +import org.apache.sling.discovery.base.commons.UndefinedClusterViewException; +import org.apache.sling.discovery.base.commons.UndefinedClusterViewException.Reason; +import org.apache.sling.discovery.commons.providers.spi.LocalClusterView; import org.apache.sling.discovery.impl.Config; -import org.apache.sling.discovery.impl.cluster.UndefinedClusterViewException.Reason; import org.apache.sling.discovery.impl.common.View; import org.apache.sling.discovery.impl.common.ViewHelper; import org.apache.sling.discovery.impl.common.resource.EstablishedClusterView; @@ -60,6 +58,15 @@ public class ClusterViewServiceImpl impl @Reference private Config config; + public static ClusterViewService testConstructor(SlingSettingsService settingsService, + ResourceResolverFactory factory, Config config) { + ClusterViewServiceImpl service = new ClusterViewServiceImpl(); + service.settingsService = settingsService; + service.resourceResolverFactory = factory; + service.config = config; + return service; + } + public String getSlingId() { if (settingsService==null) { return null; @@ -67,32 +74,7 @@ public class ClusterViewServiceImpl impl return settingsService.getSlingId(); } - public boolean contains(final String slingId) throws UndefinedClusterViewException { - List<InstanceDescription> localInstances = getClusterView().getInstances(); - for (Iterator<InstanceDescription> it = localInstances.iterator(); it - .hasNext();) { - InstanceDescription aLocalInstance = it.next(); - if (aLocalInstance.getSlingId().equals(slingId)) { - return true; - } - } - - return false; - } - - public boolean containsAny(Collection<InstanceDescription> listInstances) - throws UndefinedClusterViewException{ - for (Iterator<InstanceDescription> it = listInstances.iterator(); it - .hasNext();) { - InstanceDescription instanceDescription = it.next(); - if (contains(instanceDescription.getSlingId())) { - return true; - } - } - return false; - } - - public ClusterView getClusterView() throws UndefinedClusterViewException { + public LocalClusterView getLocalClusterView() throws UndefinedClusterViewException { if (resourceResolverFactory==null) { logger.warn("getClusterView: no resourceResolverFactory set at the moment."); throw new UndefinedClusterViewException(Reason.REPOSITORY_EXCEPTION, @@ -112,16 +94,9 @@ public class ClusterViewServiceImpl impl EstablishedClusterView clusterViewImpl = new EstablishedClusterView( config, view, getSlingId()); - boolean foundLocal = false; - for (Iterator<InstanceDescription> it = clusterViewImpl - .getInstances().iterator(); it.hasNext();) { - InstanceDescription instance = it.next(); - if (instance.isLocal()) { - foundLocal = true; - break; - } - } - if (foundLocal) { + + InstanceDescription local = clusterViewImpl.getLocalInstance(); + if (local != null) { return clusterViewImpl; } else { logger.info("getClusterView: the local instance ("+getSlingId()+") is currently not included in the existing established view! " Modified: sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/cluster/voting/VotingHandler.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/cluster/voting/VotingHandler.java?rev=1709601&r1=1709600&r2=1709601&view=diff ============================================================================== --- sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/cluster/voting/VotingHandler.java (original) +++ sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/cluster/voting/VotingHandler.java Tue Oct 20 14:12:31 2015 @@ -38,8 +38,8 @@ import org.apache.sling.api.resource.Res import org.apache.sling.api.resource.ResourceResolver; import org.apache.sling.api.resource.ResourceResolverFactory; import org.apache.sling.api.resource.ValueMap; +import org.apache.sling.discovery.commons.providers.util.ResourceHelper; import org.apache.sling.discovery.impl.Config; -import org.apache.sling.discovery.impl.common.resource.ResourceHelper; import org.apache.sling.settings.SlingSettingsService; import org.osgi.framework.Constants; import org.osgi.service.component.ComponentContext; @@ -84,6 +84,16 @@ public class VotingHandler implements Ev * to ensure the leaderElectionId is correctly set upon voting */ private volatile String leaderElectionId; + + /** for testing only **/ + public static VotingHandler testConstructor(SlingSettingsService settingsService, + ResourceResolverFactory factory, Config config) { + VotingHandler handler = new VotingHandler(); + handler.slingSettingsService = settingsService; + handler.resolverFactory = factory; + handler.config = config; + return handler; + } protected void activate(final ComponentContext context) { slingId = slingSettingsService.getSlingId(); @@ -140,6 +150,7 @@ public class VotingHandler implements Ev */ public synchronized void analyzeVotings(final ResourceResolver resourceResolver) throws PersistenceException { // SLING-3406: refreshing resourceResolver/session here to get the latest state from the repository + logger.debug("analyzeVotings: start. slingId: {}", slingId); resourceResolver.refresh(); VotingView winningVote = VotingHelper.getWinningVoting( resourceResolver, config); Modified: sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/cluster/voting/VotingHelper.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/cluster/voting/VotingHelper.java?rev=1709601&r1=1709600&r2=1709601&view=diff ============================================================================== --- sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/cluster/voting/VotingHelper.java (original) +++ sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/cluster/voting/VotingHelper.java Tue Oct 20 14:12:31 2015 @@ -25,8 +25,8 @@ import java.util.List; import org.apache.sling.api.resource.Resource; import org.apache.sling.api.resource.ResourceResolver; +import org.apache.sling.discovery.commons.providers.util.ResourceHelper; import org.apache.sling.discovery.impl.Config; -import org.apache.sling.discovery.impl.common.resource.ResourceHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; Modified: sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/cluster/voting/VotingView.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/cluster/voting/VotingView.java?rev=1709601&r1=1709600&r2=1709601&view=diff ============================================================================== --- sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/cluster/voting/VotingView.java (original) +++ sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/cluster/voting/VotingView.java Tue Oct 20 14:12:31 2015 @@ -34,10 +34,10 @@ import org.apache.sling.api.resource.Per import org.apache.sling.api.resource.Resource; import org.apache.sling.api.resource.ResourceResolver; import org.apache.sling.api.resource.ValueMap; +import org.apache.sling.discovery.commons.providers.util.ResourceHelper; import org.apache.sling.discovery.impl.Config; import org.apache.sling.discovery.impl.common.View; import org.apache.sling.discovery.impl.common.ViewHelper; -import org.apache.sling.discovery.impl.common.resource.ResourceHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; Modified: sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/common/heartbeat/HeartbeatHandler.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/common/heartbeat/HeartbeatHandler.java?rev=1709601&r1=1709600&r2=1709601&view=diff ============================================================================== --- sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/common/heartbeat/HeartbeatHandler.java (original) +++ sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/common/heartbeat/HeartbeatHandler.java Tue Oct 20 14:12:31 2015 @@ -18,22 +18,15 @@ */ package org.apache.sling.discovery.impl.common.heartbeat; -import java.util.ArrayList; import java.util.Calendar; -import java.util.Collection; import java.util.Date; -import java.util.HashMap; import java.util.Iterator; -import java.util.List; -import java.util.Map; import java.util.Set; import java.util.UUID; import javax.jcr.Session; -import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Component; -import org.apache.felix.scr.annotations.Deactivate; import org.apache.felix.scr.annotations.Reference; import org.apache.felix.scr.annotations.ReferenceCardinality; import org.apache.felix.scr.annotations.ReferencePolicy; @@ -45,25 +38,20 @@ import org.apache.sling.api.resource.Res import org.apache.sling.api.resource.ResourceResolver; import org.apache.sling.api.resource.ResourceResolverFactory; import org.apache.sling.commons.scheduler.Scheduler; +import org.apache.sling.discovery.base.commons.BaseViewChecker; +import org.apache.sling.discovery.base.connectors.announcement.AnnouncementRegistry; +import org.apache.sling.discovery.base.connectors.ping.ConnectorRegistry; +import org.apache.sling.discovery.commons.providers.util.ResourceHelper; import org.apache.sling.discovery.impl.Config; import org.apache.sling.discovery.impl.DiscoveryServiceImpl; import org.apache.sling.discovery.impl.cluster.voting.VotingHandler; import org.apache.sling.discovery.impl.cluster.voting.VotingHelper; import org.apache.sling.discovery.impl.cluster.voting.VotingView; import org.apache.sling.discovery.impl.common.ViewHelper; -import org.apache.sling.discovery.impl.common.resource.ResourceHelper; -import org.apache.sling.discovery.impl.topology.announcement.AnnouncementRegistry; -import org.apache.sling.discovery.impl.topology.connector.ConnectorRegistry; import org.apache.sling.launchpad.api.StartupListener; -import org.apache.sling.launchpad.api.StartupMode; import org.apache.sling.settings.SlingSettingsService; import org.osgi.framework.BundleException; -import org.osgi.framework.Constants; -import org.osgi.framework.ServiceReference; -import org.osgi.service.component.ComponentContext; import org.osgi.service.http.HttpService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * The heartbeat handler is responsible and capable of issuing both local and @@ -77,51 +65,16 @@ import org.slf4j.LoggerFactory; @Reference(referenceInterface=HttpService.class, cardinality=ReferenceCardinality.OPTIONAL_MULTIPLE, policy=ReferencePolicy.DYNAMIC) -public class HeartbeatHandler implements Runnable, StartupListener { +public class HeartbeatHandler extends BaseViewChecker { private static final String PROPERTY_ID_LAST_HEARTBEAT = "lastHeartbeat"; - private static final String PROPERTY_ID_ENDPOINTS = "endpoints"; - - private static final String PROPERTY_ID_SLING_HOME_PATH = "slingHomePath"; - - private static final String PROPERTY_ID_RUNTIME = "runtimeId"; - - private final Logger logger = LoggerFactory.getLogger(this.getClass()); - - /** Endpoint service registration property from RFC 189 */ - private static final String REG_PROPERTY_ENDPOINTS = "osgi.http.service.endpoints"; - - /** the name used for the period job with the scheduler **/ - private String NAME = "discovery.impl.heartbeat.runner."; - - @Reference - private SlingSettingsService slingSettingsService; - - @Reference - private ResourceResolverFactory resourceResolverFactory; - - @Reference - private ConnectorRegistry connectorRegistry; - - @Reference - private AnnouncementRegistry announcementRegistry; - - @Reference - private Scheduler scheduler; - @Reference private Config config; @Reference private VotingHandler votingHandler; - /** the discovery service reference is used to get properties updated before heartbeats are sent **/ - private DiscoveryServiceImpl discoveryService; - - /** the sling id of the local instance **/ - private String slingId; - /** the id which is to be used for the next voting **/ private String nextVotingId = UUID.randomUUID().toString(); @@ -135,81 +88,50 @@ public class HeartbeatHandler implements */ private volatile String newLeaderElectionId; - /** lock object for synchronizing the run method **/ - private final Object lock = new Object(); - /** SLING-2892: remember first heartbeat written to repository by this instance **/ private long firstHeartbeatWritten = -1; /** SLING-2892: remember the value of the heartbeat this instance has written the last time **/ private Calendar lastHeartbeatWritten = null; - /** SLING-2895: avoid heartbeats after deactivation **/ - private volatile boolean activated = false; - - /** SLING-2901: the runtimeId is a unique id, set on activation, used for robust duplicate sling.id detection **/ - private String runtimeId; - - /** keep a reference to the component context **/ - private ComponentContext context; - - /** SLING-2968 : start issuing remote heartbeats only after startup finished **/ - private boolean startupFinished = false; - - /** SLING-3382 : force ping instructs the servlet to start the backoff from scratch again **/ - private boolean forcePing; + private DiscoveryServiceImpl discoveryServiceImpl; - /** SLING-4765 : store endpoints to /clusterInstances for more verbose duplicate slingId/ghost detection **/ - private final Map<Long, String[]> endpoints = new HashMap<Long, String[]>(); - - public void inform(StartupMode mode, boolean finished) { - if (finished) { - startupFinished(mode); - } - } - - public void startupFinished(StartupMode mode) { - synchronized(lock) { - startupFinished = true; - issueHeartbeat(); - } - } - - public void startupProgress(float ratio) { - // we dont care - } - - @Activate - protected void activate(ComponentContext context) { - synchronized(lock) { - this.context = context; - - slingId = slingSettingsService.getSlingId(); - NAME = "discovery.impl.heartbeat.runner." + slingId; - // on activate the resetLeaderElectionId is set to true to ensure that - // the 'leaderElectionId' property is reset on next heartbeat issuance. - // the idea being that a node which leaves the cluster should not - // become leader on next join - and by resetting the leaderElectionId - // to the current time, this is ensured. - resetLeaderElectionId = true; - runtimeId = UUID.randomUUID().toString(); - - // SLING-2895: reset variables to avoid unnecessary log.error - firstHeartbeatWritten = -1; - lastHeartbeatWritten = null; + /** for testing only **/ + public static HeartbeatHandler testConstructor( + SlingSettingsService slingSettingsService, + ResourceResolverFactory factory, + AnnouncementRegistry announcementRegistry, + ConnectorRegistry connectorRegistry, + Config config, + Scheduler scheduler) { + HeartbeatHandler handler = new HeartbeatHandler(); + handler.slingSettingsService = slingSettingsService; + handler.resourceResolverFactory = factory; + handler.announcementRegistry = announcementRegistry; + handler.connectorRegistry = connectorRegistry; + handler.connectorConfig = config; + handler.config = config; + handler.scheduler = scheduler; + return handler; + } + + @Override + protected void doActivate() { + // on activate the resetLeaderElectionId is set to true to ensure that + // the 'leaderElectionId' property is reset on next heartbeat issuance. + // the idea being that a node which leaves the cluster should not + // become leader on next join - and by resetting the leaderElectionId + // to the current time, this is ensured. + resetLeaderElectionId = true; + runtimeId = UUID.randomUUID().toString(); - activated = true; - logger.info("activate: activated with runtimeId: {}, slingId: {}", runtimeId, slingId); - } - } + // SLING-2895: reset variables to avoid unnecessary log.error + firstHeartbeatWritten = -1; + lastHeartbeatWritten = null; - @Deactivate - protected void deactivate() { - // SLING-3365 : dont synchronize on deactivate - activated = false; - scheduler.removeJob(NAME); + logger.info("doActivate: activated with runtimeId: {}, slingId: {}", runtimeId, slingId); } - + /** * The initialize method is called by the DiscoveryServiceImpl.activate * as we require the discoveryService (and the discoveryService has @@ -228,6 +150,7 @@ public class HeartbeatHandler implements final String initialVotingId) { synchronized(lock) { this.discoveryService = discoveryService; + this.discoveryServiceImpl = discoveryService; this.nextVotingId = initialVotingId; logger.info("initialize: nextVotingId="+nextVotingId); issueHeartbeat(); @@ -236,6 +159,9 @@ public class HeartbeatHandler implements try { final long interval = config.getHeartbeatInterval(); logger.info("initialize: starting periodic heartbeat job for "+slingId+" with interval "+interval+" sec."); + if (interval==0) { + logger.warn("initialize: Repeat interval cannot be zero."); + } scheduler.addPeriodicJob(NAME, this, null, interval, false); } catch (Exception e) { @@ -243,21 +169,6 @@ public class HeartbeatHandler implements } } - public void run() { - synchronized(lock) { - if (!activated) { - // SLING:2895: avoid heartbeats if not activated - return; - } - - // issue a heartbeat - issueHeartbeat(); - - // check the view - checkView(); - } - } - /** Get or create a ResourceResolver **/ private ResourceResolver getResourceResolver() throws LoginException { if (resourceResolverFactory == null) { @@ -272,21 +183,6 @@ public class HeartbeatHandler implements return config.getClusterInstancesPath() + "/" + slingId; } - /** Trigger the issuance of the next heartbeat asap instead of at next heartbeat interval **/ - public void triggerHeartbeat() { - forcePing = true; - try { - // then fire a job immediately - // use 'fireJobAt' here, instead of 'fireJob' to make sure the job can always be triggered - // 'fireJob' checks for a job from the same job-class to already exist - // 'fireJobAt' though allows to pass a name for the job - which can be made unique, thus does not conflict/already-exist - logger.info("triggerHeartbeat: firing job to trigger heartbeat"); - scheduler.fireJobAt(NAME+UUID.randomUUID(), this, null, new Date(System.currentTimeMillis()-1000 /* make sure it gets triggered immediately*/)); - } catch (Exception e) { - logger.info("triggerHeartbeat: Could not trigger heartbeat: " + e); - } - } - /** * Hook that will cause a reset of the leaderElectionId * on next invocation of issueClusterLocalHeartbeat. @@ -333,31 +229,14 @@ public class HeartbeatHandler implements * and then a remote heartbeat (to all the topology connectors * which announce this part of the topology to others) */ - void issueHeartbeat() { + protected void issueHeartbeat() { if (discoveryService == null) { logger.error("issueHeartbeat: discoveryService is null"); } else { discoveryService.updateProperties(); } issueClusterLocalHeartbeat(); - issueRemoteHeartbeats(); - } - - /** Issue a remote heartbeat using the topology connectors **/ - private void issueRemoteHeartbeats() { - if (connectorRegistry == null) { - logger.error("issueRemoteHeartbeats: connectorRegistry is null"); - return; - } - if (!startupFinished) { - logger.debug("issueRemoteHeartbeats: not issuing remote heartbeat yet, startup not yet finished"); - return; - } - if (logger.isDebugEnabled()) { - logger.debug("issueRemoteHeartbeats: pinging outgoing topology connectors (if there is any) for "+slingId); - } - connectorRegistry.pingOutgoingConnectors(forcePing); - forcePing = false; + issueConnectorPings(); } /** Issue a cluster local heartbeat (into the repository) **/ @@ -429,7 +308,7 @@ public class HeartbeatHandler implements " Check for sling.id.file in your installation of all instances in this cluster " + "to verify this! Duplicate sling.ids are not allowed within a cluster!"); logger.error("issueClusterLocalHeartbeat: sending TOPOLOGY_CHANGING before self-disabling."); - discoveryService.forcedShutdown(); + discoveryServiceImpl.forcedShutdown(); logger.error("issueClusterLocalHeartbeat: disabling discovery.impl"); activated = false; if (context!=null) { @@ -530,18 +409,13 @@ public class HeartbeatHandler implements /** Check whether the established view matches the reality, ie matches the * heartbeats */ - void checkView() { - // check the remotes first - if (announcementRegistry == null) { - logger.error("announcementRegistry is null"); - return; - } - announcementRegistry.checkExpiredAnnouncements(); + protected void doCheckView() { + super.doCheckView(); ResourceResolver resourceResolver = null; try { resourceResolver = getResourceResolver(); - doCheckView(resourceResolver); + doCheckViewWith(resourceResolver); } catch (LoginException e) { logger.error("checkView: could not log in administratively: " + e, e); @@ -558,16 +432,16 @@ public class HeartbeatHandler implements /** do the established-against-heartbeat view check using the given resourceResolver. */ - private void doCheckView(final ResourceResolver resourceResolver) throws PersistenceException { + private void doCheckViewWith(final ResourceResolver resourceResolver) throws PersistenceException { if (votingHandler==null) { - logger.info("doCheckView: votingHandler is null! slingId="+slingId); + logger.info("doCheckViewWith: votingHandler is null! slingId="+slingId); } else { votingHandler.analyzeVotings(resourceResolver); try{ votingHandler.cleanupTimedoutVotings(resourceResolver); } catch(Exception e) { - logger.warn("doCheckView: Exception occurred while cleaning up votings: "+e, e); + logger.warn("doCheckViewWith: Exception occurred while cleaning up votings: "+e, e); } } @@ -580,11 +454,11 @@ public class HeartbeatHandler implements // settle // but first: make sure we sent the TOPOLOGY_CHANGING - logger.info("doCheckView: there are pending votings, marking topology as changing..."); + logger.info("doCheckViewWith: there are pending votings, marking topology as changing..."); discoveryService.handleTopologyChanging(); if (logger.isDebugEnabled()) { - logger.debug("doCheckView: " + logger.debug("doCheckViewWith: " + numOpenNonWinningVotes + " ongoing votings, no one winning yet - I shall wait for them to settle."); } @@ -600,19 +474,19 @@ public class HeartbeatHandler implements // that's the normal case. the established view matches what we're // seeing. // all happy and fine - logger.debug("doCheckView: no pending nor winning votes. view is fine. we're all happy."); + logger.debug("doCheckViewWith: no pending nor winning votes. view is fine. we're all happy."); return; } // immediately send a TOPOLOGY_CHANGING - could already be sent, but just to be sure - logger.info("doCheckView: no matching established view, marking topology as changing"); + logger.info("doCheckViewWith: no matching established view, marking topology as changing"); discoveryService.handleTopologyChanging(); if (logger.isDebugEnabled()) { - logger.debug("doCheckView: no pending nor winning votes. But: view does not match established or no established yet. Initiating a new voting"); + logger.debug("doCheckViewWith: no pending nor winning votes. But: view does not match established or no established yet. Initiating a new voting"); Iterator<String> it = liveInstances.iterator(); while (it.hasNext()) { - logger.debug("doCheckView: one of the live instances is: " + logger.debug("doCheckViewWith: one of the live instances is: " + it.next()); } } @@ -663,89 +537,5 @@ public class HeartbeatHandler implements } } } - - /** - * Bind a http service - */ - protected void bindHttpService(final ServiceReference reference) { - final String[] endpointUrls = toStringArray(reference.getProperty(REG_PROPERTY_ENDPOINTS)); - if ( endpointUrls != null ) { - synchronized ( lock ) { - this.endpoints.put((Long)reference.getProperty(Constants.SERVICE_ID), endpointUrls); - - // make sure this gets written on next heartbeat - firstHeartbeatWritten = -1; - lastHeartbeatWritten = null; - } - } - } - - /** - * Unbind a http service - */ - protected void unbindHttpService(final ServiceReference reference) { - synchronized ( lock ) { - if ( this.endpoints.remove(reference.getProperty(Constants.SERVICE_ID)) != null ) { - // make sure the change gets written on next heartbeat - firstHeartbeatWritten = -1; - lastHeartbeatWritten = null; - } - } - } - - private String[] toStringArray(final Object propValue) { - if (propValue == null) { - // no value at all - return null; - - } else if (propValue instanceof String) { - // single string - return new String[] { (String) propValue }; - - } else if (propValue instanceof String[]) { - // String[] - return (String[]) propValue; - - } else if (propValue.getClass().isArray()) { - // other array - Object[] valueArray = (Object[]) propValue; - List<String> values = new ArrayList<String>(valueArray.length); - for (Object value : valueArray) { - if (value != null) { - values.add(value.toString()); - } - } - return values.toArray(new String[values.size()]); - - } else if (propValue instanceof Collection<?>) { - // collection - Collection<?> valueCollection = (Collection<?>) propValue; - List<String> valueList = new ArrayList<String>(valueCollection.size()); - for (Object value : valueCollection) { - if (value != null) { - valueList.add(value.toString()); - } - } - return valueList.toArray(new String[valueList.size()]); - } - - return null; - } - private String getEndpointsAsString() { - final StringBuilder sb = new StringBuilder(); - boolean first = true; - for(final String[] points : endpoints.values()) { - for(final String point : points) { - if ( first ) { - first = false; - } else { - sb.append(","); - } - sb.append(point); - } - } - return sb.toString(); - - } } Modified: sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/common/resource/EstablishedClusterView.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/common/resource/EstablishedClusterView.java?rev=1709601&r1=1709600&r2=1709601&view=diff ============================================================================== --- sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/common/resource/EstablishedClusterView.java (original) +++ sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/common/resource/EstablishedClusterView.java Tue Oct 20 14:12:31 2015 @@ -27,8 +27,8 @@ import java.util.List; import org.apache.sling.api.resource.Resource; import org.apache.sling.api.resource.ValueMap; import org.apache.sling.discovery.InstanceDescription; +import org.apache.sling.discovery.commons.providers.spi.LocalClusterView; import org.apache.sling.discovery.impl.Config; -import org.apache.sling.discovery.impl.common.DefaultClusterViewImpl; import org.apache.sling.discovery.impl.common.View; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,7 +38,7 @@ import org.slf4j.LoggerFactory; * as stored in the repository at the according location * */ -public class EstablishedClusterView extends DefaultClusterViewImpl { +public class EstablishedClusterView extends LocalClusterView { /** * use static logger to avoid frequent initialization as is potentially the @@ -50,7 +50,7 @@ public class EstablishedClusterView exte /** Construct a new established cluster view **/ public EstablishedClusterView(final Config config, final View view, final String localId) { - super(view.getViewId()); + super(view.getViewId(), null /* localClusterSyncTokenId not supported */); final Resource viewRes = view.getResource(); if (viewRes == null) { Modified: sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/common/resource/EstablishedInstanceDescription.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/common/resource/EstablishedInstanceDescription.java?rev=1709601&r1=1709600&r2=1709601&view=diff ============================================================================== --- sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/common/resource/EstablishedInstanceDescription.java (original) +++ sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/common/resource/EstablishedInstanceDescription.java Tue Oct 20 14:12:31 2015 @@ -24,17 +24,17 @@ import java.util.Map; import org.apache.sling.api.resource.Resource; import org.apache.sling.api.resource.ValueMap; -import org.apache.sling.discovery.impl.common.DefaultClusterViewImpl; -import org.apache.sling.discovery.impl.common.DefaultInstanceDescriptionImpl; +import org.apache.sling.discovery.commons.providers.DefaultClusterView; +import org.apache.sling.discovery.commons.providers.DefaultInstanceDescription; /** * An InstanceDescription which reads the properties from the according location * in the repository */ public class EstablishedInstanceDescription extends - DefaultInstanceDescriptionImpl { + DefaultInstanceDescription { - public EstablishedInstanceDescription(final DefaultClusterViewImpl clusterView, + public EstablishedInstanceDescription(final DefaultClusterView clusterView, final Resource res, final String slingId, final boolean isLeader, final boolean isOwn) { super(clusterView, isLeader, isOwn, slingId, null);