This is an automated email from the ASF dual-hosted git repository. timoninmaxim pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new ed5fd0e IGNITE-12464 : Metrics for the Java Services V3. (#9457) ed5fd0e is described below commit ed5fd0e0721e2e799987a200a279ab96eeb6dca9 Author: Vladimir Steshin <vlads...@gmail.com> AuthorDate: Wed Jan 12 18:12:03 2022 +0300 IGNITE-12464 : Metrics for the Java Services V3. (#9457) --- .../ignite/snippets/services/ServiceExample.java | 8 +- .../java/org/apache/ignite/IgniteServices.java | 4 +- .../processors/service/GridServiceProxy.java | 49 +++- .../processors/service/IgniteServiceProcessor.java | 90 +++++- .../service/LazyServiceConfiguration.java | 1 + .../processors/service/ServiceContextImpl.java | 35 ++- .../apache/ignite/internal/util/IgniteUtils.java | 19 ++ .../ignite/services/ServiceConfiguration.java | 36 +++ .../processors/service/GridServiceMetricsTest.java | 323 +++++++++++++++++++++ .../GridServiceProcessorAbstractSelfTest.java | 23 +- .../service/GridServiceProcessorProxySelfTest.java | 184 ++++++++---- .../processors/service/inner/MyService.java | 7 + .../testsuites/IgniteServiceGridTestSuite.java | 2 + 13 files changed, 691 insertions(+), 90 deletions(-) diff --git a/docs/_docs/code-snippets/java/src/main/java/org/apache/ignite/snippets/services/ServiceExample.java b/docs/_docs/code-snippets/java/src/main/java/org/apache/ignite/snippets/services/ServiceExample.java index d7cd753..a1658f1 100644 --- a/docs/_docs/code-snippets/java/src/main/java/org/apache/ignite/snippets/services/ServiceExample.java +++ b/docs/_docs/code-snippets/java/src/main/java/org/apache/ignite/snippets/services/ServiceExample.java @@ -50,11 +50,11 @@ public class ServiceExample { // Print the latest counter value from our counter service. System.out.println("Incremented value : " + counterService.get()); - + //tag::undeploy[] services.cancel("myCounterService"); //end::undeploy[] - + ignite.close(); } @@ -63,7 +63,7 @@ public class ServiceExample { //tag::deploy-with-cluster-group[] Ignite ignite = Ignition.start(); - //deploy the service to the nodes that host the cache named "myCache" + //deploy the service to the nodes that host the cache named "myCache" ignite.services(ignite.cluster().forCacheNodes("myCache")); //end::deploy-with-cluster-group[] @@ -83,7 +83,7 @@ public class ServiceExample { @Test void affinityKey() { - + //tag::deploy-by-key[] Ignite ignite = Ignition.start(); diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteServices.java b/modules/core/src/main/java/org/apache/ignite/IgniteServices.java index 769d322..d5acb38 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteServices.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteServices.java @@ -589,7 +589,7 @@ public interface IgniteServices extends IgniteAsyncSupport { * @param sticky Whether or not Ignite should always contact the same remote * service or try to load-balance between services. * @param <T> Service type. - * @return Either proxy over remote service or local service if it is deployed locally. + * @return Proxy over service. * @throws IgniteException If failed to create service proxy. */ public <T> T serviceProxy(String name, Class<? super T> svcItf, boolean sticky) throws IgniteException; @@ -606,7 +606,7 @@ public interface IgniteServices extends IgniteAsyncSupport { * @param timeout If greater than 0 created proxy will wait for service availability only specified time, * and will limit remote service invocation time. * @param <T> Service type. - * @return Either proxy over remote service or local service if it is deployed locally. + * @return Proxy over service. * @throws IgniteException If failed to create service proxy. */ public <T> T serviceProxy(String name, Class<? super T> svcItf, boolean sticky, long timeout) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java index f2d4886..8f60b2a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java @@ -32,6 +32,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.Callable; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; @@ -47,6 +48,7 @@ import org.apache.ignite.internal.binary.BinaryArray; import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.communication.GridIoPolicy; +import org.apache.ignite.internal.processors.metric.impl.HistogramMetricImpl; import org.apache.ignite.internal.processors.platform.PlatformNativeException; import org.apache.ignite.internal.processors.platform.services.PlatformService; import org.apache.ignite.internal.util.tostring.GridToStringExclude; @@ -205,8 +207,13 @@ public class GridServiceProxy<T> implements Serializable { if (svcCtx != null) { Service svc = svcCtx.service(); - if (svc != null) - return callServiceLocally(svc, mtd, args, callAttrs); + if (svc != null) { + HistogramMetricImpl hist = svcCtx.isStatisticsEnabled() ? + svcCtx.metrics().findMetric(mtd.getName()) : null; + + return hist == null ? callServiceLocally(svc, mtd, args, callAttrs) : + measureCall(hist, () -> callServiceLocally(svc, mtd, args, callAttrs)); + } } } else { @@ -447,13 +454,32 @@ public class GridServiceProxy<T> implements Serializable { /** * @param mtd Method to invoke. */ - String methodName(Method mtd) { + private static String methodName(Method mtd) { PlatformServiceMethod ann = mtd.getDeclaredAnnotation(PlatformServiceMethod.class); return ann == null ? mtd.getName() : ann.value(); } /** + * Calls the target, measures and registers its duration. + * + * @param histogram Related metric. + * @param target Target to call and measure. + */ + private static <T> T measureCall( + HistogramMetricImpl histogram, + Callable<T> target + ) throws Exception { + long startTime = System.nanoTime(); + + try { + return target.call(); + } finally { + histogram.value(System.nanoTime() - startTime); + } + } + + /** * Invocation handler for service proxy. */ private class ProxyInvocationHandler implements InvocationHandler { @@ -538,17 +564,22 @@ public class GridServiceProxy<T> implements Serializable { Method mtd = ctx.method(key); - Object res; + HistogramMetricImpl hist = ctx.isStatisticsEnabled() ? ctx.metrics().findMetric(mtd.getName()) : null; - if (ctx.service() instanceof PlatformService && mtd == null) - res = callPlatformService((PlatformService)ctx.service()); - else - res = callService(ctx.service(), mtd); + Object res = hist == null ? callService(ctx, mtd) : measureCall(hist, () -> callService(ctx, mtd)); return U.marshal(ignite.configuration().getMarshaller(), res); } /** */ + private Object callService(ServiceContextImpl svcCtx, Method mtd) throws Exception { + if (svcCtx.service() instanceof PlatformService && mtd == null) + return callPlatformService((PlatformService)svcCtx.service()); + else + return callOrdinaryService(svcCtx.service(), mtd); + } + + /** */ private Object callPlatformService(PlatformService srv) { try { return srv.invokeMethod(mtdName, false, true, args, callAttrs); @@ -562,7 +593,7 @@ public class GridServiceProxy<T> implements Serializable { } /** */ - private Object callService(Service srv, Method mtd) throws Exception { + private Object callOrdinaryService(Service srv, Method mtd) throws Exception { if (mtd == null) throw new GridServiceMethodNotFoundException(svcName, mtdName, argTypes); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java index 055fc58..6d7bdb2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.processors.service; +import java.io.Externalizable; +import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -67,6 +69,7 @@ import org.apache.ignite.internal.processors.cache.ValidationOnNodeJoinUtils; import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage; import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState; import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport; +import org.apache.ignite.internal.processors.metric.MetricRegistry; import org.apache.ignite.internal.processors.platform.services.PlatformService; import org.apache.ignite.internal.processors.security.OperationSecurityContext; import org.apache.ignite.internal.processors.security.SecurityContext; @@ -93,17 +96,22 @@ import org.apache.ignite.spi.communication.CommunicationSpi; import org.apache.ignite.spi.discovery.DiscoveryDataBag; import org.apache.ignite.spi.discovery.DiscoverySpi; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.metric.ReadOnlyMetricRegistry; import org.apache.ignite.spi.systemview.view.ServiceView; import org.apache.ignite.thread.IgniteThreadFactory; import org.apache.ignite.thread.OomExceptionHandler; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.NANOSECONDS; import static org.apache.ignite.configuration.DeploymentMode.ISOLATED; import static org.apache.ignite.configuration.DeploymentMode.PRIVATE; import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.SERVICE_PROC; +import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName; import static org.apache.ignite.internal.processors.security.SecurityUtils.nodeSecurityContext; +import static org.apache.ignite.internal.util.IgniteUtils.allInterfaces; import static org.apache.ignite.plugin.security.SecurityPermission.SERVICE_DEPLOY; /** @@ -126,6 +134,21 @@ public class IgniteServiceProcessor extends GridProcessorAdapter implements Igni /** */ public static final String SVCS_VIEW_DESC = "Services"; + /** Base name domain for invocation metrics. */ + private static final String SERVICE_METRIC_REGISTRY = "Services"; + + /** Description for the service method invocation metric. */ + private static final String DESCRIPTION_OF_INVOCATION_METRIC_PREF = "Duration in milliseconds of "; + + /** Default bounds of invocation histogram in nanoseconds. */ + public static final long[] DEFAULT_INVOCATION_BOUNDS = new long[] { + NANOSECONDS.convert(1, MILLISECONDS), + NANOSECONDS.convert(10, MILLISECONDS), + NANOSECONDS.convert(50, MILLISECONDS), + NANOSECONDS.convert(200, MILLISECONDS), + NANOSECONDS.convert(1000, MILLISECONDS) + }; + /** Local service instances. */ private final ConcurrentMap<IgniteUuid, Collection<ServiceContextImpl>> locServices = new ConcurrentHashMap<>(); @@ -316,6 +339,8 @@ public class IgniteServiceProcessor extends GridProcessorAdapter implements Igni deployedServices.clear(); + ctx.metric().remove(SERVICE_METRIC_REGISTRY); + locServices.values().stream().flatMap(Collection::stream).forEach(srvcCtx -> { cancel(srvcCtx); @@ -1039,7 +1064,7 @@ public class IgniteServiceProcessor extends GridProcessorAdapter implements Igni if (hasLocalNode(prj)) { ServiceContextImpl ctx = serviceContext(name); - if (ctx != null) { + if (ctx != null && !ctx.isStatisticsEnabled()) { Service srvc = ctx.service(); if (srvc != null && callAttrsProvider == null) { @@ -1267,7 +1292,8 @@ public class IgniteServiceProcessor extends GridProcessorAdapter implements Igni UUID.randomUUID(), cacheName, affKey, - Executors.newSingleThreadExecutor(threadFactory)); + Executors.newSingleThreadExecutor(threadFactory), + cfg.isStatisticsEnabled()); ctxs.add(srvcCtx); @@ -1276,6 +1302,8 @@ public class IgniteServiceProcessor extends GridProcessorAdapter implements Igni } } + ReadOnlyMetricRegistry invocationMetrics = null; + for (final ServiceContextImpl srvcCtx : toInit) { final Service srvc; @@ -1302,6 +1330,13 @@ public class IgniteServiceProcessor extends GridProcessorAdapter implements Igni log.info("Starting service instance [name=" + srvcCtx.name() + ", execId=" + srvcCtx.executionId() + ']'); + if (cfg.isStatisticsEnabled()) { + if (invocationMetrics == null) + invocationMetrics = createServiceMetrics(srvcCtx); + + srvcCtx.metrics(invocationMetrics); + } + // Start service in its own thread. final ExecutorService exe = srvcCtx.executor(); @@ -1388,14 +1423,20 @@ public class IgniteServiceProcessor extends GridProcessorAdapter implements Igni * @param ctxs Contexts to cancel. * @param cancelCnt Number of contexts to cancel. */ - private void cancel(Iterable<ServiceContextImpl> ctxs, int cancelCnt) { + private void cancel(Collection<ServiceContextImpl> ctxs, int cancelCnt) { for (Iterator<ServiceContextImpl> it = ctxs.iterator(); it.hasNext(); ) { - cancel(it.next()); + ServiceContextImpl svcCtx = it.next(); + + cancel(svcCtx); it.remove(); - if (--cancelCnt == 0) + if (--cancelCnt == 0) { + if (ctxs.isEmpty()) + ctx.metric().remove(serviceMetricRegistryName(svcCtx.name())); + break; + } } } @@ -1966,4 +2007,43 @@ public class IgniteServiceProcessor extends GridProcessorAdapter implements Igni return null; } + + /** + * Creates metrics registry for the invocation histograms. + * + * @param srvcCtx ServiceContext. + * @return Created metric registry. + */ + private ReadOnlyMetricRegistry createServiceMetrics(ServiceContextImpl srvcCtx) { + MetricRegistry metricRegistry = ctx.metric().registry(serviceMetricRegistryName(srvcCtx.name())); + + for (Class<?> itf : allInterfaces(srvcCtx.service().getClass())) { + for (Method mtd : itf.getMethods()) { + if (metricIgnored(mtd.getDeclaringClass())) + continue; + + metricRegistry.histogram(mtd.getName(), DEFAULT_INVOCATION_BOUNDS, DESCRIPTION_OF_INVOCATION_METRIC_PREF + + '\'' + mtd.getName() + "()'"); + } + } + + return metricRegistry; + } + + /** + * @return {@code True} if metrics should not be created for this class or interface. + */ + private static boolean metricIgnored(Class<?> cls) { + return Service.class.equals(cls) || Externalizable.class.equals(cls) || PlatformService.class.equals(cls); + } + + /** + * Gives proper name for service metric registry. + * + * @param srvcName Name of the service. + * @return registry name for service {@code srvcName}. + */ + static String serviceMetricRegistryName(String srvcName) { + return metricName(SERVICE_METRIC_REGISTRY, srvcName); + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/LazyServiceConfiguration.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/LazyServiceConfiguration.java index e0add26..7d34665 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/LazyServiceConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/LazyServiceConfiguration.java @@ -64,6 +64,7 @@ public class LazyServiceConfiguration extends ServiceConfiguration { this.srvcBytes = srvcBytes; srvc = cfg.getService(); srvcClsName = srvc.getClass().getName(); + isStatisticsEnabled = cfg.isStatisticsEnabled(); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceContextImpl.java index 38acfce..a5aa112 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceContextImpl.java @@ -28,6 +28,7 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.services.Service; import org.apache.ignite.services.ServiceCallContext; import org.apache.ignite.services.ServiceContext; +import org.apache.ignite.spi.metric.ReadOnlyMetricRegistry; import org.jetbrains.annotations.Nullable; /** @@ -60,6 +61,9 @@ public class ServiceContextImpl implements ServiceContext { /** Methods reflection cache. */ private final ConcurrentMap<GridServiceMethodReflectKey, Method> mtds = new ConcurrentHashMap<>(); + /** Invocation metrics. */ + private ReadOnlyMetricRegistry metrics; + /** Service. */ @GridToStringExclude private volatile Service svc; @@ -67,23 +71,29 @@ public class ServiceContextImpl implements ServiceContext { /** Cancelled flag. */ private volatile boolean isCancelled; + /** Service statistics flag. */ + private final boolean isStatisticsEnabled; + /** * @param name Service name. * @param execId Execution ID. * @param cacheName Cache name. * @param affKey Affinity key. * @param exe Executor service. + * @param statisticsEnabled Service statistics flag. */ ServiceContextImpl(String name, UUID execId, String cacheName, Object affKey, - ExecutorService exe) { + ExecutorService exe, + boolean statisticsEnabled) { this.name = name; this.execId = execId; this.cacheName = cacheName; this.affKey = affKey; this.exe = exe; + this.isStatisticsEnabled = statisticsEnabled; } /** {@inheritDoc} */ @@ -133,6 +143,29 @@ public class ServiceContextImpl implements ServiceContext { } /** + * @return Invocation metrics. + */ + @Nullable ReadOnlyMetricRegistry metrics() { + return metrics; + } + + /** + * Sets the invocation metrics. + * + * @return {@code this}. + */ + ServiceContextImpl metrics(ReadOnlyMetricRegistry metrics) { + this.metrics = metrics; + + return this; + } + + /** @return {@code True} if statistics is enabled for this service. {@code False} otherwise. */ + boolean isStatisticsEnabled() { + return isStatisticsEnabled; + } + + /** * @param key Method key. * @return Method. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index acacde4..89999a0 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -6117,6 +6117,25 @@ public abstract class IgniteUtils { } /** + * Provides all interfaces of {@code cls} including inherited ones. Excludes duplicated ones in case of multiple + * inheritance. + * + * @param cls Class to search for interfaces. + * @return Collection of interfaces of {@code cls}. + */ + public static Collection<Class<?>> allInterfaces(Class<?> cls) { + Set<Class<?>> interfaces = new HashSet<>(); + + while (cls != null) { + interfaces.addAll(Arrays.asList(cls.getInterfaces())); + + cls = cls.getSuperclass(); + } + + return interfaces; + } + + /** * Gets simple class name taking care of empty names. * * @param cls Class to get the name for. diff --git a/modules/core/src/main/java/org/apache/ignite/services/ServiceConfiguration.java b/modules/core/src/main/java/org/apache/ignite/services/ServiceConfiguration.java index a517197..d4efb0a 100644 --- a/modules/core/src/main/java/org/apache/ignite/services/ServiceConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/services/ServiceConfiguration.java @@ -17,8 +17,11 @@ package org.apache.ignite.services; +import java.io.Externalizable; import java.io.Serializable; +import org.apache.ignite.IgniteServices; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.processors.service.IgniteServiceProcessor; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgnitePredicate; @@ -77,6 +80,9 @@ public class ServiceConfiguration implements Serializable { @GridToStringExclude protected IgnitePredicate<ClusterNode> nodeFilter; + /** Enables or disables service statistics. */ + protected boolean isStatisticsEnabled; + /** * Gets service name. * <p> @@ -256,6 +262,36 @@ public class ServiceConfiguration implements Serializable { return this; } + /** + * Enables or disables statistics for the service. If enabled, durations of the service's methods invocations are + * measured (in milliseconds) and stored in histograms of metric registry + * {@link IgniteServiceProcessor#SERVICE_METRIC_REGISTRY} by service name. + * <p> + * <b>NOTE:</b> Statistics are collected only with service proxies obtaining by methods like + * {@link IgniteServices#serviceProxy(String, Class, boolean)} and won't work for direct referense of local + * services which you can get by, for example, {@link IgniteServices#service(String)}. + * <p> + * <b>NOTE:</b> Statistics are collected only for all service's interfaces except {@link Service} and + * {@link Externalizable} if implemented. Statistics are not collected for methods not declared in any interface. + * + * @param enabled If {@code true}, enables service statistics. Disables otherwise. + * @return {@code this} for chaining. + */ + public ServiceConfiguration setStatisticsEnabled(boolean enabled) { + isStatisticsEnabled = enabled; + + return this; + } + + /** + * Tells wheter statistics for this service is enabled. + * + * @return {@code True}, if statistics for this service will be enabled. {@code False} otherwise. + */ + public boolean isStatisticsEnabled() { + return isStatisticsEnabled; + } + /** {@inheritDoc} */ @Override public boolean equals(Object o) { if (!equalsIgnoreNodeFilter(o)) diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceMetricsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceMetricsTest.java new file mode 100644 index 0000000..9b036cf --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceMetricsTest.java @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.service; + +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import com.google.common.collect.Iterables; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.metric.GridMetricManager; +import org.apache.ignite.internal.processors.service.inner.MyService; +import org.apache.ignite.internal.processors.service.inner.MyServiceFactory; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.services.Service; +import org.apache.ignite.services.ServiceConfiguration; +import org.apache.ignite.spi.metric.HistogramMetric; +import org.apache.ignite.spi.metric.Metric; +import org.apache.ignite.spi.metric.ReadOnlyMetricRegistry; +import org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; + +import static org.apache.ignite.internal.processors.service.IgniteServiceProcessor.serviceMetricRegistryName; + +/** + * Tests metrics of service invocations. + */ +public class GridServiceMetricsTest extends GridCommonAbstractTest { + /** Number of service invocations. */ + private static final int INVOKE_CNT = 50; + + /** Service name used in the tests. */ + private static final String SRVC_NAME = "TestService"; + + /** Error message of created metrics. */ + private static final String METRICS_MUST_NOT_BE_CREATED = "Service metric registry must not be created."; + + /** Utility holder of current grid number. */ + private int gridNum; + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + super.afterTest(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + // JMX metrics exposition to see actual namings and placement of the metrics. + cfg.setMetricExporterSpi(new JmxMetricExporterSpi()); + + return cfg; + } + + /** Checks service metrics are enabled / disabled properly. */ + @Test + public void testServiceMetricsEnabledDisabled() throws Exception { + IgniteEx ignite = startGrid(); + + ServiceConfiguration srvcCfg = serviceCfg(MyServiceFactory.create(), 0, 1); + + srvcCfg.setStatisticsEnabled(false); + + ignite.services().deploy(srvcCfg); + + assertNull(METRICS_MUST_NOT_BE_CREATED, findMetricRegistry(ignite.context().metric(), SRVC_NAME)); + + ignite.services().cancel(SRVC_NAME); + + srvcCfg.setStatisticsEnabled(true); + + ignite.services().deploy(srvcCfg); + + assertNotNull("Service metric registry must be created.", + findMetricRegistry(ignite.context().metric(), SRVC_NAME)); + } + + /** Checks metric are created when service is deployed and removed when service is undeployed. */ + @Test + public void testMetricsOnServiceDeployAndCancel() throws Exception { + List<IgniteEx> grids = startGrids(3, false); + + // 2 services per node. + grids.get(0).services().deploy(serviceCfg(MyServiceFactory.create(), grids.size() * 2, 2)); + + awaitPartitionMapExchange(); + + int expectedCnt = Arrays.stream(MyService.class.getDeclaredMethods()).map(Method::getName).collect( + Collectors.toSet()).size(); + + // Make sure metrics are registered. + for (IgniteEx ignite : grids) + assertEquals(metricsCnt(ignite), expectedCnt); + + grids.get(0).services().cancel(SRVC_NAME); + + awaitPartitionMapExchange(); + + for (IgniteEx ignite : grids) + assertEquals(metricsCnt(ignite), 0); + } + + /** Tests service metrics migrates correclty with the service redeployment. */ + @Test + public void testRedeploy() throws Exception { + List<IgniteEx> grids = startGrids(3, false); + + // 2 services per node. + grid(0).services().deploy(serviceCfg(MyServiceFactory.create(), 1, 0)); + + awaitPartitionMapExchange(); + + // Only same method metric count must persist across the cluster for the singleton. + int expectedCnt = Arrays.stream(MyService.class.getDeclaredMethods()).map(Method::getName).collect( + Collectors.toSet()).size(); + + // Only same method metric count must persist across the cluster for the singleton. + assertEquals("Only one metric registry can persist for one service instance", expectedCnt, + grids.stream().map(GridServiceMetricsTest::metricsCnt).mapToInt(Integer::intValue).sum()); + + for (int i = 0; i < grids.size(); ++i) { + if (metricsCnt(grid(i)) > 0) { + stopGrid(i); + + awaitPartitionMapExchange(); + + break; + } + } + + // Only same method metric count must persist across the cluster for the singleton. + assertEquals("Only one metric registry can persist for one service instance", expectedCnt, + G.allGrids().stream().map(grid -> metricsCnt((IgniteEx)grid)).mapToInt(Integer::intValue).sum()); + } + + /** Tests service metrics for single service instance. */ + @Test + public void testServiceMetricsSingle() throws Throwable { + testServiceMetrics(1, 1, 1, 1); + } + + /** Tests service metrics for multy service instance: one per server. */ + @Test + public void testServiceMetricsMulty() throws Throwable { + testServiceMetrics(3, 3, 3, 1); + } + + /** Tests service metrics for multy service instance: fewer that servers and clients. */ + @Test + public void testServiceMetricsMultyFew() throws Throwable { + testServiceMetrics(4, 3, 2, 1); + } + + /** Tests service metrics for multy service instance: serveral instances per node. */ + @Test + public void testServiceMetricsMultyDuplicated() throws Throwable { + testServiceMetrics(3, 2, 3, 3); + } + + /** Tests service metrics for multy service instance: serveral instances per node, total fewer that servers. */ + @Test + public void testServiceMetricsMultyFewDuplicated() throws Throwable { + testServiceMetrics(5, 4, 3, 2); + } + + /** + * Invokes service in various ways: from clients, servers, etc. Checks these calls reflect in the metrics. + * + * @param serverCnt Number of server nodes. + * @param clientCnt Number of client nodes. + * @param perClusterCnt Number of service instances per cluster. + * @param perNodeCnt Number of service instances per node. + */ + private void testServiceMetrics(int serverCnt, int clientCnt, int perClusterCnt, int perNodeCnt) throws Throwable { + List<IgniteEx> servers = startGrids(serverCnt, false); + + List<IgniteEx> clients = startGrids(clientCnt, true); + + servers.get(0).services().deploy(serviceCfg(MyServiceFactory.create(), perClusterCnt, perNodeCnt)); + + awaitPartitionMapExchange(); + + List<MyService> serverStickyProxies = servers.stream() + .map(ignite -> (MyService)ignite.services().serviceProxy(SRVC_NAME, MyService.class, true)) + .collect(Collectors.toList()); + + List<MyService> clientStickyProxies = clients.stream() + .map(ignite -> (MyService)ignite.services().serviceProxy(SRVC_NAME, MyService.class, true)) + .collect(Collectors.toList()); + + long invokeCollector = 0; + + // Call service through the server proxies. + for (int i = 0; i < INVOKE_CNT; ++i) { + // Call from server. + IgniteEx ignite = servers.get(i % servers.size()); + + callService4Times(ignite, serverStickyProxies.get(i % serverStickyProxies.size())); + + // Call from client. + ignite = clients.get(i % clients.size()); + + callService4Times(ignite, clientStickyProxies.get(i % clientStickyProxies.size())); + + invokeCollector += 8; + } + + long invokesInMetrics = 0; + + // Calculate and check invocations within the metrics. + for (IgniteEx ignite : servers) { + ReadOnlyMetricRegistry metrics = findMetricRegistry(ignite.context().metric(), SRVC_NAME); + + // Metrics may not be deployed on this server node. + if (metrics == null) + continue; + + for (Metric metric : metrics) { + if (metric instanceof HistogramMetric) + invokesInMetrics += sumHistogramEntries((HistogramMetric)metric); + } + } + + // Compare calls number and metrics number. + assertEquals("Calculated wrong service invocation number.", invokesInMetrics, invokeCollector); + } + + /** Expose ignite-references of the nodes as list. */ + private List<IgniteEx> startGrids(int cnt, boolean client) throws Exception { + List<IgniteEx> grids = new ArrayList<>(cnt); + + for (int i = 0; i < cnt; ++i) + grids.add(client ? startClientGrid(gridNum++) : startGrid(gridNum++)); + + return grids; + } + + /** + * Executes 2 calls for {@link MyService} though unsticky proxy and 2 calls to {@code extraSrvc}. Total 4 are + * suposed to present in the metrics. + * + * @param ignite Server or client node. + * @param extraSrvc Extra service instance or proxy to call. + */ + private static void callService4Times(IgniteEx ignite, MyService extraSrvc) { + MyService srvc = ignite.services().serviceProxy(SRVC_NAME, MyService.class, false); + + srvc.hello(); + + srvc.hello(1); + + extraSrvc.hello(); + + extraSrvc.hello(2); + } + + /** Provides test service configuration. */ + private static ServiceConfiguration serviceCfg(Service srvc, int perClusterCnt, int perNodeCnt) { + ServiceConfiguration svcCfg = new ServiceConfiguration(); + + svcCfg.setName(SRVC_NAME); + svcCfg.setService(srvc); + svcCfg.setMaxPerNodeCount(perNodeCnt); + svcCfg.setTotalCount(perClusterCnt); + svcCfg.setStatisticsEnabled(true); + + return svcCfg; + } + + /** @return Number of metrics contained in metric registry of the test service. */ + private static int metricsCnt(IgniteEx ignite) { + return Iterables.size(ignite.context().metric().registry(serviceMetricRegistryName(SRVC_NAME))); + } + + /** + * Count total of histogram values. + * + * @param histogram Histogram to traverse. + * @return Sum of all entries of {@code histogram} buckets. + */ + private static long sumHistogramEntries(HistogramMetric histogram) { + if (histogram == null) + return 0; + + long sum = 0; + + for (int i = 0; i < histogram.value().length; ++i) + sum += histogram.value()[i]; + + return sum; + } + + /** @return Metric registry if it is found in {@code metricMgr} by name {@code srvcName}. Null otherwise. */ + private static ReadOnlyMetricRegistry findMetricRegistry(GridMetricManager metricMgr, String srvcName) { + for (ReadOnlyMetricRegistry registry : metricMgr) { + if (registry.name().equals(serviceMetricRegistryName(srvcName))) + return registry; + } + + return null; + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorAbstractSelfTest.java index edb6338..b081aa6 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorAbstractSelfTest.java @@ -42,6 +42,7 @@ import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.services.Service; import org.apache.ignite.services.ServiceConfiguration; import org.apache.ignite.services.ServiceContext; +import org.apache.ignite.services.ServiceDeploymentException; import org.apache.ignite.services.ServiceDescriptor; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; @@ -216,19 +217,27 @@ public abstract class GridServiceProcessorAbstractSelfTest extends GridCommonAbs IgniteFuture<?> fut2 = svcs2.future(); - info("Deployed service: " + name); - - fut1.get(); + Exception err1 = null; - info("Finished waiting for service future: " + name); + try { + fut1.get(); + } + catch (ServiceDeploymentException e) { + if (e.getMessage().contains("Failed to deploy some services.")) + err1 = e; + else + throw new IllegalStateException("An unexpeted error caught while deploying service.", e); + } try { fut2.get(); - fail("Failed to receive mismatching configuration exception."); + if (err1 == null) + fail("Failed to receive mismatching configuration exception."); } - catch (IgniteException e) { - info("Received mismatching configuration exception: " + e.getMessage()); + catch (Exception e) { + if (!e.getMessage().contains("Failed to deploy some service")) + throw new IllegalStateException("An unexpeted error caught while concurrent deploying.", e); } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorProxySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorProxySelfTest.java index 76e02bf..7555df2 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorProxySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorProxySelfTest.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.processors.service; +import java.lang.reflect.Proxy; +import java.util.Arrays; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; @@ -26,6 +28,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.util.typedef.PA; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.services.Service; +import org.apache.ignite.services.ServiceConfiguration; import org.apache.ignite.services.ServiceContext; import org.apache.ignite.testframework.GridTestUtils; import org.junit.Test; @@ -39,6 +42,13 @@ public class GridServiceProcessorProxySelfTest extends GridServiceProcessorAbstr return 4; } + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + grid(0).services().cancelAll(); + } + /** * @throws Exception If failed. */ @@ -240,64 +250,88 @@ public class GridServiceProcessorProxySelfTest extends GridServiceProcessorAbstr } /** + * Checks local service without the statistics. + * * @throws Exception If failed. */ @Test - public void testLocalProxyInvocation() throws Exception { - final String name = "testLocalProxyInvocation"; - - final Ignite ignite = grid(0); - - ignite.services().deployNodeSingleton(name, new MapServiceImpl<String, Integer>()); - - for (int i = 0; i < nodeCount(); i++) { - final int idx = i; - - final AtomicReference<MapService<Integer, String>> ref = new AtomicReference<>(); - - //wait because after deployNodeSingleton we don't have guarantees what service was deploy. - boolean wait = GridTestUtils.waitForCondition(new PA() { - @Override public boolean apply() { - MapService<Integer, String> svc = grid(idx) - .services() - .serviceProxy(name, MapService.class, false); - - ref.set(svc); - - return svc instanceof Service; - } - }, 2000); + public void testLocalProxyInvocationWithoutStat() throws Exception { + checkLocalProxy(false); + } - // Make sure service is a local instance. - assertTrue("Invalid service instance [srv=" + ref.get() + ", node=" + i + ']', wait); + /** + * Checks local service with the statistics enabled. + * + * @throws Exception If failed. + */ + @Test + public void testLocalProxyInvocationWithStat() throws Exception { + checkLocalProxy(true); + } - ref.get().put(i, Integer.toString(i)); - } + /** + * Checks remote non-sticky proxy without the statistics. + * + * @throws Exception If failed. + */ + @Test + public void testRemoteNotStickProxyInvocationWithoutStat() throws Exception { + checkRemoteProxy(false, false); + } - MapService<Integer, String> map = ignite.services().serviceProxy(name, MapService.class, false); + /** + * Checks remote non-sticky proxy with the statistics enabled. + * + * @throws Exception If failed. + */ + @Test + public void testRemoteNotStickyProxyInvocationWithStat() throws Exception { + checkRemoteProxy(true, false); + } - for (int i = 0; i < nodeCount(); i++) - assertEquals(1, map.size()); + /** + * Checks remote sticky proxy without the statistics. + * + * @throws Exception If failed. + */ + @Test + public void testRemoteStickyProxyInvocationWithoutStat() throws Exception { + checkRemoteProxy(false, true); } /** + * Checks remote sticky proxy with the statistics enabled. + * * @throws Exception If failed. */ @Test - public void testRemoteNotStickProxyInvocation() throws Exception { - final String name = "testRemoteNotStickProxyInvocation"; + public void testRemoteStickyProxyInvocationWithStat() throws Exception { + checkRemoteProxy(true, true); + } - final Ignite ignite = grid(0); + /** + * Checks remote service proxy (node singleton) with or without the statistics. + * + * @param withStat If {@code true}, enables the service metrics, {@link ServiceConfiguration#setStatisticsEnabled(boolean)}. + * @param sticky If {@code true}, requests sticky proxy. + */ + private void checkRemoteProxy(boolean withStat, boolean sticky) throws InterruptedException { + final String svcName = "remoteServiceTest"; + + deployNodeSingleton(svcName, withStat); - ignite.services().deployNodeSingleton(name, new MapServiceImpl<String, Integer>()); + Ignite ignite = grid(0); // Get remote proxy. MapService<Integer, String> svc = ignite.services(ignite.cluster().forRemotes()). - serviceProxy(name, MapService.class, false); + serviceProxy(svcName, MapService.class, sticky); - // Make sure service is a local instance. assertFalse(svc instanceof Service); + assertTrue(Arrays.asList(svc.getClass().getInterfaces()).contains(MapService.class)); + + assertEquals(svc.size(), 0); + for (int i = 0; i < nodeCount(); i++) svc.put(i, Integer.toString(i)); @@ -305,52 +339,78 @@ public class GridServiceProcessorProxySelfTest extends GridServiceProcessorAbstr for (ClusterNode n : ignite.cluster().forRemotes().nodes()) { MapService<Integer, String> map = ignite.services(ignite.cluster().forNode(n)). - serviceProxy(name, MapService.class, false); + serviceProxy(svcName, MapService.class, sticky); - // Make sure service is a local instance. assertFalse(map instanceof Service); - size += map.size(); + assertTrue(Arrays.asList(svc.getClass().getInterfaces()).contains(MapService.class)); + + if (map.size() != 0) + size += map.size(); } assertEquals(nodeCount(), size); } /** - * @throws Exception If failed. + * Checks local service (node singleton) with or without statistics. + * + * @param withStat If {@code true}, enables the service metrics, {@link ServiceConfiguration#setStatisticsEnabled(boolean)}. */ - @Test - public void testRemoteStickyProxyInvocation() throws Exception { - final String name = "testRemoteStickyProxyInvocation"; - - final Ignite ignite = grid(0); + private void checkLocalProxy(boolean withStat) throws Exception { + final String svcName = "localProxyTest"; - ignite.services().deployNodeSingleton(name, new MapServiceImpl<String, Integer>()); + deployNodeSingleton(svcName, withStat); - // Get remote proxy. - MapService<Integer, String> svc = ignite.services(ignite.cluster().forRemotes()). - serviceProxy(name, MapService.class, true); + for (int i = 0; i < nodeCount(); i++) { + final int idx = i; - // Make sure service is a local instance. - assertFalse(svc instanceof Service); + final AtomicReference<MapService<Integer, String>> ref = new AtomicReference<>(); - for (int i = 0; i < nodeCount(); i++) - svc.put(i, Integer.toString(i)); + //wait because after deployNodeSingleton we don't have guarantees what service was deploy. + boolean wait = GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + MapService<Integer, String> svc = grid(idx) + .services() + .serviceProxy(svcName, MapService.class, false); - int size = 0; + ref.set(svc); - for (ClusterNode n : ignite.cluster().forRemotes().nodes()) { - MapService<Integer, String> map = ignite.services(ignite.cluster().forNode(n)). - serviceProxy(name, MapService.class, false); + return (withStat ? Proxy.isProxyClass(svc.getClass()) : svc instanceof Service) && + Arrays.asList(svc.getClass().getInterfaces()).contains(MapService.class); + } + }, 2000); // Make sure service is a local instance. - assertFalse(map instanceof Service); + assertTrue("Invalid service instance [srv=" + ref.get() + ", node=" + i + ']', wait); - if (map.size() != 0) - size += map.size(); + ref.get().put(i, Integer.toString(i)); } - assertEquals(nodeCount(), size); + MapService<Integer, String> map = grid(0).services().serviceProxy(svcName, MapService.class, false); + + for (int i = 0; i < nodeCount(); i++) + assertEquals(1, map.size()); + } + + /** + * Deploys {@link MapServiceImpl} service over the cluster as node singleton. + * + * @param svcName Service name + * @param withStat If {@code true}, enabled the serive metrics {@link ServiceConfiguration#setStatisticsEnabled(boolean)}. + */ + private void deployNodeSingleton(String svcName, boolean withStat) throws InterruptedException { + ServiceConfiguration svcCfg = new ServiceConfiguration(); + + svcCfg.setName(svcName); + svcCfg.setMaxPerNodeCount(1); + svcCfg.setTotalCount(nodeCount()); + svcCfg.setService(new MapServiceImpl<String, Integer>()); + svcCfg.setStatisticsEnabled(withStat); + + grid(0).services().deploy(svcCfg); + + awaitPartitionMapExchange(); } /** diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/inner/MyService.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/inner/MyService.java index 251b438..325f145 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/inner/MyService.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/inner/MyService.java @@ -32,6 +32,13 @@ public interface MyService extends Service { int hello(); /** + * @return Given {@code helloValue}. + */ + default int hello(int helloValue) { + return helloValue; + } + + /** * hashCode() method with a dummy argument. * * @param dummy Argument. diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteServiceGridTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteServiceGridTestSuite.java index 64622ad..a1fe470 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteServiceGridTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteServiceGridTestSuite.java @@ -25,6 +25,7 @@ import org.apache.ignite.internal.processors.service.GridServiceContinuousQueryR import org.apache.ignite.internal.processors.service.GridServiceDeployClusterReadOnlyModeTest; import org.apache.ignite.internal.processors.service.GridServiceDeploymentCompoundFutureSelfTest; import org.apache.ignite.internal.processors.service.GridServiceDeploymentExceptionPropagationTest; +import org.apache.ignite.internal.processors.service.GridServiceMetricsTest; import org.apache.ignite.internal.processors.service.GridServicePackagePrivateSelfTest; import org.apache.ignite.internal.processors.service.GridServiceProcessorBatchDeploySelfTest; import org.apache.ignite.internal.processors.service.GridServiceProcessorMultiNodeConfigSelfTest; @@ -121,6 +122,7 @@ import org.junit.runners.Suite; GridServiceDeployClusterReadOnlyModeTest.class, GridServiceClusterReadOnlyModeTest.class, IgniteServiceCallContextTest.class, + GridServiceMetricsTest.class }) public class IgniteServiceGridTestSuite { /** */