[ https://issues.apache.org/jira/browse/SPARK-25642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16726999#comment-16726999 ]
ASF GitHub Bot commented on SPARK-25642: ---------------------------------------- asfgit closed pull request #22498: [SPARK-25642] : Adding two new metrics to record the number of registered connections as well as the number of active connections to YARN Shuffle Service URL: https://github.com/apache/spark/pull/22498 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java index 480b52652de53..1a3f3f2a6f249 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java +++ b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.List; +import com.codahale.metrics.Counter; import io.netty.channel.Channel; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; @@ -66,6 +67,8 @@ private final RpcHandler rpcHandler; private final boolean closeIdleConnections; private final boolean isClientOnly; + // Number of registered connections to the shuffle service + private Counter registeredConnections = new Counter(); /** * Force to create MessageEncoder and MessageDecoder so that we can make sure they will be created @@ -221,7 +224,7 @@ private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client, rpcHandler, conf.maxChunksBeingTransferred()); return new TransportChannelHandler(client, responseHandler, requestHandler, - conf.connectionTimeoutMs(), closeIdleConnections); + conf.connectionTimeoutMs(), closeIdleConnections, this); } /** @@ -234,4 +237,8 @@ private ChunkFetchRequestHandler createChunkFetchHandler(TransportChannelHandler } public TransportConf getConf() { return conf; } + + public Counter getRegisteredConnections() { + return registeredConnections; + } } diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java index c824a7b0d4740..ca81099c4d5cb 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java @@ -21,6 +21,7 @@ import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; +import org.apache.spark.network.TransportContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,18 +58,21 @@ private final TransportRequestHandler requestHandler; private final long requestTimeoutNs; private final boolean closeIdleConnections; + private final TransportContext transportContext; public TransportChannelHandler( TransportClient client, TransportResponseHandler responseHandler, TransportRequestHandler requestHandler, long requestTimeoutMs, - boolean closeIdleConnections) { + boolean closeIdleConnections, + TransportContext transportContext) { this.client = client; this.responseHandler = responseHandler; this.requestHandler = requestHandler; this.requestTimeoutNs = requestTimeoutMs * 1000L * 1000; this.closeIdleConnections = closeIdleConnections; + this.transportContext = transportContext; } public TransportClient getClient() { @@ -176,4 +180,16 @@ public TransportResponseHandler getResponseHandler() { return responseHandler; } + @Override + public void channelRegistered(ChannelHandlerContext ctx) throws Exception { + transportContext.getRegisteredConnections().inc(); + super.channelRegistered(ctx); + } + + @Override + public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { + transportContext.getRegisteredConnections().dec(); + super.channelUnregistered(ctx); + } + } diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java index 9c85ab2f5f06f..eb5f10a5c1a1e 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.concurrent.TimeUnit; +import com.codahale.metrics.Counter; import com.codahale.metrics.MetricSet; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -159,4 +160,8 @@ public void close() { } bootstrap = null; } + + public Counter getRegisteredConnections() { + return context.getRegisteredConnections(); + } } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java index 098fa7974b87b..788a845c57755 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java @@ -29,6 +29,7 @@ import com.codahale.metrics.Metric; import com.codahale.metrics.MetricSet; import com.codahale.metrics.Timer; +import com.codahale.metrics.Counter; import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -173,7 +174,8 @@ private void checkAuth(TransportClient client, String appId) { /** * A simple class to wrap all shuffle service wrapper metrics */ - private class ShuffleMetrics implements MetricSet { + @VisibleForTesting + public class ShuffleMetrics implements MetricSet { private final Map<String, Metric> allMetrics; // Time latency for open block request in ms private final Timer openBlockRequestLatencyMillis = new Timer(); @@ -181,14 +183,20 @@ private void checkAuth(TransportClient client, String appId) { private final Timer registerExecutorRequestLatencyMillis = new Timer(); // Block transfer rate in byte per second private final Meter blockTransferRateBytes = new Meter(); + // Number of active connections to the shuffle service + private Counter activeConnections = new Counter(); + // Number of registered connections to the shuffle service + private Counter registeredConnections = new Counter(); - private ShuffleMetrics() { + public ShuffleMetrics() { allMetrics = new HashMap<>(); allMetrics.put("openBlockRequestLatencyMillis", openBlockRequestLatencyMillis); allMetrics.put("registerExecutorRequestLatencyMillis", registerExecutorRequestLatencyMillis); allMetrics.put("blockTransferRateBytes", blockTransferRateBytes); allMetrics.put("registeredExecutorsSize", (Gauge<Integer>) () -> blockManager.getRegisteredExecutorsSize()); + allMetrics.put("numActiveConnections", activeConnections); + allMetrics.put("numRegisteredConnections", registeredConnections); } @Override @@ -244,4 +252,16 @@ public ManagedBuffer next() { } } + @Override + public void channelActive(TransportClient client) { + metrics.activeConnections.inc(); + super.channelActive(client); + } + + @Override + public void channelInactive(TransportClient client) { + metrics.activeConnections.dec(); + super.channelInactive(client); + } + } diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index 72ae1a1295236..55c028c224a4b 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -19,6 +19,7 @@ import java.io.File; import java.io.IOException; +import java.lang.reflect.Method; import java.nio.charset.StandardCharsets; import java.nio.ByteBuffer; import java.util.List; @@ -170,15 +171,6 @@ protected void serviceInit(Configuration conf) throws Exception { TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf)); blockHandler = new ExternalShuffleBlockHandler(transportConf, registeredExecutorFile); - // register metrics on the block handler into the Node Manager's metrics system. - YarnShuffleServiceMetrics serviceMetrics = - new YarnShuffleServiceMetrics(blockHandler.getAllMetrics()); - - MetricsSystemImpl metricsSystem = (MetricsSystemImpl) DefaultMetricsSystem.instance(); - metricsSystem.register( - "sparkShuffleService", "Metrics on the Spark Shuffle Service", serviceMetrics); - logger.info("Registered metrics with Hadoop's DefaultMetricsSystem"); - // If authentication is enabled, set up the shuffle server to use a // special RPC handler that filters out unauthenticated fetch requests List<TransportServerBootstrap> bootstraps = Lists.newArrayList(); @@ -199,6 +191,18 @@ protected void serviceInit(Configuration conf) throws Exception { port = shuffleServer.getPort(); boundPort = port; String authEnabledString = authEnabled ? "enabled" : "not enabled"; + + // register metrics on the block handler into the Node Manager's metrics system. + blockHandler.getAllMetrics().getMetrics().put("numRegisteredConnections", + shuffleServer.getRegisteredConnections()); + YarnShuffleServiceMetrics serviceMetrics = + new YarnShuffleServiceMetrics(blockHandler.getAllMetrics()); + + MetricsSystemImpl metricsSystem = (MetricsSystemImpl) DefaultMetricsSystem.instance(); + metricsSystem.register( + "sparkShuffleService", "Metrics on the Spark Shuffle Service", serviceMetrics); + logger.info("Registered metrics with Hadoop's DefaultMetricsSystem"); + logger.info("Started YARN shuffle service for Spark on port {}. " + "Authentication is {}. Registered executor file is {}", port, authEnabledString, registeredExecutorFile); diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java index 3e4d479b862b3..501237407e9b2 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java @@ -107,6 +107,11 @@ public static void collectMetric( throw new IllegalStateException( "Not supported class type of metric[" + name + "] for value " + gaugeValue); } + } else if (metric instanceof Counter) { + Counter c = (Counter) metric; + long counterValue = c.getCount(); + metricsRecordBuilder.addGauge(new ShuffleServiceMetricsInfo(name, "Number of " + + "connections to shuffle service " + name), counterValue); } } diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala index f6b3c37f0fe72..03e3abb3ce569 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala @@ -84,6 +84,8 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana server = transportContext.createServer(port, bootstraps.asJava) shuffleServiceSource.registerMetricSet(server.getAllMetrics) + blockHandler.getAllMetrics.getMetrics.put("numRegisteredConnections", + server.getRegisteredConnections) shuffleServiceSource.registerMetricSet(blockHandler.getAllMetrics) masterMetricsSystem.registerSource(shuffleServiceSource) masterMetricsSystem.start() diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala index 40b92282a3b8f..952fd0b70bb7b 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala @@ -38,7 +38,8 @@ class YarnShuffleServiceMetricsSuite extends SparkFunSuite with Matchers { test("metrics named as expected") { val allMetrics = Set( "openBlockRequestLatencyMillis", "registerExecutorRequestLatencyMillis", - "blockTransferRateBytes", "registeredExecutorsSize") + "blockTransferRateBytes", "registeredExecutorsSize", "numActiveConnections", + "numRegisteredConnections") metrics.getMetrics.keySet().asScala should be (allMetrics) } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add new Metrics in External Shuffle Service to help determine Network > performance and Connection Handling capabilities of the Shuffle Service > --------------------------------------------------------------------------------------------------------------------------------------------- > > Key: SPARK-25642 > URL: https://issues.apache.org/jira/browse/SPARK-25642 > Project: Spark > Issue Type: Improvement > Components: Shuffle, Spark Core > Affects Versions: 2.4.0 > Reporter: Parth Gandhi > Assignee: Parth Gandhi > Priority: Minor > Fix For: 3.0.0 > > > Recently, the ability to expose the metrics for YARN Shuffle Service was > added as part of [SPARK-18364|[https://github.com/apache/spark/pull/22485]]. > We need to add some metrics to be able to determine the number of active > connections as well as open connections to the external shuffle service to > benchmark network and connection issues on large cluster environments. -- This message was sent by Atlassian JIRA (v7.6.3#76005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org