This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch master
in repository 
https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git


The following commit(s) were added to refs/heads/master by this push:
     new c46ce6b  SLING-12214 - Use constants for topic names (#150)
c46ce6b is described below

commit c46ce6b76b1375431ced5c82fd71104d94c0f84b
Author: Christian Schneider <cschn...@adobe.com>
AuthorDate: Mon Jun 24 17:33:41 2024 +0200

    SLING-12214 - Use constants for topic names (#150)
    
    * SLING-12214 - Use constants for topic names
    
    * SLING-12214 - Remove unused imports
---
 .../journal/impl/discovery/DiscoveryService.java   |  7 +-
 .../impl/precondition/PackageStatusWatcher.java    |  6 +-
 .../impl/precondition/StagingPrecondition.java     |  8 +-
 .../publisher/DistributedEventNotifierManager.java |  4 +-
 .../impl/publisher/DistributionPublisher.java      |  4 +-
 .../impl/publisher/PackageDistributedNotifier.java | 21 ++---
 .../impl/publisher/PubQueueProviderPublisher.java  |  7 +-
 .../journal/impl/subscriber/CommandPoller.java     |  4 +-
 .../impl/subscriber/DistributionSubscriber.java    | 16 ++--
 .../sling/distribution/journal/shared/Topics.java  | 93 ++--------------------
 .../impl/discovery/DiscoveryServiceTest.java       |  9 +--
 .../precondition/PackageStatusWatcherTest.java     |  6 +-
 .../impl/precondition/StagingPreconditionTest.java |  5 --
 .../DistributedEventNotifierManagerTest.java       |  5 --
 .../impl/publisher/DistributionPublisherTest.java  |  7 +-
 .../impl/publisher/MessagingCacheCallbackTest.java |  5 --
 .../publisher/PackageDistributedNotifierTest.java  |  9 +--
 .../journal/impl/subscriber/CommandPollerTest.java |  5 +-
 .../journal/impl/subscriber/SubscriberTest.java    | 13 ++-
 .../journal/queue/impl/PubQueueTest.java           |  2 +-
 20 files changed, 42 insertions(+), 194 deletions(-)

diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/discovery/DiscoveryService.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/discovery/DiscoveryService.java
index b624d80..09d3f1b 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/discovery/DiscoveryService.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/discovery/DiscoveryService.java
@@ -80,9 +80,6 @@ public class DiscoveryService implements Runnable {
     @Reference
     private MessagingProvider messagingProvider;
 
-    @Reference
-    private Topics topics;
-
     @Reference(policyOption = ReferencePolicyOption.GREEDY, cardinality = 
ReferenceCardinality.OPTIONAL)
     private volatile TopologyChangeHandler topologyChangeHandler; //NOSONAR
 
@@ -102,18 +99,16 @@ public class DiscoveryService implements Runnable {
     public DiscoveryService(
             MessagingProvider messagingProvider,
             TopologyChangeHandler topologyChangeHandler,
-            Topics topics,
             EventAdmin eventAdmin) {
         this.messagingProvider = messagingProvider;
         this.topologyChangeHandler = topologyChangeHandler;
-        this.topics = topics;
         this.eventAdmin = eventAdmin;
     }
 
     @Activate
     public void activate(BundleContext context) {
         poller = messagingProvider.createPoller(
-                topics.getDiscoveryTopic(), 
+                Topics.DISCOVERY_TOPIC, 
                 Reset.latest,
                 create(DiscoveryMessage.class, this::handleDiscovery),
                 create(LogMessage.class, this::handleLog)
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java
index 936cb84..10d54fc 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java
@@ -46,11 +46,9 @@ public class PackageStatusWatcher implements Closeable {
     private final Map<String, NavigableMap<Long, Status>> pkgStatusPerSubAgent 
= new ConcurrentHashMap<>();
 
 
-    public PackageStatusWatcher(MessagingProvider messagingProvider, Topics 
topics) {
-        String topicName = topics.getStatusTopic();
-
+    public PackageStatusWatcher(MessagingProvider messagingProvider) {
         poller = messagingProvider.createPoller(
-                topicName,
+                Topics.STATUS_TOPIC,
                 Reset.earliest,
                 create(PackageStatusMessage.class, this::handle)
         );
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
index a1b9c74..77d3c09 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
@@ -24,7 +24,6 @@ import static 
org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_PE
 import org.apache.commons.io.IOUtils;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import 
org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status;
-import org.apache.sling.distribution.journal.shared.Topics;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Deactivate;
@@ -49,14 +48,11 @@ public class StagingPrecondition implements Precondition, 
Runnable {
     @Reference
     private MessagingProvider messagingProvider;
 
-    @Reference
-    private Topics topics;
-
     private volatile PackageStatusWatcher watcher;
 
     @Activate
     public void activate() {
-        watcher = new PackageStatusWatcher(messagingProvider, topics);
+        watcher = new PackageStatusWatcher(messagingProvider);
         LOG.info("Activated Staging Precondition");
     }
 
@@ -81,7 +77,7 @@ public class StagingPrecondition implements Precondition, 
Runnable {
     public synchronized void run() {
         LOG.info("Purging StagingPrecondition cache");
         IOUtils.closeQuietly(watcher);
-        watcher = new PackageStatusWatcher(messagingProvider, topics);
+        watcher = new PackageStatusWatcher(messagingProvider);
     }
 
 }
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributedEventNotifierManager.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributedEventNotifierManager.java
index a62fa9a..34aead3 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributedEventNotifierManager.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributedEventNotifierManager.java
@@ -24,7 +24,6 @@ import org.apache.sling.discovery.TopologyEventListener;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import 
org.apache.sling.distribution.journal.impl.discovery.TopologyChangeHandler;
 import org.apache.sling.distribution.journal.queue.PubQueueProvider;
-import org.apache.sling.distribution.journal.shared.Topics;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.ServiceRegistration;
 import org.osgi.service.component.annotations.Activate;
@@ -76,13 +75,12 @@ public class DistributedEventNotifierManager implements 
TopologyEventListener, R
             @Reference EventAdmin eventAdmin,
             @Reference PubQueueProvider pubQueueCacheService,
             @Reference MessagingProvider messagingProvider,
-            @Reference Topics topics,
             @Reference ResourceResolverFactory resolverFactory,
             @Reference EventHandler distributedEventHandler
     ) {
         this.context = context;
         this.config = config;
-        this.notifier = new PackageDistributedNotifier(eventAdmin, 
pubQueueCacheService, messagingProvider, topics, resolverFactory, 
config.ensureEvent());
+        this.notifier = new PackageDistributedNotifier(eventAdmin, 
pubQueueCacheService, messagingProvider, resolverFactory, config.ensureEvent());
         if (! config.deduplicateEvent()) {
             registerService();
         }
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
index 8e362cd..7133ae4 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
@@ -122,8 +122,6 @@ public class DistributionPublisher implements 
DistributionAgent {
             @Reference
             EventAdmin eventAdmin,
             @Reference
-            Topics topics,
-            @Reference
             MetricsService metricsService,
             @Reference
             PubQueueProvider pubQueueProvider,
@@ -151,7 +149,7 @@ public class DistributionPublisher implements 
DistributionAgent {
         maxQueueSizeDelay = config.maxQueueSizeDelay();
         pkgType = packageBuilder.getType();
 
-        this.sender = messagingProvider.createSender(topics.getPackageTopic());
+        this.sender = messagingProvider.createSender(Topics.PACKAGE_TOPIC);
         publishMetrics.subscriberCount(() -> 
discoveryService.getSubscriberCount(pubAgentName));
         
         distLog.info("Started Publisher agent={} with packageBuilder={}, 
limitEnabled={}, queuedTimeout={}, queueSizeLimit={}, maxQueueSizeDelay={}",
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
index 8f91e03..73881f6 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
@@ -18,7 +18,6 @@
  */
 package org.apache.sling.distribution.journal.impl.publisher;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.sling.api.resource.ResourceResolverFactory;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.bookkeeper.LocalStore;
@@ -66,30 +65,22 @@ public class PackageDistributedNotifier implements 
TopologyChangeHandler {
 
     private final MessagingProvider messagingProvider;
 
-    private final Topics topics;
-
     private final ResourceResolverFactory resolverFactory;
 
     private Consumer<PackageDistributedMessage> sender;
 
-    private final boolean sendMsg;
-
     private final boolean ensureEvent;
 
-    public PackageDistributedNotifier(EventAdmin eventAdmin, PubQueueProvider 
pubQueueCacheService, MessagingProvider messagingProvider, Topics topics, 
ResourceResolverFactory resolverFactory, boolean ensureEvent) {
+    public PackageDistributedNotifier(EventAdmin eventAdmin, PubQueueProvider 
pubQueueCacheService, MessagingProvider messagingProvider, 
ResourceResolverFactory resolverFactory, boolean ensureEvent) {
         this.eventAdmin = eventAdmin;
         this.pubQueueCacheService = pubQueueCacheService;
         this.messagingProvider = messagingProvider;
-        this.topics = topics;
         this.resolverFactory = resolverFactory;
         this.ensureEvent = ensureEvent;
 
-        sendMsg = StringUtils.isNotBlank(topics.getEventTopic());
-        if (sendMsg) {
-            sender = messagingProvider.createSender(topics.getEventTopic());
-        }
+        sender = messagingProvider.createSender(Topics.EVENT_TOPIC);
 
-        LOG.info("Started package distributed notifier with event message 
topic {}", topics.getEventTopic());
+        LOG.info("Started package distributed notifier");
     }
 
     @Override
@@ -122,7 +113,7 @@ public class PackageDistributedNotifier implements 
TopologyChangeHandler {
     }
 
     private LocalStore newLocalStore(String pubAgentName) {
-        String packageNodeName = 
escapeTopicName(messagingProvider.getServerUri(), topics.getPackageTopic());
+        String packageNodeName = 
escapeTopicName(messagingProvider.getServerUri(), Topics.PACKAGE_TOPIC);
         return new LocalStore(resolverFactory, packageNodeName, pubAgentName);
     }
 
@@ -146,9 +137,7 @@ public class PackageDistributedNotifier implements 
TopologyChangeHandler {
     protected void notifyDistributed(String pubAgentName, 
DistributionQueueItem queueItem) {
         LOG.debug("Sending distributed notifications for pubAgentName={}, 
pkgId={}", pubAgentName, queueItem.getPackageId());
         sendEvt(pubAgentName, queueItem);
-        if (sendMsg) {
-            sendMsg(pubAgentName, queueItem);
-        }
+        sendMsg(pubAgentName, queueItem);
     }
 
     private void sendMsg(String pubAgentName, DistributionQueueItem queueItem) 
{
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PubQueueProviderPublisher.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PubQueueProviderPublisher.java
index f5727f5..8452f4d 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PubQueueProviderPublisher.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PubQueueProviderPublisher.java
@@ -58,21 +58,20 @@ public class PubQueueProviderPublisher {
     public PubQueueProviderPublisher(
             @Reference MessagingProvider messagingProvider,
             @Reference DiscoveryService discoveryService,
-            @Reference Topics topics,
             @Reference MetricsService metricsService,
             @Reference PubQueueProviderFactory pubQueueProviderFactory,
             BundleContext context) {
         PublishMetrics publishMetrics = new PublishMetrics(metricsService, "");
-        Consumer<ClearCommand> commandSender = 
messagingProvider.createSender(topics.getCommandTopic());
+        Consumer<ClearCommand> commandSender = 
messagingProvider.createSender(Topics.COMMAND_TOPIC);
         CacheCallback callback = new MessagingCacheCallback(
                 messagingProvider, 
-                topics.getPackageTopic(), 
+                Topics.PACKAGE_TOPIC, 
                 publishMetrics,
                 discoveryService,
                 commandSender);
         this.pubQueueProvider = pubQueueProviderFactory.create(callback);
         this.statusPoller = messagingProvider.createPoller(
-                topics.getStatusTopic(),
+                Topics.STATUS_TOPIC,
                 Reset.earliest,
                 HandlerAdapter.create(PackageStatusMessage.class, 
pubQueueProvider::handleStatus)
                 );
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
index fcdb2b6..6422e8c 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
@@ -41,12 +41,12 @@ public class CommandPoller implements Closeable {
     private final AtomicLong clearOffset = new AtomicLong(-1);
     private final Runnable callback;
 
-    public CommandPoller(MessagingProvider messagingProvider, Topics topics, 
String subSlingId, String subAgentName, Runnable callback) {
+    public CommandPoller(MessagingProvider messagingProvider, String 
subSlingId, String subAgentName, Runnable callback) {
         this.subSlingId = subSlingId;
         this.subAgentName = subAgentName;
         this.callback = callback;
         this.poller = messagingProvider.createPoller(
-                    topics.getCommandTopic(),
+                    Topics.COMMAND_TOPIC,
                     Reset.earliest,
                     create(ClearCommand.class, this::handleCommandMessage)
                     );
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index a36cc56..9cc0d0d 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -113,9 +113,6 @@ public class DistributionSubscriber {
     @Reference
     private MessagingProvider messagingProvider;
 
-    @Reference
-    private Topics topics;
-
     @Reference(name = "precondition")
     private Precondition precondition;
 
@@ -169,13 +166,12 @@ public class DistributionSubscriber {
         requireNonNull(packageBuilder);
         requireNonNull(slingSettings);
         requireNonNull(messagingProvider);
-        requireNonNull(topics);
         requireNonNull(precondition);
         requireNonNull(bookKeeperFactory);
         this.subscriberMetrics = new SubscriberMetrics(metricsService, 
subAgentName, getFirst(config.agentNames()), config.editable());
 
         if (config.editable()) {
-            commandPoller = new CommandPoller(messagingProvider, topics, 
subSlingId, subAgentName, delay::signal);
+            commandPoller = new CommandPoller(messagingProvider, subSlingId, 
subAgentName, delay::signal);
         }
 
         if (config.subscriberIdleCheck()) {
@@ -190,10 +186,10 @@ public class DistributionSubscriber {
         queueNames = getNotEmpty(config.agentNames());
         pkgType = requireNonNull(packageBuilder.getType());
 
-        Consumer<PackageStatusMessage> statusSender = 
messagingProvider.createSender(topics.getStatusTopic());
-        Consumer<LogMessage> logSender = 
messagingProvider.createSender(topics.getDiscoveryTopic());
+        Consumer<PackageStatusMessage> statusSender = 
messagingProvider.createSender(Topics.STATUS_TOPIC);
+        Consumer<LogMessage> logSender = 
messagingProvider.createSender(Topics.DISCOVERY_TOPIC);
 
-        String packageNodeName = 
escapeTopicName(messagingProvider.getServerUri(), topics.getPackageTopic());
+        String packageNodeName = 
escapeTopicName(messagingProvider.getServerUri(), Topics.PACKAGE_TOPIC);
         BookKeeperConfig bkConfig = new BookKeeperConfig(
                 subAgentName,
                 subSlingId,
@@ -207,7 +203,7 @@ public class DistributionSubscriber {
         long startOffset = bookKeeper.loadOffset() + 1;
         String assign = startOffset > 0 ? 
messagingProvider.assignTo(startOffset) : null;
 
-        packagePoller = 
messagingProvider.createPoller(topics.getPackageTopic(), Reset.latest, assign,
+        packagePoller = messagingProvider.createPoller(Topics.PACKAGE_TOPIC, 
Reset.latest, assign,
                 HandlerAdapter.create(PackageMessage.class, 
this::handlePackageMessage), HandlerAdapter.create(OffsetMessage.class, 
this::handleOffsetMessage));
 
         queueThread = startBackgroundThread(this::processQueue,
@@ -215,7 +211,7 @@ public class DistributionSubscriber {
 
         int announceDelay = 
Converters.standardConverter().convert(properties.get("announceDelay")).defaultValue(10000).to(Integer.class);
         announcer = new Announcer(subSlingId, subAgentName, queueNames,
-                messagingProvider.createSender(topics.getDiscoveryTopic()), 
bookKeeper,
+                messagingProvider.createSender(Topics.DISCOVERY_TOPIC), 
bookKeeper,
                 config.maxRetries(), config.editable(), announceDelay);
 
         LOG.info("Started Subscriber agent={} at offset={}, subscribed to 
agent names {}, readyCheck={}", subAgentName, startOffset,
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/shared/Topics.java 
b/src/main/java/org/apache/sling/distribution/journal/shared/Topics.java
index b605732..92ca75f 100644
--- a/src/main/java/org/apache/sling/distribution/journal/shared/Topics.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/Topics.java
@@ -18,96 +18,15 @@
  */
 package org.apache.sling.distribution.journal.shared;
 
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.metatype.annotations.AttributeDefinition;
-import org.osgi.service.metatype.annotations.Designate;
-import org.osgi.service.metatype.annotations.ObjectClassDefinition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Component(service = Topics.class, configurationPid = 
"org.apache.sling.distribution.journal.impl.shared.Topics")
-@Designate(ocd = Topics.TopicsConfiguration.class)
 public class Topics {
 
-    private static final Logger LOG = LoggerFactory.getLogger(Topics.class);
-
-    public static final String PACKAGE_TOPIC = "aemdistribution_package";
-    public static final String DISCOVERY_TOPIC = "aemdistribution_discovery";
-    public static final String STATUS_TOPIC = "aemdistribution_status";
-    public static final String COMMAND_TOPIC = "aemdistribution_command";
-    public static final String EVENT_TOPIC = "aemdistribution_event";
+    public static final String PACKAGE_TOPIC = "package";
+    public static final String DISCOVERY_TOPIC = "discovery";
+    public static final String STATUS_TOPIC = "status";
+    public static final String COMMAND_TOPIC = "command";
+    public static final String EVENT_TOPIC = "event";
     
-    private String discoveryTopic;
-    private String packageTopic;
-    private String statusTopic;
-    private String commandTopic;
-    private String eventTopic;
-    
-    public Topics() {
-        packageTopic = PACKAGE_TOPIC;
-        discoveryTopic = DISCOVERY_TOPIC;
-        statusTopic = STATUS_TOPIC;
-        commandTopic = COMMAND_TOPIC;
-        eventTopic = EVENT_TOPIC;
+    private Topics() {
     }
-
     
-    @Activate
-    public void activate(TopicsConfiguration config) {
-        this.packageTopic = config.packageTopic();
-        this.discoveryTopic = config.discoveryTopic();
-        this.statusTopic = config.statusTopic();
-        this.commandTopic = config.commandTopic();
-        this.eventTopic = config.eventTopic();
-        LOG.info("Topics service started with packageTopic '{}' discoveryTopic 
'{}' statusTopic '{}' eventTopic '{}' commandTopic '{}'",
-                packageTopic, discoveryTopic, statusTopic, eventTopic, 
commandTopic);
-    }
-    
-    public String getPackageTopic() {
-        return packageTopic;
-    }
-    
-    public String getDiscoveryTopic() {
-        return discoveryTopic;
-    }
-
-    public String getStatusTopic() {
-        return statusTopic;
-    }
-
-    public String getCommandTopic() {
-        return commandTopic;
-    }
-
-    public String getEventTopic() {
-        return eventTopic;
-    }
-
-    
-    @ObjectClassDefinition(name = "Apache Sling Journal based Distribution - 
Topics")
-    public @interface TopicsConfiguration {
-
-        @AttributeDefinition(name = "Packages Topic",
-                description = "The topic for package messages.")
-        String packageTopic() default PACKAGE_TOPIC;
-
-        @AttributeDefinition(name = "Discovery Topic",
-                description = "The topic for discovery messages.")
-        String discoveryTopic() default DISCOVERY_TOPIC;
-
-        @AttributeDefinition(name = "Status Topic",
-                description = "The topic for status messages.")
-        String statusTopic() default STATUS_TOPIC;
-
-        @AttributeDefinition(name = "Command Topic",
-                description = "The topic for command messages.")
-        String commandTopic() default COMMAND_TOPIC;
-
-        @AttributeDefinition(name = "Event Topic",
-                description = "The optional topic for event messages. If the 
topic is blank, no event message is sent.")
-        String eventTopic() default EVENT_TOPIC;
-
-    }
-
 }
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/discovery/DiscoveryServiceTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/discovery/DiscoveryServiceTest.java
index 6903df1..9adc9ff 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/discovery/DiscoveryServiceTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/discovery/DiscoveryServiceTest.java
@@ -39,7 +39,6 @@ import 
org.apache.sling.distribution.journal.messages.LogMessage;
 import org.apache.sling.distribution.journal.messages.SubscriberConfig;
 import org.apache.sling.distribution.journal.messages.SubscriberState;
 import org.apache.sling.distribution.journal.shared.TestMessageInfo;
-import org.apache.sling.distribution.journal.shared.Topics;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -48,7 +47,6 @@ import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
 import org.mockito.Mock;
 import org.mockito.Mockito;
-import org.mockito.Spy;
 import org.mockito.junit.MockitoJUnitRunner;
 import org.osgi.framework.BundleContext;
 import org.osgi.service.event.Event;
@@ -79,9 +77,6 @@ public class DiscoveryServiceTest {
     @Captor
     ArgumentCaptor<Event> captureEvent;
     
-    @Spy
-    Topics topics = new Topics();
-
     @Mock
     TopologyChangeHandler topologyChangeHandler;
 
@@ -95,9 +90,7 @@ public class DiscoveryServiceTest {
     
     @Before
     public void before() {
-        discoveryService = new DiscoveryService(
-                clientProvider, topologyChangeHandler, 
-                topics, eventAdmin);
+        discoveryService = new DiscoveryService(clientProvider, 
topologyChangeHandler, eventAdmin);
         when(clientProvider.createPoller(
                 Mockito.anyString(), 
                 Mockito.any(Reset.class),
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java
index 24a37ef..aca4263 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java
@@ -39,7 +39,6 @@ import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
-import org.mockito.Spy;
 
 public class PackageStatusWatcherTest {
 
@@ -54,9 +53,6 @@ public class PackageStatusWatcherTest {
     @Mock
     MessagingProvider provider;
 
-    @Spy
-    Topics topics = new Topics();
-
     @Captor
     private ArgumentCaptor<HandlerAdapter<PackageStatusMessage>> adapterCaptor;
 
@@ -71,7 +67,7 @@ public class PackageStatusWatcherTest {
                 adapterCaptor.capture()))
                 .thenReturn(mock(Closeable.class));
 
-        statusWatcher = new PackageStatusWatcher(provider, topics);
+        statusWatcher = new PackageStatusWatcher(provider);
 
     }
 
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java
index 826dd82..37c6448 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java
@@ -33,7 +33,6 @@ import org.apache.sling.distribution.journal.Reset;
 import 
org.apache.sling.distribution.journal.impl.precondition.Precondition.Decision;
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
 import org.apache.sling.distribution.journal.shared.TestMessageInfo;
-import org.apache.sling.distribution.journal.shared.Topics;
 import org.awaitility.Awaitility;
 import org.awaitility.Duration;
 import org.junit.Before;
@@ -44,7 +43,6 @@ import org.mockito.InjectMocks;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
-import org.mockito.Spy;
 
 public class StagingPreconditionTest {
 
@@ -57,9 +55,6 @@ public class StagingPreconditionTest {
     @Mock
     private MessagingProvider clientProvider;
 
-    @Spy
-    private Topics topics = new Topics();
-
     @Captor
     private ArgumentCaptor<HandlerAdapter<PackageStatusMessage>> statusCaptor;
 
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributedEventNotifierManagerTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributedEventNotifierManagerTest.java
index 7c426d6..9f73cc6 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributedEventNotifierManagerTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributedEventNotifierManagerTest.java
@@ -26,7 +26,6 @@ import 
org.apache.sling.discovery.impl.common.DefaultInstanceDescriptionImpl;
 import org.apache.sling.discovery.impl.topology.TopologyViewImpl;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.queue.PubQueueProvider;
-import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.testing.mock.osgi.MockOsgi;
 import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory;
 import org.junit.Before;
@@ -59,9 +58,6 @@ public class DistributedEventNotifierManagerTest {
     @Mock
     private EventHandler distributedEventHandler;
 
-    @Spy
-    private Topics topics;
-
     @Spy
     private ResourceResolverFactory resolverFactory = new 
MockResourceResolverFactory();
 
@@ -127,7 +123,6 @@ public class DistributedEventNotifierManagerTest {
                 eventAdmin,
                 pubQueueCacheService,
                 messagingProvider,
-                topics,
                 resolverFactory,
                 distributedEventHandler
         );
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
index 206cb29..fe61a07 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
@@ -62,7 +62,6 @@ import 
org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.apache.sling.distribution.journal.queue.PubQueueProvider;
 import org.apache.sling.distribution.journal.queue.impl.OffsetQueueImpl;
 import org.apache.sling.distribution.journal.queue.impl.PubQueue;
-import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
 import org.apache.sling.testing.mock.osgi.junit.OsgiContext;
@@ -74,7 +73,6 @@ import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
 import org.mockito.Mock;
 import org.mockito.Mockito;
-import org.mockito.Spy;
 import org.mockito.junit.MockitoJUnitRunner;
 import org.osgi.framework.BundleContext;
 import org.osgi.service.condition.Condition;
@@ -123,9 +121,6 @@ public class DistributionPublisherTest {
     @Captor
     private ArgumentCaptor<PackageMessage> pkgCaptor;
 
-    @Spy
-    private Topics topics = new Topics();
-    
     private MetricsService metricsService;
 
     @Before
@@ -139,7 +134,7 @@ public class DistributionPublisherTest {
         BundleContext bcontext = context.bundleContext();
         
when(messagingProvider.<PackageMessage>createSender(Mockito.anyString())).thenReturn(sender);
         publisher = new DistributionPublisher(messagingProvider, 
packageBuilder, discoveryService, factory,
-                eventAdmin, topics, metricsService, pubQueueProvider, 
Condition.INSTANCE, config, bcontext);
+                eventAdmin, metricsService, pubQueueProvider, 
Condition.INSTANCE, config, bcontext);
         when(pubQueueProvider.getQueuedNotifier()).thenReturn(queuedNotifier);
     }
     
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallbackTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallbackTest.java
index c30d56b..5a7165b 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallbackTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallbackTest.java
@@ -50,7 +50,6 @@ import 
org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
 import org.apache.sling.distribution.journal.queue.QueueState;
 import org.apache.sling.distribution.journal.shared.TestMessageInfo;
-import org.apache.sling.distribution.journal.shared.Topics;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -59,7 +58,6 @@ import org.mockito.Captor;
 import org.mockito.InjectMocks;
 import org.mockito.Mock;
 import org.mockito.Mockito;
-import org.mockito.Spy;
 import org.mockito.junit.MockitoJUnitRunner;
 
 @RunWith(MockitoJUnitRunner.class)
@@ -79,9 +77,6 @@ public class MessagingCacheCallbackTest {
     @Mock
     private MessagingProvider messagingProvider;
     
-    @Spy
-    private Topics topics;
-    
     @Mock
     private JournalAvailable journalAvailable;
     
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java
index f4bf3c8..cc372db 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java
@@ -61,9 +61,6 @@ public class PackageDistributedNotifierTest {
     @Mock
     private MessagingProvider messagingProvider;
 
-    @Spy
-    private Topics topics;
-
     @Spy
     private ResourceResolverFactory resolverFactory = new 
MockResourceResolverFactory();
 
@@ -108,7 +105,7 @@ public class PackageDistributedNotifierTest {
                 .thenReturn(statPoller);
         URI serverURI = new URI("http://myserver.apache.org:1234/somepath";);
         when(messagingProvider.getServerUri()).thenReturn(serverURI);
-        
when(messagingProvider.createSender(Mockito.eq(topics.getEventTopic())))
+        when(messagingProvider.createSender(Topics.EVENT_TOPIC))
             .thenReturn(sender);
 
         QueueErrors queueErrors = mock(QueueErrors.class);
@@ -117,7 +114,7 @@ public class PackageDistributedNotifierTest {
         for(int i = 0; i <= 20; i++)
             handler.handle(info(i), packageMessage("packageid" + i, 
PUB_AGENT_NAME));
 
-        notifier = new PackageDistributedNotifier(eventAdmin, 
pubQueueCacheService, messagingProvider, topics, resolverFactory, true);
+        notifier = new PackageDistributedNotifier(eventAdmin, 
pubQueueCacheService, messagingProvider, resolverFactory, true);
     }
 
     @Test
@@ -155,7 +152,7 @@ public class PackageDistributedNotifierTest {
 
         notifier.storeLastDistributedOffset();
 
-        notifier = new PackageDistributedNotifier(eventAdmin, 
pubQueueCacheService, messagingProvider, topics, resolverFactory, false);
+        notifier = new PackageDistributedNotifier(eventAdmin, 
pubQueueCacheService, messagingProvider, resolverFactory, false);
         // the last raised offset persisted in the author repository is not 
considered because `ensureEvent` is disabled
         when(pubQueueCacheService.getOffsetQueue(PUB_AGENT_NAME, 16))
                 .thenReturn(queueProvider.getOffsetQueue(PUB_AGENT_NAME, 16));
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java
index 1bfb978..4e28237 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java
@@ -34,7 +34,6 @@ import 
org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
 import org.apache.sling.distribution.journal.messages.ClearCommand;
 import org.apache.sling.distribution.journal.shared.TestMessageInfo;
-import org.apache.sling.distribution.journal.shared.Topics;
 import org.awaitility.Awaitility;
 import org.awaitility.Duration;
 import org.junit.Before;
@@ -71,8 +70,6 @@ public class CommandPollerTest {
     
     private MessageHandler<ClearCommand> commandHandler;
 
-    private Topics topics = new Topics();
-
     private MessageInfo info;
 
     @Before
@@ -157,7 +154,7 @@ public class CommandPollerTest {
                 Mockito.eq(Reset.earliest), 
                 handlerCaptor.capture()))
             .thenReturn(poller);
-        commandPoller = new CommandPoller(clientProvider, topics, 
SUB_SLING_ID, SUB_AGENT_NAME, callback);
+        commandPoller = new CommandPoller(clientProvider, SUB_SLING_ID, 
SUB_AGENT_NAME, callback);
         commandHandler = handlerCaptor.getValue().getHandler();
     }
 
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
index 56ab9fc..47432a3 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
@@ -121,7 +121,7 @@ public class SubscriberTest {
     private static final String PUB1_SLING_ID = "pub1sling";
     private static final String PUB1_AGENT_NAME = "pub1agent";
     
-    private static final String STORE_PACKAGE_NODE_NAME = 
"myserver.apache.org_somepath_aemdistribution_package";
+    private static final String STORE_PACKAGE_NODE_NAME = 
"myserver.apache.org_somepath_package";
 
     private static final PackageMessage BASIC_ADD_PACKAGE = 
PackageMessage.builder()
             .pkgId("myid")
@@ -162,9 +162,6 @@ public class SubscriberTest {
     @Mock
     MessagingProvider clientProvider;
     
-    @Spy
-    Topics topics = new Topics();
-
     @Mock
     EventAdmin eventAdmin;
     
@@ -232,11 +229,11 @@ public class SubscriberTest {
 
         URI serverURI = new URI("http://myserver.apache.org:1234/somepath";);
         when(clientProvider.getServerUri()).thenReturn(serverURI);
-        
when(clientProvider.<PackageStatusMessage>createSender(topics.getStatusTopic())).thenReturn(statusSender);
-        
when(clientProvider.<DiscoveryMessage>createSender(topics.getDiscoveryTopic())).thenReturn(discoverySender);
+        
when(clientProvider.<PackageStatusMessage>createSender(Topics.STATUS_TOPIC)).thenReturn(statusSender);
+        
when(clientProvider.<DiscoveryMessage>createSender(Topics.DISCOVERY_TOPIC)).thenReturn(discoverySender);
 
         when(clientProvider.createPoller(
-                Mockito.eq(topics.getCommandTopic()),
+                Mockito.eq(Topics.COMMAND_TOPIC),
                 Mockito.eq(Reset.earliest), 
                 commandCaptor.capture()))
             .thenReturn(commandPoller);
@@ -487,7 +484,7 @@ public class SubscriberTest {
         subscriber.bookKeeperFactory = bookKeeperFactory;
         subscriber.activate(config, context, props);
         verify(clientProvider).createPoller(
-                Mockito.eq(topics.getPackageTopic()),
+                Mockito.eq(Topics.PACKAGE_TOPIC),
                 Mockito.eq(Reset.latest), 
                 Mockito.isNull(String.class),
                 packageCaptor.capture(),
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueTest.java
index 820bfff..379b494 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueTest.java
@@ -213,7 +213,7 @@ public class PubQueueTest {
     }
     
     private static String packageId(int nr) {
-        return PACKAGE_ID_PREFIX + new Integer(nr).toString();
+        return PACKAGE_ID_PREFIX + Integer.valueOf(nr).toString();
     }
 
     private Stream<DistributionQueueEntry> 
streamOf(Iterable<DistributionQueueEntry> entries) {


Reply via email to