This is an automated email from the ASF dual-hosted git repository. yaohaishi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git
commit 32cf126afb58107a0a07e99398e410b63cdb9bf2 Author: yhs0092 <[email protected]> AuthorDate: Wed Jan 15 10:07:45 2020 +0800 [SCB-1691] add WebsocketClientPool for multiple ServiceRegistryClient instance situation the legacy WebsocketUtils is preserved for compatibility and marked deprecated --- .../client/http/HttpClientPool.java | 3 - .../client/http/RestClientUtil.java | 6 +- .../client/http/WebsocketClientPool.java | 6 +- .../client/http/WebsocketClientUtil.java | 126 +++++++++++++++++++++ .../client/http/WebsocketUtils.java | 4 +- 5 files changed, 135 insertions(+), 10 deletions(-) diff --git a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/client/http/HttpClientPool.java b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/client/http/HttpClientPool.java index 46ca5ef..f4d28c8 100644 --- a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/client/http/HttpClientPool.java +++ b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/client/http/HttpClientPool.java @@ -27,9 +27,6 @@ import io.vertx.core.http.HttpClientOptions; import io.vertx.core.http.HttpVersion; import io.vertx.core.net.ProxyOptions; -/** - * Created by on 2017/4/28. 
- */ final class HttpClientPool extends AbstractClientPool { private static final Logger LOGGER = LoggerFactory.getLogger(HttpClientPool.class); diff --git a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/client/http/RestClientUtil.java b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/client/http/RestClientUtil.java index 568ab57..6f0e8b0 100644 --- a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/client/http/RestClientUtil.java +++ b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/client/http/RestClientUtil.java @@ -43,11 +43,11 @@ import io.vertx.core.http.HttpMethod; final class RestClientUtil { private static final Logger LOGGER = LoggerFactory.getLogger(RestClientUtil.class); - private static final String HEADER_CONTENT_TYPE = "Content-Type"; + static final String HEADER_CONTENT_TYPE = "Content-Type"; - private static final String HEADER_USER_AGENT = "User-Agent"; + static final String HEADER_USER_AGENT = "User-Agent"; - private static final String HEADER_TENANT_NAME = "x-domain-name"; + static final String HEADER_TENANT_NAME = "x-domain-name"; private List<AuthHeaderProvider> authHeaderProviders; diff --git a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/client/http/WebsocketClientPool.java b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/client/http/WebsocketClientPool.java index 802867a..38c17be 100644 --- a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/client/http/WebsocketClientPool.java +++ b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/client/http/WebsocketClientPool.java @@ -25,13 +25,13 @@ import org.slf4j.LoggerFactory; import io.vertx.core.http.HttpClientOptions; import io.vertx.core.http.HttpVersion; -/** - * Created by on 2017/4/28. 
- */ public final class WebsocketClientPool extends AbstractClientPool { private static final Logger LOGGER = LoggerFactory.getLogger(WebsocketClientPool.class); + /** + * The default instance, for default sc cluster. + */ public static final WebsocketClientPool INSTANCE = new WebsocketClientPool(); private WebsocketClientPool() { diff --git a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/client/http/WebsocketClientUtil.java b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/client/http/WebsocketClientUtil.java new file mode 100644 index 0000000..9262752 --- /dev/null +++ b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/client/http/WebsocketClientUtil.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.servicecomb.serviceregistry.client.http; + +import java.io.ByteArrayInputStream; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.servicecomb.foundation.auth.AuthHeaderProvider; +import org.apache.servicecomb.foundation.auth.SignRequest; +import org.apache.servicecomb.foundation.common.net.IpPort; +import org.apache.servicecomb.foundation.vertx.client.http.HttpClientWithContext; +import org.apache.servicecomb.serviceregistry.config.ServiceRegistryConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.vertx.core.Handler; +import io.vertx.core.MultiMap; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.CaseInsensitiveHeaders; +import io.vertx.core.http.HttpMethod; +import io.vertx.core.http.WebSocketConnectOptions; + +public final class WebsocketClientUtil { + private static final Logger LOGGER = LoggerFactory.getLogger(WebsocketClientUtil.class); + + private WebsocketClientPool websocketClientPool; + + private List<AuthHeaderProvider> authHeaderProviders; + + WebsocketClientUtil(ServiceRegistryConfig serviceRegistryConfig) { + websocketClientPool = new WebsocketClientPool(serviceRegistryConfig); + authHeaderProviders = serviceRegistryConfig.getAuthHeaderProviders(); + } + + public void open(IpPort ipPort, String url, Handler<Void> onOpen, Handler<Void> onClose, + Handler<Buffer> onMessage, Handler<Throwable> onException, + Handler<Throwable> onConnectFailed) { + HttpClientWithContext vertxHttpClient = websocketClientPool.getClient(); + vertxHttpClient.runOnContext(client -> { + WebSocketConnectOptions options = new WebSocketConnectOptions(); + options.setHost(ipPort.getHostOrIp()).setPort(ipPort.getPort()).setURI(url) + .setHeaders(getDefaultHeaders().addAll(getSignAuthHeaders( + createSignRequest(HttpMethod.GET.name(), ipPort, new RequestParam(), url, new HashMap<>())))); + 
client.webSocket(options, asyncResult -> { + if (asyncResult.failed()) { + onConnectFailed.handle(asyncResult.cause()); + } else { + onOpen.handle(null); + asyncResult.result().exceptionHandler(v -> { + onException.handle(v); + try { + asyncResult.result().close(); + } catch (Exception err) { + LOGGER.error("ws close error.", err); + } + }); + asyncResult.result().closeHandler(v -> { + onClose.handle(v); + }); + asyncResult.result().pongHandler(pong -> { + // ignore, just prevent NPE. + }); + asyncResult.result().frameHandler((frame) -> onMessage.handle(frame.binaryData())); + } + }); + }); + } + + public MultiMap getDefaultHeaders() { + return new CaseInsensitiveHeaders().addAll(defaultHeaders()); + } + + private Map<String, String> defaultHeaders() { + Map<String, String> headers = new HashMap<>(); + headers.put(RestClientUtil.HEADER_CONTENT_TYPE, "application/json"); + headers.put(RestClientUtil.HEADER_USER_AGENT, "cse-serviceregistry-client/1.0.0"); + headers.put(RestClientUtil.HEADER_TENANT_NAME, ServiceRegistryConfig.INSTANCE.getTenantName()); + + return headers; + } + + public Map<String, String> getSignAuthHeaders(SignRequest signReq) { + Map<String, String> headers = new HashMap<>(); + authHeaderProviders.forEach(provider -> headers.putAll(provider.getSignAuthHeaders(signReq))); + return headers; + } + + public SignRequest createSignRequest(String method, IpPort ipPort, RequestParam requestParam, String url, + Map<String, String> headers) { + SignRequest signReq = new SignRequest(); + StringBuilder endpoint = new StringBuilder("https://" + ipPort.getHostOrIp()); + endpoint.append(":" + ipPort.getPort()); + endpoint.append(url); + try { + signReq.setEndpoint(new URI(endpoint.toString())); + } catch (URISyntaxException e) { + LOGGER.error("set uri failed, uri is {}, message: {}", endpoint.toString(), e.getMessage()); + } + signReq.setContent((requestParam.getBody() != null && requestParam.getBody().length > 0) + ? 
+ new ByteArrayInputStream(requestParam.getBody()) + : null); + signReq.setHeaders(headers); + signReq.setHttpMethod(method); + signReq.setQueryParams(requestParam.getQueryParamsMap()); + return signReq; + } +} diff --git a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/client/http/WebsocketUtils.java b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/client/http/WebsocketUtils.java index 1426de9..d7427ff 100644 --- a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/client/http/WebsocketUtils.java +++ b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/client/http/WebsocketUtils.java @@ -30,8 +30,10 @@ import io.vertx.core.http.HttpMethod; import io.vertx.core.http.WebSocketConnectOptions; /** - * Created by on 2017/4/28. + * This class is designed following the singleton pattern, but it is not suitable for multi sc cluster scenarios. + * @deprecated consider using {@link WebsocketClientUtil} instead. */ +@Deprecated public final class WebsocketUtils { private static final Logger LOGGER = LoggerFactory.getLogger(WebsocketUtils.class);
