This is an automated email from the ASF dual-hosted git repository. cschneider pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
The following commit(s) were added to refs/heads/master by this push: new 0e38eb8 Sling 12171 2 (#128) 0e38eb8 is described below commit 0e38eb8dc91008f70c5ea64b4e1906afdc7dfda5 Author: Christian Schneider <cschn...@adobe.com> AuthorDate: Fri Dec 1 22:40:19 2023 +0100 Sling 12171 2 (#128) * SLING-12171 - Extract jmx creation * SLING-12171 - Use activator based inject, use OSGiContext --- pom.xml | 8 +- .../impl/publisher/DistributionPublisher.java | 95 ++++++++++++---------- .../impl/publisher/DistributionPublisherTest.java | 91 ++++----------------- 3 files changed, 75 insertions(+), 119 deletions(-) diff --git a/pom.xml b/pom.xml index d5035a6..64e25ab 100644 --- a/pom.xml +++ b/pom.xml @@ -137,7 +137,7 @@ <dependency> <groupId>org.apache.sling</groupId> <artifactId>org.apache.sling.commons.metrics</artifactId> - <version>1.2.10</version> + <version>1.2.12</version> <scope>provided</scope> </dependency> <dependency> @@ -244,6 +244,12 @@ <scope>provided</scope> </dependency> + <dependency> + <groupId>io.dropwizard.metrics</groupId> + <artifactId>metrics-core</artifactId> + <version>3.2.3</version> + <scope>test</scope> + </dependency> <dependency> <groupId>org.apache.sling</groupId> <artifactId>org.apache.sling.testing.sling-mock-oak</artifactId> diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java index a81a210..da83c61 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java @@ -89,56 +89,65 @@ public class DistributionPublisher implements DistributionAgent { @Nonnull private final DefaultDistributionLog log; - @Reference - private MessagingProvider messagingProvider; + private final DistributionPackageBuilder packageBuilder; - @Reference(name = "packageBuilder") - private DistributionPackageBuilder packageBuilder; + private final DiscoveryService discoveryService; - @Reference - private DiscoveryService discoveryService; + private final PackageMessageFactory factory; - @Reference - private PackageMessageFactory factory; + private final EventAdmin eventAdmin; - @Reference - private EventAdmin eventAdmin; + private final DistributionMetricsService distributionMetricsService; - @Reference - private Topics topics; - - @Reference - private DistributionMetricsService distributionMetricsService; - - @Reference - private PubQueueProvider pubQueueProvider; + private final PubQueueProvider pubQueueProvider; - private String pubAgentName; + private final String pubAgentName; - private String pkgType; + private final String pkgType; - private long queuedTimeout; + private final long queuedTimeout; - private ServiceRegistration<DistributionAgent> componentReg; + private final ServiceRegistration<DistributionAgent> componentReg; - private Consumer<PackageMessage> sender; + private final Consumer<PackageMessage> sender; - private JMXRegistration reg; + private final JMXRegistration reg; - private Closeable statusPoller; + private final Closeable statusPoller; - private DistributionLogEventListener distributionLogEventListener; + private final DistributionLogEventListener distributionLogEventListener; + @Activate + public DistributionPublisher( + @Reference + MessagingProvider messagingProvider, + @Reference(name = "packageBuilder") + DistributionPackageBuilder packageBuilder, + @Reference + DiscoveryService discoveryService, + @Reference + PackageMessageFactory factory, + @Reference + EventAdmin eventAdmin, + @Reference + Topics topics, + @Reference + DistributionMetricsService distributionMetricsService, + @Reference + PubQueueProvider pubQueueProvider, + PublisherConfiguration config, + BundleContext context) { + this.packageBuilder = packageBuilder; + this.discoveryService = discoveryService; + this.factory = factory; + this.eventAdmin = eventAdmin; + this.distributionMetricsService = distributionMetricsService; + this.pubQueueProvider = pubQueueProvider; - public DistributionPublisher() { + pubAgentName = requireNotBlank(config.name()); log = new DefaultDistributionLog(pubAgentName, this.getClass(), DefaultDistributionLog.LogLevel.INFO); - } - - @Activate - public void activate(PublisherConfiguration config, BundleContext context) { requireNonNull(factory); requireNonNull(distributionMetricsService); - pubAgentName = requireNotBlank(config.name()); queuedTimeout = config.queuedTimeout(); @@ -151,16 +160,8 @@ public class DistributionPublisher implements DistributionAgent { distributionLogEventListener = new DistributionLogEventListener(context, log, pubAgentName); - DistPublisherJMX bean; - try { - bean = new DistPublisherJMX(pubAgentName, discoveryService, this); - } catch (NotCompliantMBeanException e) { - throw new RuntimeException(e); - } - reg = new JMXRegistration(bean, "agent", pubAgentName); + reg = createAndRegisterJMXBean(); - String msg = format("Started Publisher agent %s with packageBuilder %s, queuedTimeout %s", - pubAgentName, pkgType, queuedTimeout); distributionMetricsService.createGauge( DistributionMetricsService.PUB_COMPONENT + ".subscriber_count;pub_name=" + pubAgentName, () -> discoveryService.getTopologyView().getSubscribedAgentIds().size() @@ -172,7 +173,8 @@ public class DistributionPublisher implements DistributionAgent { HandlerAdapter.create(PackageStatusMessage.class, pubQueueProvider::handleStatus) ); - log.info(msg); + log.info("Started Publisher agent {} with packageBuilder {}, queuedTimeout {}", + pubAgentName, pkgType, queuedTimeout); } @Deactivate @@ -184,6 +186,15 @@ public class DistributionPublisher implements DistributionAgent { log.info(msg); } + private JMXRegistration createAndRegisterJMXBean() { + try { + DistPublisherJMX bean = new DistPublisherJMX(pubAgentName, discoveryService, this); + return new JMXRegistration(bean, "agent", pubAgentName); + } catch (NotCompliantMBeanException e) { + throw new RuntimeException(e); + } + } + private Dictionary<String, Object> createServiceProps(PublisherConfiguration config) { Dictionary<String, Object> props = new Hashtable<>(); props.put("name", config.name()); diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java index 526c076..8c9fb16 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java @@ -35,24 +35,20 @@ import static org.mockito.Mockito.when; import java.io.IOException; import java.util.Arrays; import java.util.Collections; -import java.util.Dictionary; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicLong; import org.apache.sling.api.resource.ResourceResolver; import org.apache.sling.commons.metrics.Counter; -import org.apache.sling.commons.metrics.Histogram; -import org.apache.sling.commons.metrics.Meter; -import org.apache.sling.commons.metrics.Timer; +import org.apache.sling.commons.metrics.MetricsService; +import org.apache.sling.commons.metrics.internal.MetricsServiceImpl; import org.apache.sling.distribution.DistributionRequest; import org.apache.sling.distribution.DistributionRequestState; import org.apache.sling.distribution.DistributionRequestType; import org.apache.sling.distribution.DistributionResponse; import org.apache.sling.distribution.SimpleDistributionRequest; -import org.apache.sling.distribution.agent.spi.DistributionAgent; import org.apache.sling.distribution.common.DistributionException; import org.apache.sling.distribution.journal.MessageSender; import org.apache.sling.distribution.journal.MessagingProvider; @@ -66,21 +62,22 @@ import org.apache.sling.distribution.journal.shared.Topics; import org.apache.sling.distribution.packaging.DistributionPackageBuilder; import org.apache.sling.distribution.queue.spi.DistributionQueue; import org.apache.sling.settings.SlingSettingsService; +import org.apache.sling.testing.mock.osgi.junit.OsgiContext; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; import org.mockito.Captor; -import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.Mockito; -import org.mockito.MockitoAnnotations; import org.mockito.Spy; +import org.mockito.junit.MockitoJUnitRunner; import org.osgi.framework.BundleContext; -import org.osgi.framework.ServiceRegistration; import org.osgi.service.event.EventAdmin; import org.osgi.util.converter.Converters; +@RunWith(MockitoJUnitRunner.class) public class DistributionPublisherTest { private static final String SUBAGENT1 = "subscriber-agent1"; @@ -97,9 +94,6 @@ public class DistributionPublisherTest { @Mock private PubQueueProvider pubQueueProvider; - @Mock - private SlingSettingsService slingSettings; - @Mock private MessagingProvider messagingProvider; @@ -109,30 +103,12 @@ public class DistributionPublisherTest { @Mock private DistributionPackageBuilder packageBuilder; - @Mock private DistributionMetricsService distributionMetricsService; - @Mock - private Histogram histogram; - - @Mock - private Meter meter; - - @Mock - private Timer timer; - - @Mock - private Timer.Context timerContext; + private OsgiContext context = new OsgiContext(); - @Mock - private BundleContext context; - - @InjectMocks private DistributionPublisher publisher; - @Mock - private ServiceRegistration<DistributionAgent> serviceReg; - @Mock private ResourceResolver resourceResolver; @@ -148,26 +124,24 @@ public class DistributionPublisherTest { @Spy private Topics topics = new Topics(); - @SuppressWarnings("unchecked") @Before public void before() throws Exception { - MockitoAnnotations.openMocks(this).close(); + MetricsService metricsService = context.registerInjectActivateService(MetricsServiceImpl.class); + distributionMetricsService = new DistributionMetricsService(metricsService); when(packageBuilder.getType()).thenReturn("journal"); Map<String, String> props = Collections.singletonMap("name", PUB1AGENT1); PublisherConfiguration config = Converters.standardConverter().convert(props).to(PublisherConfiguration.class); - when(slingSettings.getSlingId()).thenReturn("pub1sling"); - when(context.registerService(Mockito.eq(DistributionAgent.class), Mockito.eq(publisher), - Mockito.any(Dictionary.class))).thenReturn(serviceReg); + + BundleContext bcontext = context.bundleContext(); when(messagingProvider.<PackageMessage>createSender(Mockito.anyString())).thenReturn(sender); - publisher.activate(config, context); - when(timer.time()).thenReturn(timerContext); + publisher = new DistributionPublisher(messagingProvider, packageBuilder, discoveryService, factory, + eventAdmin, topics, distributionMetricsService, pubQueueProvider, config, bcontext); when(pubQueueProvider.getQueuedNotifier()).thenReturn(queuedNotifier); } @After public void after() { publisher.deactivate(); - verify(serviceReg).unregister(); } @Test @@ -242,11 +216,10 @@ public class DistributionPublisherTest { @Test public void testGetWrongQueue() throws DistributionException, IOException { - Counter counter = new TestCounter(); - when(distributionMetricsService.getQueueAccessErrorCount()).thenReturn(counter); DistributionQueue queue = publisher.getQueue("i_am_not_a_queue"); assertNull(queue); + Counter counter = distributionMetricsService.getQueueAccessErrorCount(); assertEquals("Wrong queue counter expected",1, counter.getCount()); } @@ -255,20 +228,18 @@ public class DistributionPublisherTest { when(pubQueueProvider.getQueue(Mockito.any(), Mockito.any())) .thenThrow(new RuntimeException("Error")); - Counter counter = new TestCounter(); - when(distributionMetricsService.getQueueAccessErrorCount()).thenReturn(counter); try { publisher.getQueue(QUEUE_NAME); fail("Expected exception not thrown"); } catch (RuntimeException expectedException) { } + Counter counter = distributionMetricsService.getQueueAccessErrorCount(); assertEquals("Wrong getQueue error counter",1, counter.getCount()); } @Test(expected = DistributionException.class) public void testEmptyPaths() throws Exception { DistributionRequest request = new SimpleDistributionRequest(DistributionRequestType.ADD, new String[0]); - when(distributionMetricsService.getDroppedRequests()).thenReturn(meter); publisher.execute(resourceResolver, request); } @@ -278,11 +249,6 @@ public class DistributionPublisherTest { when(factory.create(any(DistributionPackageBuilder.class),Mockito.eq(resourceResolver), anyString(), Mockito.eq(request))).thenReturn(pkg); CompletableFuture<Long> callback = CompletableFuture.completedFuture(-1L); when(queuedNotifier.registerWait(Mockito.eq(pkg.getPkgId()))).thenReturn(callback); - when(distributionMetricsService.getExportedPackageSize()).thenReturn(histogram); - when(distributionMetricsService.getAcceptedRequests()).thenReturn(meter); - when(distributionMetricsService.getDroppedRequests()).thenReturn(meter); - when(distributionMetricsService.getBuildPackageDuration()).thenReturn(timer); - when(distributionMetricsService.getEnqueuePackageDuration()).thenReturn(timer); DistributionResponse response = publisher.execute(resourceResolver, request); @@ -311,31 +277,4 @@ public class DistributionPublisherTest { .build(); } - class TestCounter implements Counter { - AtomicLong l = new AtomicLong(); - @Override public void increment() { - l.getAndIncrement(); - } - - @Override public void decrement() { - l.decrementAndGet(); - } - - @Override public void increment(long n) { - l.addAndGet(n); - } - - @Override public void decrement(long n) { - l.addAndGet(-n); - } - - @Override public long getCount() { - return l.get(); - } - - @Override public <A> A adaptTo(Class<A> type) { - return null; - } - }; - }