[ https://issues.apache.org/jira/browse/SCB-849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16603794#comment-16603794 ]
ASF GitHub Bot commented on SCB-849: ------------------------------------ zhengyangyong closed pull request #876: [SCB-849] refactor producer connection limit using vertx metrics spi mechanism URL: https://github.com/apache/incubator-servicecomb-java-chassis/pull/876 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/java/org/apache/servicecomb/core/Transport.java b/core/src/main/java/org/apache/servicecomb/core/Transport.java index 026ed7c4e..ed9ade43e 100644 --- a/core/src/main/java/org/apache/servicecomb/core/Transport.java +++ b/core/src/main/java/org/apache/servicecomb/core/Transport.java @@ -17,8 +17,6 @@ package org.apache.servicecomb.core; -import java.util.concurrent.atomic.AtomicInteger; - import org.apache.servicecomb.swagger.invocation.AsyncResponse; // TODO:感觉要拆成显式的client、server才好些 @@ -51,6 +49,4 @@ default boolean canInit() { Endpoint getPublishEndpoint() throws Exception; void send(Invocation invocation, AsyncResponse asyncResp) throws Exception; - - AtomicInteger getConnectedCounter(); } diff --git a/core/src/main/java/org/apache/servicecomb/core/transport/AbstractTransport.java b/core/src/main/java/org/apache/servicecomb/core/transport/AbstractTransport.java index ff5370afd..c5bcedc7f 100644 --- a/core/src/main/java/org/apache/servicecomb/core/transport/AbstractTransport.java +++ b/core/src/main/java/org/apache/servicecomb/core/transport/AbstractTransport.java @@ -23,7 +23,6 @@ import java.net.URLDecoder; import java.nio.charset.StandardCharsets; import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import org.apache.http.client.utils.URLEncodedUtils; @@ -42,6 +41,8 @@ import com.netflix.config.DynamicPropertyFactory; import io.vertx.core.Vertx; +import 
io.vertx.core.VertxOptions; +import io.vertx.core.metrics.MetricsOptions; public abstract class AbstractTransport implements Transport { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractTransport.class); @@ -60,14 +61,13 @@ private static final long DEFAULT_TIMEOUT_MILLIS = 30000; // 所有transport使用同一个vertx实例,避免创建太多的线程 - protected Vertx transportVertx = VertxUtils.getOrCreateVertxByName("transport", null); + protected Vertx transportVertx = VertxUtils.getOrCreateVertxByName("transport", + new VertxOptions().setMetricsOptions(new MetricsOptions().setEnabled(true))); protected Endpoint endpoint; protected Endpoint publishEndpoint; - private final AtomicInteger connectedCounter = new AtomicInteger(0); - @Override public Endpoint getPublishEndpoint() { return publishEndpoint; @@ -78,11 +78,6 @@ public Endpoint getEndpoint() { return endpoint; } - @Override - public AtomicInteger getConnectedCounter() { - return connectedCounter; - } - protected void setListenAddressWithoutSchema(String addressWithoutSchema) { setListenAddressWithoutSchema(addressWithoutSchema, null); } @@ -179,7 +174,7 @@ public static long getReqTimeout(String operationName, String schema, String mic /** * Handles the request timeout configurations. 
- * @param defaultValue + * @param defaultValue default value if missing * @param keys list of keys * @return configured value */ diff --git a/core/src/test/java/org/apache/servicecomb/core/TestTransport.java b/core/src/test/java/org/apache/servicecomb/core/TestTransport.java index abfa20252..d6a118308 100644 --- a/core/src/test/java/org/apache/servicecomb/core/TestTransport.java +++ b/core/src/test/java/org/apache/servicecomb/core/TestTransport.java @@ -21,7 +21,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.servicecomb.core.endpoint.EndpointsCache; import org.apache.servicecomb.core.transport.TransportManager; @@ -47,11 +46,6 @@ public void testEndpoint() throws Exception { public void send(Invocation invocation, AsyncResponse asyncResp) { } - @Override - public AtomicInteger getConnectedCounter() { - return new AtomicInteger(0); - } - @Override public Object parseAddress(String address) { return "127.0.0.1"; diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/ConnectionEvent.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/ConnectionEventType.java similarity index 90% rename from foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/ConnectionEvent.java rename to foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/ConnectionEventType.java index 359f92853..7614dbaeb 100644 --- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/ConnectionEvent.java +++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/ConnectionEventType.java @@ -17,7 +17,9 @@ package org.apache.servicecomb.foundation.vertx; -public enum ConnectionEvent { - Connected, - Closed +public enum ConnectionEventType { + HTTPConnected, + HTTPClosed, + TCPConnected, + TCPClosed } diff --git 
a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/ClientEvent.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/ServerEvent.java similarity index 68% rename from foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/ClientEvent.java rename to foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/ServerEvent.java index 0b37bbfcc..51f6322e4 100644 --- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/ClientEvent.java +++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/ServerEvent.java @@ -17,39 +17,33 @@ package org.apache.servicecomb.foundation.vertx; +import io.vertx.core.net.SocketAddress; + /** * Notice: this event will raised in vertx eventloop thread, so do not run any block code */ -public class ClientEvent { - private final String address; - - private final ConnectionEvent connectionEvent; +public class ServerEvent { + private final SocketAddress address; - private final TransportType transportType; + private final ConnectionEventType connectionEventType; private final int totalConnectedCount; - public String getAddress() { + public SocketAddress getAddress() { return address; } - public ConnectionEvent getConnectionEvent() { - return connectionEvent; - } - - public TransportType getTransportType() { - return transportType; + public ConnectionEventType getConnectionEventType() { + return connectionEventType; } public int getTotalConnectedCount() { return totalConnectedCount; } - public ClientEvent(String address, ConnectionEvent connectionEvent, TransportType transportType, - int totalConnectedCount) { + public ServerEvent(SocketAddress address, ConnectionEventType connectionEventType, int totalConnectedCount) { this.address = address; - this.connectionEvent = connectionEvent; - this.transportType = transportType; + this.connectionEventType = 
connectionEventType; this.totalConnectedCount = totalConnectedCount; } } \ No newline at end of file diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/metrics/SCBHttpServerMetrics.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/metrics/SCBHttpServerMetrics.java new file mode 100644 index 000000000..ade76f160 --- /dev/null +++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/metrics/SCBHttpServerMetrics.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.servicecomb.foundation.vertx.metrics; + +import org.apache.servicecomb.foundation.common.event.EventManager; +import org.apache.servicecomb.foundation.vertx.ServerEvent; +import org.apache.servicecomb.foundation.vertx.ConnectionEventType; + +import io.vertx.core.http.HttpMethod; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.http.HttpServerResponse; +import io.vertx.core.http.ServerWebSocket; +import io.vertx.core.net.SocketAddress; +import io.vertx.core.spi.metrics.HttpServerMetrics; + +public class SCBHttpServerMetrics implements HttpServerMetrics<Void, Void, SCBSocketMetrics> { + private final SCBSocketMetrics metrics; + + public SCBHttpServerMetrics() { + this.metrics = new SCBSocketMetrics(); + } + + @Override + public Void requestBegin(SCBSocketMetrics socketMetric, HttpServerRequest request) { + return null; + } + + @Override + public void requestReset(Void requestMetric) { + + } + + @Override + public Void responsePushed(SCBSocketMetrics socketMetric, HttpMethod method, String uri, + HttpServerResponse response) { + return null; + } + + @Override + public void responseEnd(Void requestMetric, HttpServerResponse response) { + + } + + @Override + public Void upgrade(Void requestMetric, ServerWebSocket serverWebSocket) { + return null; + } + + @Override + public Void connected(SCBSocketMetrics socketMetric, ServerWebSocket serverWebSocket) { + return null; + } + + @Override + public void disconnected(Void serverWebSocketMetric) { + + } + + @Override + public SCBSocketMetrics connected(SocketAddress remoteAddress, String remoteName) { + int connectedCount = metrics.getCounter().incrementAndGet(); + EventManager.post(new ServerEvent(remoteAddress, ConnectionEventType.HTTPConnected, connectedCount)); + return metrics; + } + + @Override + public void disconnected(SCBSocketMetrics socketMetric, SocketAddress remoteAddress) { + int connectedCount = socketMetric.getCounter().decrementAndGet(); + EventManager.post(new 
ServerEvent(remoteAddress, ConnectionEventType.HTTPClosed, connectedCount)); + } + + @Override + public void bytesRead(SCBSocketMetrics socketMetric, SocketAddress remoteAddress, long numberOfBytes) { + + } + + @Override + public void bytesWritten(SCBSocketMetrics socketMetric, SocketAddress remoteAddress, long numberOfBytes) { + + } + + @Override + public void exceptionOccurred(SCBSocketMetrics socketMetric, SocketAddress remoteAddress, Throwable t) { + + } + + @Override + @Deprecated + public boolean isEnabled() { + return true; + } + + @Override + public void close() { + + } +} diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/TransportType.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/metrics/SCBSocketMetrics.java similarity index 71% rename from foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/TransportType.java rename to foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/metrics/SCBSocketMetrics.java index 0d8489dc0..747ad1b8e 100644 --- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/TransportType.java +++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/metrics/SCBSocketMetrics.java @@ -15,9 +15,18 @@ * limitations under the License. 
*/ -package org.apache.servicecomb.foundation.vertx; +package org.apache.servicecomb.foundation.vertx.metrics; -public enum TransportType { - Highway, - Rest +import java.util.concurrent.atomic.AtomicInteger; + +public class SCBSocketMetrics { + private final AtomicInteger counter; + + public AtomicInteger getCounter() { + return counter; + } + + public SCBSocketMetrics() { + this.counter = new AtomicInteger(0); + } } diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/metrics/SCBTCPMetrics.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/metrics/SCBTCPMetrics.java new file mode 100644 index 000000000..ac328e9df --- /dev/null +++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/metrics/SCBTCPMetrics.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.servicecomb.foundation.vertx.metrics; + +import org.apache.servicecomb.foundation.common.event.EventManager; +import org.apache.servicecomb.foundation.vertx.ServerEvent; +import org.apache.servicecomb.foundation.vertx.ConnectionEventType; + +import io.vertx.core.net.SocketAddress; +import io.vertx.core.spi.metrics.TCPMetrics; + +public class SCBTCPMetrics implements TCPMetrics<SCBSocketMetrics> { + + private final SCBSocketMetrics metrics; + + public SCBTCPMetrics() { + this.metrics = new SCBSocketMetrics(); + } + + public SCBSocketMetrics connected(SocketAddress remoteAddress, String remoteName) { + int connectedCount = metrics.getCounter().incrementAndGet(); + EventManager.post(new ServerEvent(remoteAddress, ConnectionEventType.TCPConnected, connectedCount)); + return metrics; + } + + @Override + public void disconnected(SCBSocketMetrics socketMetric, SocketAddress remoteAddress) { + int connectedCount = socketMetric.getCounter().decrementAndGet(); + EventManager.post(new ServerEvent(remoteAddress, ConnectionEventType.TCPClosed, connectedCount)); + } + + @Override + public void bytesRead(SCBSocketMetrics socketMetric, SocketAddress remoteAddress, long numberOfBytes) { + + } + + @Override + public void bytesWritten(SCBSocketMetrics socketMetric, SocketAddress remoteAddress, long numberOfBytes) { + + } + + @Override + public void exceptionOccurred(SCBSocketMetrics socketMetric, SocketAddress remoteAddress, Throwable t) { + + } + + @Override + @Deprecated + public boolean isEnabled() { + return true; + } + + @Override + public void close() { + + } +} diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/metrics/SCBVertxMetrics.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/metrics/SCBVertxMetrics.java new file mode 100644 index 000000000..a7512e3cc --- /dev/null +++ 
b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/metrics/SCBVertxMetrics.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.foundation.vertx.metrics; + +import io.vertx.core.http.HttpServer; +import io.vertx.core.http.HttpServerOptions; +import io.vertx.core.metrics.impl.DummyVertxMetrics; +import io.vertx.core.net.NetServerOptions; +import io.vertx.core.net.SocketAddress; +import io.vertx.core.spi.metrics.HttpServerMetrics; +import io.vertx.core.spi.metrics.TCPMetrics; + +public class SCBVertxMetrics extends DummyVertxMetrics { + + @Override + public HttpServerMetrics createMetrics(HttpServer server, SocketAddress localAddress, HttpServerOptions options) { + return new SCBHttpServerMetrics(); + } + + @Override + public TCPMetrics createMetrics(SocketAddress localAddress, NetServerOptions options) { + return new SCBTCPMetrics(); + } + + @Override + public boolean isEnabled() { + return true; + } +} diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/metrics/SCBVertxMetricsFactory.java 
b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/metrics/SCBVertxMetricsFactory.java new file mode 100644 index 000000000..b14bd22f9 --- /dev/null +++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/metrics/SCBVertxMetricsFactory.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.servicecomb.foundation.vertx.metrics; + +import io.vertx.core.Vertx; +import io.vertx.core.VertxOptions; +import io.vertx.core.spi.VertxMetricsFactory; +import io.vertx.core.spi.metrics.VertxMetrics; + +public class SCBVertxMetricsFactory implements VertxMetricsFactory { + @Override + public VertxMetrics metrics(Vertx vertx, VertxOptions vertxOptions) { + return new SCBVertxMetrics(); + } +} diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/server/TcpServer.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/server/TcpServer.java index 7fc3f24f4..b7a824c49 100644 --- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/server/TcpServer.java +++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/server/TcpServer.java @@ -18,33 +18,27 @@ package org.apache.servicecomb.foundation.vertx.server; import java.net.InetSocketAddress; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.servicecomb.foundation.common.event.EventManager; import org.apache.servicecomb.foundation.common.net.URIEndpointObject; import org.apache.servicecomb.foundation.ssl.SSLCustom; import org.apache.servicecomb.foundation.ssl.SSLOption; import org.apache.servicecomb.foundation.ssl.SSLOptionFactory; import org.apache.servicecomb.foundation.vertx.AsyncResultCallback; -import org.apache.servicecomb.foundation.vertx.ClientEvent; -import org.apache.servicecomb.foundation.vertx.ConnectionEvent; -import org.apache.servicecomb.foundation.vertx.TransportType; import org.apache.servicecomb.foundation.vertx.VertxTLSBuilder; +import org.apache.servicecomb.foundation.vertx.metrics.SCBSocketMetrics; import com.netflix.config.DynamicPropertyFactory; import io.vertx.core.Vertx; import io.vertx.core.net.NetServer; import io.vertx.core.net.NetServerOptions; +import io.vertx.core.net.impl.NetSocketImpl; public class 
TcpServer { private URIEndpointObject endpointObject; - private final AtomicInteger connectedCounter; - - public TcpServer(URIEndpointObject endpointObject, AtomicInteger connectedCounter) { + public TcpServer(URIEndpointObject endpointObject) { this.endpointObject = endpointObject; - this.connectedCounter = connectedCounter; } public void init(Vertx vertx, String sslKey, AsyncResultCallback<InetSocketAddress> callback) { @@ -67,19 +61,16 @@ public void init(Vertx vertx, String sslKey, AsyncResultCallback<InetSocketAddre } netServer.connectHandler(netSocket -> { - int connectedCount = connectedCounter.incrementAndGet(); + int connectedCount = ((SCBSocketMetrics) ((NetSocketImpl) netSocket).metric()).getCounter().get(); int connectionLimit = DynamicPropertyFactory.getInstance() .getIntProperty("servicecomb.highway.server.connection-limit", Integer.MAX_VALUE).get(); if (connectedCount > connectionLimit) { - connectedCounter.decrementAndGet(); netSocket.close(); return; } TcpServerConnection connection = createTcpServerConnection(); - connection.init(netSocket, connectedCounter); - EventManager.post(new ClientEvent(netSocket.remoteAddress().toString(), - ConnectionEvent.Connected, TransportType.Highway, connectedCount)); + connection.init(netSocket); }); InetSocketAddress socketAddress = endpointObject.getSocketAddress(); diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/server/TcpServerConnection.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/server/TcpServerConnection.java index 18120a477..348a3f843 100644 --- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/server/TcpServerConnection.java +++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/server/TcpServerConnection.java @@ -16,12 +16,6 @@ */ package org.apache.servicecomb.foundation.vertx.server; -import java.util.concurrent.atomic.AtomicInteger; - 
-import org.apache.servicecomb.foundation.common.event.EventManager; -import org.apache.servicecomb.foundation.vertx.ClientEvent; -import org.apache.servicecomb.foundation.vertx.ConnectionEvent; -import org.apache.servicecomb.foundation.vertx.TransportType; import org.apache.servicecomb.foundation.vertx.tcp.TcpConnection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,7 +28,7 @@ protected TcpParser splitter; - public void init(NetSocket netSocket, AtomicInteger connectedCounter) { + public void init(NetSocket netSocket) { // currently, socket always be NetSocketImpl this.initNetSocket((NetSocketImpl) netSocket); @@ -52,9 +46,6 @@ public void init(NetSocket netSocket, AtomicInteger connectedCounter) { LOGGER.error("disconected from {}, in thread {}", remoteAddress, Thread.currentThread().getName()); - - int connectedCount = connectedCounter.decrementAndGet(); - EventManager.post(new ClientEvent(remoteAddress, ConnectionEvent.Closed, TransportType.Highway, connectedCount)); }); netSocket.handler(splitter); diff --git a/foundations/foundation-vertx/src/main/resources/META-INF/services/io.vertx.core.spi.VertxMetricsFactory b/foundations/foundation-vertx/src/main/resources/META-INF/services/io.vertx.core.spi.VertxMetricsFactory new file mode 100644 index 000000000..31bab6971 --- /dev/null +++ b/foundations/foundation-vertx/src/main/resources/META-INF/services/io.vertx.core.spi.VertxMetricsFactory @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.apache.servicecomb.foundation.vertx.metrics.SCBVertxMetricsFactory \ No newline at end of file diff --git a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/server/TestTcpServer.java b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/server/TestTcpServer.java index 59825f5c9..403b73105 100644 --- a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/server/TestTcpServer.java +++ b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/server/TestTcpServer.java @@ -18,7 +18,6 @@ package org.apache.servicecomb.foundation.vertx.server; import java.net.InetSocketAddress; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.servicecomb.foundation.common.net.URIEndpointObject; import org.apache.servicecomb.foundation.vertx.AsyncResultCallback; @@ -35,15 +34,15 @@ public class TestTcpServer { static class TcpServerForTest extends TcpServer { public TcpServerForTest(URIEndpointObject endpointObject) { - super(endpointObject, new AtomicInteger()); + super(endpointObject); } @Override protected TcpServerConnection createTcpServerConnection() { return new TcpServerConnection() { @Override - public void init(NetSocket netSocket, AtomicInteger connectedCounter) { - super.init(netSocket, connectedCounter); + public void init(NetSocket netSocket) { + super.init(netSocket); } }; } diff --git a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/server/TestTcpServerConnection.java 
b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/server/TestTcpServerConnection.java index 08b39780b..3047042f2 100644 --- a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/server/TestTcpServerConnection.java +++ b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/server/TestTcpServerConnection.java @@ -16,8 +16,6 @@ */ package org.apache.servicecomb.foundation.vertx.server; -import java.util.concurrent.atomic.AtomicInteger; - import org.junit.Assert; import org.junit.Test; @@ -31,7 +29,7 @@ public void test(@Mocked NetSocketImpl netSocket) { connection.setProtocol("p"); connection.setZipName("z"); - connection.init(netSocket, new AtomicInteger()); + connection.init(netSocket); Assert.assertEquals(netSocket, connection.getNetSocket()); } diff --git a/integration-tests/spring-pojo-tests/src/test/java/org/apache/servicecomb/demo/pojo/test/ConnectionEventWatcher.java b/integration-tests/spring-pojo-tests/src/test/java/org/apache/servicecomb/demo/pojo/test/ConnectionEventWatcher.java index bda03c01c..2ce0b0078 100644 --- a/integration-tests/spring-pojo-tests/src/test/java/org/apache/servicecomb/demo/pojo/test/ConnectionEventWatcher.java +++ b/integration-tests/spring-pojo-tests/src/test/java/org/apache/servicecomb/demo/pojo/test/ConnectionEventWatcher.java @@ -20,8 +20,8 @@ import java.util.ArrayList; import java.util.List; -import org.apache.servicecomb.foundation.vertx.ClientEvent; -import org.apache.servicecomb.foundation.vertx.TransportType; +import org.apache.servicecomb.foundation.vertx.ServerEvent; +import org.apache.servicecomb.foundation.vertx.ConnectionEventType; import com.google.common.eventbus.Subscribe; @@ -33,8 +33,9 @@ } @Subscribe - public void getEvent(ClientEvent event) { - if (TransportType.Highway.equals(event.getTransportType())) { + public void getEvent(ServerEvent event) { + if 
(ConnectionEventType.TCPConnected.equals(event.getConnectionEventType()) || + ConnectionEventType.TCPClosed.equals(event.getConnectionEventType())) { counters.add(event.getTotalConnectedCount()); } } diff --git a/integration-tests/springmvc-tests/springmvc-tests-common/src/test/java/org/apache/servicecomb/demo/springmvc/tests/ConnectionEventWatcher.java b/integration-tests/springmvc-tests/springmvc-tests-common/src/test/java/org/apache/servicecomb/demo/springmvc/tests/ConnectionEventWatcher.java index 2ad819c17..4ce5902e0 100644 --- a/integration-tests/springmvc-tests/springmvc-tests-common/src/test/java/org/apache/servicecomb/demo/springmvc/tests/ConnectionEventWatcher.java +++ b/integration-tests/springmvc-tests/springmvc-tests-common/src/test/java/org/apache/servicecomb/demo/springmvc/tests/ConnectionEventWatcher.java @@ -20,8 +20,8 @@ import java.util.ArrayList; import java.util.List; -import org.apache.servicecomb.foundation.vertx.ClientEvent; -import org.apache.servicecomb.foundation.vertx.TransportType; +import org.apache.servicecomb.foundation.vertx.ServerEvent; +import org.apache.servicecomb.foundation.vertx.ConnectionEventType; import com.google.common.eventbus.Subscribe; @@ -33,8 +33,9 @@ } @Subscribe - public void getEvent(ClientEvent event) { - if (TransportType.Rest.equals(event.getTransportType())) { + public void getEvent(ServerEvent event) { + if (ConnectionEventType.HTTPConnected.equals(event.getConnectionEventType()) || + ConnectionEventType.HTTPClosed.equals(event.getConnectionEventType())) { counters.add(event.getTotalConnectedCount()); } } diff --git a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServer.java b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServer.java index 0694c2de3..5aca854b7 100644 --- a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServer.java +++ 
b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServer.java @@ -17,8 +17,6 @@ package org.apache.servicecomb.transport.highway; -import java.util.concurrent.atomic.AtomicInteger; - import org.apache.servicecomb.core.Endpoint; import org.apache.servicecomb.foundation.common.net.URIEndpointObject; import org.apache.servicecomb.foundation.vertx.server.TcpServer; @@ -27,8 +25,8 @@ public class HighwayServer extends TcpServer { private Endpoint endpoint; - public HighwayServer(Endpoint endpoint, AtomicInteger connectedCounter) { - super((URIEndpointObject) endpoint.getAddress(), connectedCounter); + public HighwayServer(Endpoint endpoint) { + super((URIEndpointObject) endpoint.getAddress()); this.endpoint = endpoint; } diff --git a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerConnection.java b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerConnection.java index 1f5a7593b..968cf0cc8 100644 --- a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerConnection.java +++ b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerConnection.java @@ -16,8 +16,6 @@ */ package org.apache.servicecomb.transport.highway; -import java.util.concurrent.atomic.AtomicInteger; - import javax.ws.rs.core.Response.Status; import org.apache.servicecomb.core.Endpoint; @@ -47,9 +45,9 @@ public HighwayServerConnection(Endpoint endpoint) { } @Override - public void init(NetSocket netSocket, AtomicInteger connectedCounter) { + public void init(NetSocket netSocket) { splitter = new TcpParser(this); - super.init(netSocket, connectedCounter); + super.init(netSocket); } @Override @@ -73,7 +71,7 @@ public void handle(long msgId, Buffer headerBuffer, Buffer bodyBuffer) { } protected RequestHeader decodeRequestHeader(long msgId, Buffer headerBuffer) { - RequestHeader 
requestHeader = null; + RequestHeader requestHeader; try { requestHeader = HighwayCodec.readRequestHeader(headerBuffer, protobufFeature); } catch (Exception e) { @@ -89,7 +87,7 @@ protected RequestHeader decodeRequestHeader(long msgId, Buffer headerBuffer) { } protected void onLogin(long msgId, RequestHeader header, Buffer bodyBuffer) { - LoginRequest request = null; + LoginRequest request; try { request = LoginRequest.readObject(bodyBuffer); } catch (Exception e) { diff --git a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerVerticle.java b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerVerticle.java index 9a17081d5..3d6efc4cf 100644 --- a/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerVerticle.java +++ b/transports/transport-highway/src/main/java/org/apache/servicecomb/transport/highway/HighwayServerVerticle.java @@ -18,10 +18,8 @@ package org.apache.servicecomb.transport.highway; import java.net.InetSocketAddress; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.servicecomb.core.Const; -import org.apache.servicecomb.core.CseContext; import org.apache.servicecomb.core.Endpoint; import org.apache.servicecomb.core.transport.AbstractTransport; import org.apache.servicecomb.foundation.common.net.URIEndpointObject; @@ -42,16 +40,6 @@ private URIEndpointObject endpointObject; - private final AtomicInteger connectedCounter; - - public HighwayServerVerticle() { - this(CseContext.getInstance().getTransportManager().findTransport(Const.HIGHWAY).getConnectedCounter()); - } - - public HighwayServerVerticle(AtomicInteger connectedCounter) { - this.connectedCounter = connectedCounter; - } - @Override public void init(Vertx vertx, Context context) { super.init(vertx, context); @@ -79,7 +67,7 @@ protected void startListen(Future<Void> startFuture) { return; } - HighwayServer server = new HighwayServer(endpoint, 
connectedCounter); + HighwayServer server = new HighwayServer(endpoint); server.init(vertx, SSL_KEY, ar -> { if (ar.succeeded()) { InetSocketAddress socketAddress = ar.result(); diff --git a/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerConnection.java b/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerConnection.java index 9a18f9a7f..faef59e48 100644 --- a/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerConnection.java +++ b/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayServerConnection.java @@ -17,7 +17,6 @@ package org.apache.servicecomb.transport.highway; import java.net.InetSocketAddress; -import java.util.concurrent.atomic.AtomicInteger; import javax.xml.ws.Holder; @@ -73,7 +72,7 @@ public void init() { } }; connection = new HighwayServerConnection(endpoint); - connection.init(netSocket, new AtomicInteger()); + connection.init(netSocket); header = new RequestHeader(); } @@ -100,8 +99,8 @@ public void testReqeustHeaderError() throws Exception { connection.handle(0, headerBuffer, null); - Assert.assertEquals(null, connection.getProtocol()); - Assert.assertEquals(null, connection.getZipName()); + Assert.assertNull(connection.getProtocol()); + Assert.assertNull(connection.getZipName()); } @Test @@ -133,8 +132,8 @@ public void testSetParameterError() throws Exception { connection.handle(0, headerBuffer, bodyBuffer); - Assert.assertEquals(null, connection.getProtocol()); - Assert.assertEquals(null, connection.getZipName()); + Assert.assertNull(connection.getProtocol()); + Assert.assertNull(connection.getZipName()); } @Test @@ -180,8 +179,8 @@ public void execute() { connection.handle(0, headerBuffer, bodyBuffer); - Assert.assertEquals(null, connection.getProtocol()); - Assert.assertEquals(null, connection.getZipName()); + 
Assert.assertNull(connection.getProtocol()); + Assert.assertNull(connection.getZipName()); Assert.assertEquals(true, holder.value); } @@ -203,8 +202,8 @@ public boolean init(NetSocket netSocket, long msgId, connection.handle(0, headerBuffer, bodyBuffer); - Assert.assertEquals(null, connection.getProtocol()); - Assert.assertEquals(null, connection.getZipName()); + Assert.assertNull(connection.getProtocol()); + Assert.assertNull(connection.getZipName()); Assert.assertEquals(false, holder.value); } diff --git a/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayVerticle.java b/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayVerticle.java index 95019d8cc..f2ac8d6a1 100644 --- a/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayVerticle.java +++ b/transports/transport-highway/src/test/java/org/apache/servicecomb/transport/highway/TestHighwayVerticle.java @@ -19,8 +19,6 @@ import static org.junit.Assert.assertTrue; -import java.util.concurrent.atomic.AtomicInteger; - import org.apache.servicecomb.core.Endpoint; import org.apache.servicecomb.core.Transport; import org.apache.servicecomb.foundation.common.net.URIEndpointObject; @@ -40,7 +38,7 @@ @Test public void testHighwayVerticle(@Mocked Transport transport, @Mocked Vertx vertx, @Mocked Context context, @Mocked JsonObject json) { - HighwayServerVerticle highwayVerticle = new HighwayServerVerticle(new AtomicInteger()); + HighwayServerVerticle highwayVerticle = new HighwayServerVerticle(); URIEndpointObject endpiontObject = new URIEndpointObject("highway://127.0.0.1:9090"); new Expectations() { { diff --git a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/RestServerVerticle.java b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/RestServerVerticle.java index 
53c3890ee..d5600005d 100644 --- a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/RestServerVerticle.java +++ b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/RestServerVerticle.java @@ -19,22 +19,16 @@ import java.util.List; import java.util.Set; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.servicecomb.core.Const; -import org.apache.servicecomb.core.CseContext; import org.apache.servicecomb.core.Endpoint; import org.apache.servicecomb.core.transport.AbstractTransport; -import org.apache.servicecomb.foundation.common.event.EventManager; import org.apache.servicecomb.foundation.common.net.URIEndpointObject; import org.apache.servicecomb.foundation.common.utils.SPIServiceUtils; import org.apache.servicecomb.foundation.ssl.SSLCustom; import org.apache.servicecomb.foundation.ssl.SSLOption; import org.apache.servicecomb.foundation.ssl.SSLOptionFactory; -import org.apache.servicecomb.foundation.vertx.ClientEvent; -import org.apache.servicecomb.foundation.vertx.ConnectionEvent; -import org.apache.servicecomb.foundation.vertx.TransportType; import org.apache.servicecomb.foundation.vertx.VertxTLSBuilder; +import org.apache.servicecomb.foundation.vertx.metrics.SCBSocketMetrics; import org.apache.servicecomb.transport.rest.vertx.accesslog.AccessLogConfiguration; import org.apache.servicecomb.transport.rest.vertx.accesslog.impl.AccessLogHandler; import org.slf4j.Logger; @@ -49,6 +43,7 @@ import io.vertx.core.http.HttpMethod; import io.vertx.core.http.HttpServer; import io.vertx.core.http.HttpServerOptions; +import io.vertx.core.http.impl.ServerConnection; import io.vertx.ext.web.Router; import io.vertx.ext.web.handler.CorsHandler; @@ -61,16 +56,6 @@ private URIEndpointObject endpointObject; - private final AtomicInteger connectedCounter; - - public RestServerVerticle() { - 
this(CseContext.getInstance().getTransportManager().findTransport(Const.RESTFUL).getConnectedCounter()); - } - - public RestServerVerticle(AtomicInteger connectedCounter) { - this.connectedCounter = connectedCounter; - } - @Override public void init(Vertx vertx, Context context) { super.init(vertx, context); @@ -95,17 +80,11 @@ public void start(Future<Void> startFuture) throws Exception { HttpServer httpServer = createHttpServer(); httpServer.requestHandler(mainRouter::accept); httpServer.connectionHandler(connection -> { - int connectedCount = connectedCounter.incrementAndGet(); int connectionLimit = DynamicPropertyFactory.getInstance() .getIntProperty("servicecomb.rest.server.connection-limit", Integer.MAX_VALUE).get(); + int connectedCount = ((SCBSocketMetrics) ((ServerConnection) connection).metric()).getCounter().get(); if (connectedCount > connectionLimit) { - connectedCounter.decrementAndGet(); connection.close(); - } else { - EventManager.post(new ClientEvent(connection.remoteAddress().toString(), - ConnectionEvent.Connected, TransportType.Rest, connectedCount)); - connection.closeHandler(event -> EventManager.post(new ClientEvent(connection.remoteAddress().toString(), - ConnectionEvent.Closed, TransportType.Rest, connectedCounter.decrementAndGet()))); } }); diff --git a/transports/transport-rest/transport-rest-vertx/src/test/java/org/apache/servicecomb/transport/rest/vertx/TestRestServerVerticle.java b/transports/transport-rest/transport-rest-vertx/src/test/java/org/apache/servicecomb/transport/rest/vertx/TestRestServerVerticle.java index fb7a13cfe..4b6543564 100644 --- a/transports/transport-rest/transport-rest-vertx/src/test/java/org/apache/servicecomb/transport/rest/vertx/TestRestServerVerticle.java +++ b/transports/transport-rest/transport-rest-vertx/src/test/java/org/apache/servicecomb/transport/rest/vertx/TestRestServerVerticle.java @@ -57,7 +57,7 @@ @Before public void setUp() { - instance = new RestServerVerticle(new AtomicInteger()); + instance = 
new RestServerVerticle(); startFuture = Future.future(); CseContext.getInstance().setTransportManager(new TransportManager()); @@ -90,7 +90,7 @@ public void testRestServerVerticleWithRouter(@Mocked Transport transport, @Mocke result = endpiont; } }; - RestServerVerticle server = new RestServerVerticle(new AtomicInteger()); + RestServerVerticle server = new RestServerVerticle(); // process stuff done by Expectations server.init(vertx, context); server.start(startFuture); @@ -117,7 +117,7 @@ public void testRestServerVerticleWithRouterSSL(@Mocked Transport transport, @Mo result = endpiont; } }; - RestServerVerticle server = new RestServerVerticle(new AtomicInteger()); + RestServerVerticle server = new RestServerVerticle(); // process stuff done by Expectations server.init(vertx, context); server.start(startFuture); @@ -144,7 +144,7 @@ public void testRestServerVerticleWithHttp2(@Mocked Transport transport, @Mocked result = endpiont; } }; - RestServerVerticle server = new RestServerVerticle(new AtomicInteger()); + RestServerVerticle server = new RestServerVerticle(); boolean status = false; try { server.init(vertx, context); @@ -240,7 +240,7 @@ CorsHandler getCorsHandler(String corsAllowedOrigin) { Router router = Mockito.mock(Router.class); Mockito.when(router.route()).thenReturn(Mockito.mock(Route.class)); - RestServerVerticle server = new RestServerVerticle(new AtomicInteger()); + RestServerVerticle server = new RestServerVerticle(); Deencapsulation.invoke(server, "mountCorsHandler", router); Assert.assertEquals(7, counter.get()); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > refactor producer connection limit using vertx metrics spi mechanism > -------------------------------------------------------------------- > > Key: SCB-849 > URL: https://issues.apache.org/jira/browse/SCB-849 > Project: Apache ServiceComb > Issue Type: Bug > Components: Java-Chassis > Affects Versions: java-chassis-1.1.0 > Reporter: yangyongzheng > Assignee: yangyongzheng > Priority: Major > Fix For: java-chassis-1.1.0 > > > We can use the Vert.x metrics SPI mechanism to simplify the connection-limit feature. -- This message was sent by Atlassian JIRA (v7.6.3#76005)