This is an automated email from the ASF dual-hosted git repository. cschneider pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
The following commit(s) were added to refs/heads/master by this push: new c46ce6b SLING-12214 - Use constants for topic names (#150) c46ce6b is described below commit c46ce6b76b1375431ced5c82fd71104d94c0f84b Author: Christian Schneider <cschn...@adobe.com> AuthorDate: Mon Jun 24 17:33:41 2024 +0200 SLING-12214 - Use constants for topic names (#150) * SLING-12214 - Use constants for topic names * SLING-12214 - Remove unused imports --- .../journal/impl/discovery/DiscoveryService.java | 7 +- .../impl/precondition/PackageStatusWatcher.java | 6 +- .../impl/precondition/StagingPrecondition.java | 8 +- .../publisher/DistributedEventNotifierManager.java | 4 +- .../impl/publisher/DistributionPublisher.java | 4 +- .../impl/publisher/PackageDistributedNotifier.java | 21 ++--- .../impl/publisher/PubQueueProviderPublisher.java | 7 +- .../journal/impl/subscriber/CommandPoller.java | 4 +- .../impl/subscriber/DistributionSubscriber.java | 16 ++-- .../sling/distribution/journal/shared/Topics.java | 93 ++-------------------- .../impl/discovery/DiscoveryServiceTest.java | 9 +-- .../precondition/PackageStatusWatcherTest.java | 6 +- .../impl/precondition/StagingPreconditionTest.java | 5 -- .../DistributedEventNotifierManagerTest.java | 5 -- .../impl/publisher/DistributionPublisherTest.java | 7 +- .../impl/publisher/MessagingCacheCallbackTest.java | 5 -- .../publisher/PackageDistributedNotifierTest.java | 9 +-- .../journal/impl/subscriber/CommandPollerTest.java | 5 +- .../journal/impl/subscriber/SubscriberTest.java | 13 ++- .../journal/queue/impl/PubQueueTest.java | 2 +- 20 files changed, 42 insertions(+), 194 deletions(-) diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/discovery/DiscoveryService.java b/src/main/java/org/apache/sling/distribution/journal/impl/discovery/DiscoveryService.java index b624d80..09d3f1b 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/discovery/DiscoveryService.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/discovery/DiscoveryService.java @@ -80,9 +80,6 @@ public class DiscoveryService implements Runnable { @Reference private MessagingProvider messagingProvider; - @Reference - private Topics topics; - @Reference(policyOption = ReferencePolicyOption.GREEDY, cardinality = ReferenceCardinality.OPTIONAL) private volatile TopologyChangeHandler topologyChangeHandler; //NOSONAR @@ -102,18 +99,16 @@ public class DiscoveryService implements Runnable { public DiscoveryService( MessagingProvider messagingProvider, TopologyChangeHandler topologyChangeHandler, - Topics topics, EventAdmin eventAdmin) { this.messagingProvider = messagingProvider; this.topologyChangeHandler = topologyChangeHandler; - this.topics = topics; this.eventAdmin = eventAdmin; } @Activate public void activate(BundleContext context) { poller = messagingProvider.createPoller( - topics.getDiscoveryTopic(), + Topics.DISCOVERY_TOPIC, Reset.latest, create(DiscoveryMessage.class, this::handleDiscovery), create(LogMessage.class, this::handleLog) diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java index 936cb84..10d54fc 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java @@ -46,11 +46,9 @@ public class PackageStatusWatcher implements Closeable { private final Map<String, NavigableMap<Long, Status>> pkgStatusPerSubAgent = new ConcurrentHashMap<>(); - public PackageStatusWatcher(MessagingProvider messagingProvider, Topics topics) { - String topicName = topics.getStatusTopic(); - + public PackageStatusWatcher(MessagingProvider messagingProvider) { poller = messagingProvider.createPoller( - topicName, + Topics.STATUS_TOPIC, Reset.earliest, create(PackageStatusMessage.class, this::handle) ); diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java index a1b9c74..77d3c09 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java @@ -24,7 +24,6 @@ import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_PE import org.apache.commons.io.IOUtils; import org.apache.sling.distribution.journal.MessagingProvider; import org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status; -import org.apache.sling.distribution.journal.shared.Topics; import org.osgi.service.component.annotations.Activate; import org.osgi.service.component.annotations.Component; import org.osgi.service.component.annotations.Deactivate; @@ -49,14 +48,11 @@ public class StagingPrecondition implements Precondition, Runnable { @Reference private MessagingProvider messagingProvider; - @Reference - private Topics topics; - private volatile PackageStatusWatcher watcher; @Activate public void activate() { - watcher = new PackageStatusWatcher(messagingProvider, topics); + watcher = new PackageStatusWatcher(messagingProvider); LOG.info("Activated Staging Precondition"); } @@ -81,7 +77,7 @@ public class StagingPrecondition implements Precondition, Runnable { public synchronized void run() { LOG.info("Purging StagingPrecondition cache"); IOUtils.closeQuietly(watcher); - watcher = new PackageStatusWatcher(messagingProvider, topics); + watcher = new PackageStatusWatcher(messagingProvider); } } diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributedEventNotifierManager.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributedEventNotifierManager.java index a62fa9a..34aead3 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributedEventNotifierManager.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributedEventNotifierManager.java @@ -24,7 +24,6 @@ import org.apache.sling.discovery.TopologyEventListener; import org.apache.sling.distribution.journal.MessagingProvider; import org.apache.sling.distribution.journal.impl.discovery.TopologyChangeHandler; import org.apache.sling.distribution.journal.queue.PubQueueProvider; -import org.apache.sling.distribution.journal.shared.Topics; import org.osgi.framework.BundleContext; import org.osgi.framework.ServiceRegistration; import org.osgi.service.component.annotations.Activate; @@ -76,13 +75,12 @@ public class DistributedEventNotifierManager implements TopologyEventListener, R @Reference EventAdmin eventAdmin, @Reference PubQueueProvider pubQueueCacheService, @Reference MessagingProvider messagingProvider, - @Reference Topics topics, @Reference ResourceResolverFactory resolverFactory, @Reference EventHandler distributedEventHandler ) { this.context = context; this.config = config; - this.notifier = new PackageDistributedNotifier(eventAdmin, pubQueueCacheService, messagingProvider, topics, resolverFactory, config.ensureEvent()); + this.notifier = new PackageDistributedNotifier(eventAdmin, pubQueueCacheService, messagingProvider, resolverFactory, config.ensureEvent()); if (! config.deduplicateEvent()) { registerService(); } diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java index 8e362cd..7133ae4 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java @@ -122,8 +122,6 @@ public class DistributionPublisher implements DistributionAgent { @Reference EventAdmin eventAdmin, @Reference - Topics topics, - @Reference MetricsService metricsService, @Reference PubQueueProvider pubQueueProvider, @@ -151,7 +149,7 @@ public class DistributionPublisher implements DistributionAgent { maxQueueSizeDelay = config.maxQueueSizeDelay(); pkgType = packageBuilder.getType(); - this.sender = messagingProvider.createSender(topics.getPackageTopic()); + this.sender = messagingProvider.createSender(Topics.PACKAGE_TOPIC); publishMetrics.subscriberCount(() -> discoveryService.getSubscriberCount(pubAgentName)); distLog.info("Started Publisher agent={} with packageBuilder={}, limitEnabled={}, queuedTimeout={}, queueSizeLimit={}, maxQueueSizeDelay={}", diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java index 8f91e03..73881f6 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java @@ -18,7 +18,6 @@ */ package org.apache.sling.distribution.journal.impl.publisher; -import org.apache.commons.lang3.StringUtils; import org.apache.sling.api.resource.ResourceResolverFactory; import org.apache.sling.distribution.journal.MessagingProvider; import org.apache.sling.distribution.journal.bookkeeper.LocalStore; @@ -66,30 +65,22 @@ public class PackageDistributedNotifier implements TopologyChangeHandler { private final MessagingProvider messagingProvider; - private final Topics topics; - private final ResourceResolverFactory resolverFactory; private Consumer<PackageDistributedMessage> sender; - private final boolean sendMsg; - private final boolean ensureEvent; - public PackageDistributedNotifier(EventAdmin eventAdmin, PubQueueProvider pubQueueCacheService, MessagingProvider messagingProvider, Topics topics, ResourceResolverFactory resolverFactory, boolean ensureEvent) { + public PackageDistributedNotifier(EventAdmin eventAdmin, PubQueueProvider pubQueueCacheService, MessagingProvider messagingProvider, ResourceResolverFactory resolverFactory, boolean ensureEvent) { this.eventAdmin = eventAdmin; this.pubQueueCacheService = pubQueueCacheService; this.messagingProvider = messagingProvider; - this.topics = topics; this.resolverFactory = resolverFactory; this.ensureEvent = ensureEvent; - sendMsg = StringUtils.isNotBlank(topics.getEventTopic()); - if (sendMsg) { - sender = messagingProvider.createSender(topics.getEventTopic()); - } + sender = messagingProvider.createSender(Topics.EVENT_TOPIC); - LOG.info("Started package distributed notifier with event message topic {}", topics.getEventTopic()); + LOG.info("Started package distributed notifier"); } @Override @@ -122,7 +113,7 @@ public class PackageDistributedNotifier implements TopologyChangeHandler { } private LocalStore newLocalStore(String pubAgentName) { - String packageNodeName = escapeTopicName(messagingProvider.getServerUri(), topics.getPackageTopic()); + String packageNodeName = escapeTopicName(messagingProvider.getServerUri(), Topics.PACKAGE_TOPIC); return new LocalStore(resolverFactory, packageNodeName, pubAgentName); } @@ -146,9 +137,7 @@ public class PackageDistributedNotifier implements TopologyChangeHandler { protected void notifyDistributed(String pubAgentName, DistributionQueueItem queueItem) { LOG.debug("Sending distributed notifications for pubAgentName={}, pkgId={}", pubAgentName, queueItem.getPackageId()); sendEvt(pubAgentName, queueItem); - if (sendMsg) { - sendMsg(pubAgentName, queueItem); - } + sendMsg(pubAgentName, queueItem); } private void sendMsg(String pubAgentName, DistributionQueueItem queueItem) { diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PubQueueProviderPublisher.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PubQueueProviderPublisher.java index f5727f5..8452f4d 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PubQueueProviderPublisher.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PubQueueProviderPublisher.java @@ -58,21 +58,20 @@ public class PubQueueProviderPublisher { public PubQueueProviderPublisher( @Reference MessagingProvider messagingProvider, @Reference DiscoveryService discoveryService, - @Reference Topics topics, @Reference MetricsService metricsService, @Reference PubQueueProviderFactory pubQueueProviderFactory, BundleContext context) { PublishMetrics publishMetrics = new PublishMetrics(metricsService, ""); - Consumer<ClearCommand> commandSender = messagingProvider.createSender(topics.getCommandTopic()); + Consumer<ClearCommand> commandSender = messagingProvider.createSender(Topics.COMMAND_TOPIC); CacheCallback callback = new MessagingCacheCallback( messagingProvider, - topics.getPackageTopic(), + Topics.PACKAGE_TOPIC, publishMetrics, discoveryService, commandSender); this.pubQueueProvider = pubQueueProviderFactory.create(callback); this.statusPoller = messagingProvider.createPoller( - topics.getStatusTopic(), + Topics.STATUS_TOPIC, Reset.earliest, HandlerAdapter.create(PackageStatusMessage.class, pubQueueProvider::handleStatus) ); diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java index fcdb2b6..6422e8c 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java @@ -41,12 +41,12 @@ public class CommandPoller implements Closeable { private final AtomicLong clearOffset = new AtomicLong(-1); private final Runnable callback; - public CommandPoller(MessagingProvider messagingProvider, Topics topics, String subSlingId, String subAgentName, Runnable callback) { + public CommandPoller(MessagingProvider messagingProvider, String subSlingId, String subAgentName, Runnable callback) { this.subSlingId = subSlingId; this.subAgentName = subAgentName; this.callback = callback; this.poller = messagingProvider.createPoller( - topics.getCommandTopic(), + Topics.COMMAND_TOPIC, Reset.earliest, create(ClearCommand.class, this::handleCommandMessage) ); diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java index a36cc56..9cc0d0d 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java @@ -113,9 +113,6 @@ public class DistributionSubscriber { @Reference private MessagingProvider messagingProvider; - @Reference - private Topics topics; - @Reference(name = "precondition") private Precondition precondition; @@ -169,13 +166,12 @@ public class DistributionSubscriber { requireNonNull(packageBuilder); requireNonNull(slingSettings); requireNonNull(messagingProvider); - requireNonNull(topics); requireNonNull(precondition); requireNonNull(bookKeeperFactory); this.subscriberMetrics = new SubscriberMetrics(metricsService, subAgentName, getFirst(config.agentNames()), config.editable()); if (config.editable()) { - commandPoller = new CommandPoller(messagingProvider, topics, subSlingId, subAgentName, delay::signal); + commandPoller = new CommandPoller(messagingProvider, subSlingId, subAgentName, delay::signal); } if (config.subscriberIdleCheck()) { @@ -190,10 +186,10 @@ public class DistributionSubscriber { queueNames = getNotEmpty(config.agentNames()); pkgType = requireNonNull(packageBuilder.getType()); - Consumer<PackageStatusMessage> statusSender = messagingProvider.createSender(topics.getStatusTopic()); - Consumer<LogMessage> logSender = messagingProvider.createSender(topics.getDiscoveryTopic()); + Consumer<PackageStatusMessage> statusSender = messagingProvider.createSender(Topics.STATUS_TOPIC); + Consumer<LogMessage> logSender = messagingProvider.createSender(Topics.DISCOVERY_TOPIC); - String packageNodeName = escapeTopicName(messagingProvider.getServerUri(), topics.getPackageTopic()); + String packageNodeName = escapeTopicName(messagingProvider.getServerUri(), Topics.PACKAGE_TOPIC); BookKeeperConfig bkConfig = new BookKeeperConfig( subAgentName, subSlingId, @@ -207,7 +203,7 @@ public class DistributionSubscriber { long startOffset = bookKeeper.loadOffset() + 1; String assign = startOffset > 0 ? messagingProvider.assignTo(startOffset) : null; - packagePoller = messagingProvider.createPoller(topics.getPackageTopic(), Reset.latest, assign, + packagePoller = messagingProvider.createPoller(Topics.PACKAGE_TOPIC, Reset.latest, assign, HandlerAdapter.create(PackageMessage.class, this::handlePackageMessage), HandlerAdapter.create(OffsetMessage.class, this::handleOffsetMessage)); queueThread = startBackgroundThread(this::processQueue, @@ -215,7 +211,7 @@ public class DistributionSubscriber { int announceDelay = Converters.standardConverter().convert(properties.get("announceDelay")).defaultValue(10000).to(Integer.class); announcer = new Announcer(subSlingId, subAgentName, queueNames, - messagingProvider.createSender(topics.getDiscoveryTopic()), bookKeeper, + messagingProvider.createSender(Topics.DISCOVERY_TOPIC), bookKeeper, config.maxRetries(), config.editable(), announceDelay); LOG.info("Started Subscriber agent={} at offset={}, subscribed to agent names {}, readyCheck={}", subAgentName, startOffset, diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/Topics.java b/src/main/java/org/apache/sling/distribution/journal/shared/Topics.java index b605732..92ca75f 100644 --- a/src/main/java/org/apache/sling/distribution/journal/shared/Topics.java +++ b/src/main/java/org/apache/sling/distribution/journal/shared/Topics.java @@ -18,96 +18,15 @@ */ package org.apache.sling.distribution.journal.shared; -import org.osgi.service.component.annotations.Activate; -import org.osgi.service.component.annotations.Component; -import org.osgi.service.metatype.annotations.AttributeDefinition; -import org.osgi.service.metatype.annotations.Designate; -import org.osgi.service.metatype.annotations.ObjectClassDefinition; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@Component(service = Topics.class, configurationPid = "org.apache.sling.distribution.journal.impl.shared.Topics") -@Designate(ocd = Topics.TopicsConfiguration.class) public class Topics { - private static final Logger LOG = LoggerFactory.getLogger(Topics.class); - - public static final String PACKAGE_TOPIC = "aemdistribution_package"; - public static final String DISCOVERY_TOPIC = "aemdistribution_discovery"; - public static final String STATUS_TOPIC = "aemdistribution_status"; - public static final String COMMAND_TOPIC = "aemdistribution_command"; - public static final String EVENT_TOPIC = "aemdistribution_event"; + public static final String PACKAGE_TOPIC = "package"; + public static final String DISCOVERY_TOPIC = "discovery"; + public static final String STATUS_TOPIC = "status"; + public static final String COMMAND_TOPIC = "command"; + public static final String EVENT_TOPIC = "event"; - private String discoveryTopic; - private String packageTopic; - private String statusTopic; - private String commandTopic; - private String eventTopic; - - public Topics() { - packageTopic = PACKAGE_TOPIC; - discoveryTopic = DISCOVERY_TOPIC; - statusTopic = STATUS_TOPIC; - commandTopic = COMMAND_TOPIC; - eventTopic = EVENT_TOPIC; + private Topics() { } - - @Activate - public void activate(TopicsConfiguration config) { - this.packageTopic = config.packageTopic(); - this.discoveryTopic = config.discoveryTopic(); - this.statusTopic = config.statusTopic(); - this.commandTopic = config.commandTopic(); - this.eventTopic = config.eventTopic(); - LOG.info("Topics service started with packageTopic '{}' discoveryTopic '{}' statusTopic '{}' eventTopic '{}' commandTopic '{}'", - packageTopic, discoveryTopic, statusTopic, eventTopic, commandTopic); - } - - public String getPackageTopic() { - return packageTopic; - } - - public String getDiscoveryTopic() { - return discoveryTopic; - } - - public String getStatusTopic() { - return statusTopic; - } - - public String getCommandTopic() { - return commandTopic; - } - - public String getEventTopic() { - return eventTopic; - } - - - @ObjectClassDefinition(name = "Apache Sling Journal based Distribution - Topics") - public @interface TopicsConfiguration { - - @AttributeDefinition(name = "Packages Topic", - description = "The topic for package messages.") - String packageTopic() default PACKAGE_TOPIC; - - @AttributeDefinition(name = "Discovery Topic", - description = "The topic for discovery messages.") - String discoveryTopic() default DISCOVERY_TOPIC; - - @AttributeDefinition(name = "Status Topic", - description = "The topic for status messages.") - String statusTopic() default STATUS_TOPIC; - - @AttributeDefinition(name = "Command Topic", - description = "The topic for command messages.") - String commandTopic() default COMMAND_TOPIC; - - @AttributeDefinition(name = "Event Topic", - description = "The optional topic for event messages. If the topic is blank, no event message is sent.") - String eventTopic() default EVENT_TOPIC; - - } - } diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/discovery/DiscoveryServiceTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/discovery/DiscoveryServiceTest.java index 6903df1..9adc9ff 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/discovery/DiscoveryServiceTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/discovery/DiscoveryServiceTest.java @@ -39,7 +39,6 @@ import org.apache.sling.distribution.journal.messages.LogMessage; import org.apache.sling.distribution.journal.messages.SubscriberConfig; import org.apache.sling.distribution.journal.messages.SubscriberState; import org.apache.sling.distribution.journal.shared.TestMessageInfo; -import org.apache.sling.distribution.journal.shared.Topics; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -48,7 +47,6 @@ import org.mockito.ArgumentCaptor; import org.mockito.Captor; import org.mockito.Mock; import org.mockito.Mockito; -import org.mockito.Spy; import org.mockito.junit.MockitoJUnitRunner; import org.osgi.framework.BundleContext; import org.osgi.service.event.Event; @@ -79,9 +77,6 @@ public class DiscoveryServiceTest { @Captor ArgumentCaptor<Event> captureEvent; - @Spy - Topics topics = new Topics(); - @Mock TopologyChangeHandler topologyChangeHandler; @@ -95,9 +90,7 @@ public class DiscoveryServiceTest { @Before public void before() { - discoveryService = new DiscoveryService( - clientProvider, topologyChangeHandler, - topics, eventAdmin); + discoveryService = new DiscoveryService(clientProvider, topologyChangeHandler, eventAdmin); when(clientProvider.createPoller( Mockito.anyString(), Mockito.any(Reset.class), diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java index 24a37ef..aca4263 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java @@ -39,7 +39,6 @@ import org.mockito.ArgumentCaptor; import org.mockito.Captor; import org.mockito.Mock; import org.mockito.MockitoAnnotations; -import org.mockito.Spy; public class PackageStatusWatcherTest { @@ -54,9 +53,6 @@ public class PackageStatusWatcherTest { @Mock MessagingProvider provider; - @Spy - Topics topics = new Topics(); - @Captor private ArgumentCaptor<HandlerAdapter<PackageStatusMessage>> adapterCaptor; @@ -71,7 +67,7 @@ public class PackageStatusWatcherTest { adapterCaptor.capture())) .thenReturn(mock(Closeable.class)); - statusWatcher = new PackageStatusWatcher(provider, topics); + statusWatcher = new PackageStatusWatcher(provider); } diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java index 826dd82..37c6448 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java @@ -33,7 +33,6 @@ import org.apache.sling.distribution.journal.Reset; import org.apache.sling.distribution.journal.impl.precondition.Precondition.Decision; import org.apache.sling.distribution.journal.messages.PackageStatusMessage; import org.apache.sling.distribution.journal.shared.TestMessageInfo; -import org.apache.sling.distribution.journal.shared.Topics; import org.awaitility.Awaitility; import org.awaitility.Duration; import org.junit.Before; @@ -44,7 +43,6 @@ import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; -import org.mockito.Spy; public class StagingPreconditionTest { @@ -57,9 +55,6 @@ public class StagingPreconditionTest { @Mock private MessagingProvider clientProvider; - @Spy - private Topics topics = new Topics(); - @Captor private ArgumentCaptor<HandlerAdapter<PackageStatusMessage>> statusCaptor; diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributedEventNotifierManagerTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributedEventNotifierManagerTest.java index 7c426d6..9f73cc6 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributedEventNotifierManagerTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributedEventNotifierManagerTest.java @@ -26,7 +26,6 @@ import org.apache.sling.discovery.impl.common.DefaultInstanceDescriptionImpl; import org.apache.sling.discovery.impl.topology.TopologyViewImpl; import org.apache.sling.distribution.journal.MessagingProvider; import org.apache.sling.distribution.journal.queue.PubQueueProvider; -import org.apache.sling.distribution.journal.shared.Topics; import org.apache.sling.testing.mock.osgi.MockOsgi; import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory; import org.junit.Before; @@ -59,9 +58,6 @@ public class DistributedEventNotifierManagerTest { @Mock private EventHandler distributedEventHandler; - @Spy - private Topics topics; - @Spy private ResourceResolverFactory resolverFactory = new MockResourceResolverFactory(); @@ -127,7 +123,6 @@ public class DistributedEventNotifierManagerTest { eventAdmin, pubQueueCacheService, messagingProvider, - topics, resolverFactory, distributedEventHandler ); diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java index 206cb29..fe61a07 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java @@ -62,7 +62,6 @@ import org.apache.sling.distribution.journal.messages.PackageMessage; import org.apache.sling.distribution.journal.queue.PubQueueProvider; import org.apache.sling.distribution.journal.queue.impl.OffsetQueueImpl; import org.apache.sling.distribution.journal.queue.impl.PubQueue; -import org.apache.sling.distribution.journal.shared.Topics; import org.apache.sling.distribution.packaging.DistributionPackageBuilder; import org.apache.sling.distribution.queue.spi.DistributionQueue; import org.apache.sling.testing.mock.osgi.junit.OsgiContext; @@ -74,7 +73,6 @@ import org.mockito.ArgumentCaptor; import org.mockito.Captor; import org.mockito.Mock; import org.mockito.Mockito; -import org.mockito.Spy; import org.mockito.junit.MockitoJUnitRunner; import org.osgi.framework.BundleContext; import org.osgi.service.condition.Condition; @@ -123,9 +121,6 @@ public class DistributionPublisherTest { @Captor private ArgumentCaptor<PackageMessage> pkgCaptor; - @Spy - private Topics topics = new Topics(); - private MetricsService metricsService; @Before @@ -139,7 +134,7 @@ public class DistributionPublisherTest { BundleContext bcontext = context.bundleContext(); when(messagingProvider.<PackageMessage>createSender(Mockito.anyString())).thenReturn(sender); publisher = new DistributionPublisher(messagingProvider, packageBuilder, discoveryService, factory, - eventAdmin, topics, metricsService, pubQueueProvider, Condition.INSTANCE, config, bcontext); + eventAdmin, metricsService, pubQueueProvider, Condition.INSTANCE, config, bcontext); when(pubQueueProvider.getQueuedNotifier()).thenReturn(queuedNotifier); } diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallbackTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallbackTest.java index c30d56b..5a7165b 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallbackTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallbackTest.java @@ -50,7 +50,6 @@ import org.apache.sling.distribution.journal.messages.PackageMessage; import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType; import org.apache.sling.distribution.journal.queue.QueueState; import org.apache.sling.distribution.journal.shared.TestMessageInfo; -import org.apache.sling.distribution.journal.shared.Topics; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -59,7 +58,6 @@ import org.mockito.Captor; import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.Mockito; -import org.mockito.Spy; import org.mockito.junit.MockitoJUnitRunner; @RunWith(MockitoJUnitRunner.class) @@ -79,9 +77,6 @@ public class MessagingCacheCallbackTest { @Mock private MessagingProvider messagingProvider; - @Spy - private Topics topics; - @Mock private JournalAvailable journalAvailable; diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java index f4bf3c8..cc372db 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java @@ -61,9 +61,6 @@ public class PackageDistributedNotifierTest { @Mock private MessagingProvider messagingProvider; - @Spy - private Topics topics; - @Spy private ResourceResolverFactory resolverFactory = new MockResourceResolverFactory(); @@ -108,7 +105,7 @@ public class PackageDistributedNotifierTest { .thenReturn(statPoller); URI serverURI = new URI("http://myserver.apache.org:1234/somepath"); when(messagingProvider.getServerUri()).thenReturn(serverURI); - when(messagingProvider.createSender(Mockito.eq(topics.getEventTopic()))) + when(messagingProvider.createSender(Topics.EVENT_TOPIC)) .thenReturn(sender); QueueErrors queueErrors = mock(QueueErrors.class); @@ -117,7 +114,7 @@ public class PackageDistributedNotifierTest { for(int i = 0; i <= 20; i++) handler.handle(info(i), packageMessage("packageid" + i, PUB_AGENT_NAME)); - notifier = new PackageDistributedNotifier(eventAdmin, pubQueueCacheService, messagingProvider, topics, resolverFactory, true); + notifier = new PackageDistributedNotifier(eventAdmin, pubQueueCacheService, messagingProvider, resolverFactory, true); } @Test @@ -155,7 +152,7 @@ public class PackageDistributedNotifierTest { notifier.storeLastDistributedOffset(); - notifier = new PackageDistributedNotifier(eventAdmin, pubQueueCacheService, messagingProvider, topics, resolverFactory, false); + notifier = new PackageDistributedNotifier(eventAdmin, pubQueueCacheService, messagingProvider, resolverFactory, false); // the last raised offset persisted in the author repository is not considered because `ensureEvent` is disabled when(pubQueueCacheService.getOffsetQueue(PUB_AGENT_NAME, 16)) .thenReturn(queueProvider.getOffsetQueue(PUB_AGENT_NAME, 16)); diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java index 1bfb978..4e28237 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java @@ -34,7 +34,6 @@ import org.apache.sling.distribution.journal.MessagingProvider; import org.apache.sling.distribution.journal.Reset; import org.apache.sling.distribution.journal.messages.ClearCommand; import org.apache.sling.distribution.journal.shared.TestMessageInfo; -import org.apache.sling.distribution.journal.shared.Topics; import org.awaitility.Awaitility; import org.awaitility.Duration; import org.junit.Before; @@ -71,8 +70,6 @@ public class CommandPollerTest { private MessageHandler<ClearCommand> commandHandler; - private Topics topics = new Topics(); - private MessageInfo info; @Before @@ -157,7 +154,7 @@ public class CommandPollerTest { Mockito.eq(Reset.earliest), handlerCaptor.capture())) .thenReturn(poller); - commandPoller = new CommandPoller(clientProvider, topics, SUB_SLING_ID, SUB_AGENT_NAME, callback); + commandPoller = new CommandPoller(clientProvider, SUB_SLING_ID, SUB_AGENT_NAME, callback); commandHandler = handlerCaptor.getValue().getHandler(); } diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java index 56ab9fc..47432a3 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java @@ -121,7 +121,7 @@ public class SubscriberTest { private static final String PUB1_SLING_ID = "pub1sling"; private static final String PUB1_AGENT_NAME = "pub1agent"; - private static final String STORE_PACKAGE_NODE_NAME = "myserver.apache.org_somepath_aemdistribution_package"; + private static final String STORE_PACKAGE_NODE_NAME = "myserver.apache.org_somepath_package"; private static final PackageMessage BASIC_ADD_PACKAGE = PackageMessage.builder() .pkgId("myid") @@ -162,9 +162,6 @@ public class SubscriberTest { @Mock MessagingProvider clientProvider; - @Spy - Topics topics = new Topics(); - @Mock EventAdmin eventAdmin; @@ -232,11 +229,11 @@ public class SubscriberTest { URI serverURI = new URI("http://myserver.apache.org:1234/somepath"); when(clientProvider.getServerUri()).thenReturn(serverURI); - when(clientProvider.<PackageStatusMessage>createSender(topics.getStatusTopic())).thenReturn(statusSender); - when(clientProvider.<DiscoveryMessage>createSender(topics.getDiscoveryTopic())).thenReturn(discoverySender); + when(clientProvider.<PackageStatusMessage>createSender(Topics.STATUS_TOPIC)).thenReturn(statusSender); + when(clientProvider.<DiscoveryMessage>createSender(Topics.DISCOVERY_TOPIC)).thenReturn(discoverySender); when(clientProvider.createPoller( - Mockito.eq(topics.getCommandTopic()), + Mockito.eq(Topics.COMMAND_TOPIC), Mockito.eq(Reset.earliest), commandCaptor.capture())) .thenReturn(commandPoller); @@ -487,7 +484,7 @@ public class SubscriberTest { subscriber.bookKeeperFactory = bookKeeperFactory; subscriber.activate(config, context, props); verify(clientProvider).createPoller( - Mockito.eq(topics.getPackageTopic()), + Mockito.eq(Topics.PACKAGE_TOPIC), Mockito.eq(Reset.latest), Mockito.isNull(String.class), packageCaptor.capture(), diff --git a/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueTest.java b/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueTest.java index 820bfff..379b494 100644 --- a/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueTest.java @@ -213,7 +213,7 @@ public class PubQueueTest { } private static String packageId(int nr) { - return PACKAGE_ID_PREFIX + new Integer(nr).toString(); + return PACKAGE_ID_PREFIX + Integer.valueOf(nr).toString(); } private Stream<DistributionQueueEntry> streamOf(Iterable<DistributionQueueEntry> entries) {