This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo-spi-extensions.git
The following commit(s) were added to refs/heads/master by this push:
new 1e92a79 Adapt DNS to the latest version (#176)
1e92a79 is described below
commit 1e92a7981caafdac8c265fbd78c5542db9e1a5f1
Author: Albumen Kevin <[email protected]>
AuthorDate: Mon Nov 21 16:17:02 2022 +0800
Adapt DNS to the latest version (#176)
---
.../consul/ConsulDynamicConfiguration.java | 2 +-
dubbo-extensions-dependencies-bom/pom.xml | 2 +-
.../dubbo-registry-dns/pom.xml | 1 -
.../dubbo/registry/dns/DNSServiceDiscovery.java | 1 +
.../dns/ReflectionBasedServiceDiscovery.java | 285 ---------------------
.../protocol/hessian/HessianProtocolFilter.java | 2 +-
6 files changed, 4 insertions(+), 289 deletions(-)
diff --git
a/dubbo-configcenter-extensions/dubbo-configcenter-consul/src/main/java/org/apache/dubbo/configcenter/consul/ConsulDynamicConfiguration.java
b/dubbo-configcenter-extensions/dubbo-configcenter-consul/src/main/java/org/apache/dubbo/configcenter/consul/ConsulDynamicConfiguration.java
index b82aa8a..eb572a7 100644
---
a/dubbo-configcenter-extensions/dubbo-configcenter-consul/src/main/java/org/apache/dubbo/configcenter/consul/ConsulDynamicConfiguration.java
+++
b/dubbo-configcenter-extensions/dubbo-configcenter-consul/src/main/java/org/apache/dubbo/configcenter/consul/ConsulDynamicConfiguration.java
@@ -111,7 +111,7 @@ public class ConsulDynamicConfiguration extends
TreePathDynamicConfiguration {
}
@Override
- protected void doAddListener(String pathKey, ConfigurationListener
listener) {
+ protected void doAddListener(String pathKey, ConfigurationListener
listener, String key, String group) {
logger.info("register listener " + listener.getClass() + " for config
with key: " + pathKey);
ConsulListener watcher = watchers.computeIfAbsent(pathKey, k -> new
ConsulListener(pathKey));
watcher.addListener(listener);
diff --git a/dubbo-extensions-dependencies-bom/pom.xml
b/dubbo-extensions-dependencies-bom/pom.xml
index fbf3683..da6320c 100644
--- a/dubbo-extensions-dependencies-bom/pom.xml
+++ b/dubbo-extensions-dependencies-bom/pom.xml
@@ -90,7 +90,7 @@
<properties>
<revision>1.0.3-SNAPSHOT</revision>
- <dubbo.version>3.0.8</dubbo.version>
+ <dubbo.version>3.1.2</dubbo.version>
<spring.version>5.2.9.RELEASE</spring.version>
<spring-boot.version>2.4.1</spring-boot.version>
diff --git a/dubbo-registry-extensions/dubbo-registry-dns/pom.xml
b/dubbo-registry-extensions/dubbo-registry-dns/pom.xml
index ad282f6..4219046 100644
--- a/dubbo-registry-extensions/dubbo-registry-dns/pom.xml
+++ b/dubbo-registry-extensions/dubbo-registry-dns/pom.xml
@@ -36,7 +36,6 @@
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo</artifactId>
- <version>3.0.12</version>
</dependency>
<dependency>
diff --git
a/dubbo-registry-extensions/dubbo-registry-dns/src/main/java/org/apache/dubbo/registry/dns/DNSServiceDiscovery.java
b/dubbo-registry-extensions/dubbo-registry-dns/src/main/java/org/apache/dubbo/registry/dns/DNSServiceDiscovery.java
index 110d814..3c26786 100644
---
a/dubbo-registry-extensions/dubbo-registry-dns/src/main/java/org/apache/dubbo/registry/dns/DNSServiceDiscovery.java
+++
b/dubbo-registry-extensions/dubbo-registry-dns/src/main/java/org/apache/dubbo/registry/dns/DNSServiceDiscovery.java
@@ -21,6 +21,7 @@ import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.registry.client.DefaultServiceInstance;
+import org.apache.dubbo.registry.client.ReflectionBasedServiceDiscovery;
import org.apache.dubbo.registry.client.ServiceInstance;
import
org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
import org.apache.dubbo.registry.dns.util.DNSClientConst;
diff --git
a/dubbo-registry-extensions/dubbo-registry-dns/src/main/java/org/apache/dubbo/registry/dns/ReflectionBasedServiceDiscovery.java
b/dubbo-registry-extensions/dubbo-registry-dns/src/main/java/org/apache/dubbo/registry/dns/ReflectionBasedServiceDiscovery.java
deleted file mode 100644
index 4efcbe2..0000000
---
a/dubbo-registry-extensions/dubbo-registry-dns/src/main/java/org/apache/dubbo/registry/dns/ReflectionBasedServiceDiscovery.java
+++ /dev/null
@@ -1,285 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.registry.dns;
-
-import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.constants.CommonConstants;
-import org.apache.dubbo.common.logger.Logger;
-import org.apache.dubbo.common.logger.LoggerFactory;
-import org.apache.dubbo.common.utils.NamedThreadFactory;
-import org.apache.dubbo.common.utils.NetUtils;
-import org.apache.dubbo.common.utils.StringUtils;
-import org.apache.dubbo.config.metadata.MetadataServiceDelegation;
-import org.apache.dubbo.metadata.InstanceMetadataChangedListener;
-import org.apache.dubbo.metadata.MetadataService;
-import org.apache.dubbo.metadata.RevisionResolver;
-import org.apache.dubbo.registry.Constants;
-import org.apache.dubbo.registry.client.AbstractServiceDiscovery;
-import org.apache.dubbo.registry.client.DefaultServiceInstance;
-import org.apache.dubbo.registry.client.ServiceInstance;
-import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;
-import
org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
-import org.apache.dubbo.registry.client.metadata.MetadataUtils;
-import org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils;
-import org.apache.dubbo.rpc.RpcException;
-import org.apache.dubbo.rpc.model.ApplicationModel;
-import org.apache.dubbo.rpc.model.ScopeModelUtil;
-import org.apache.dubbo.rpc.service.Destroyable;
-
-import com.alibaba.fastjson.JSONObject;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-public class ReflectionBasedServiceDiscovery extends AbstractServiceDiscovery {
-
- private final Logger logger = LoggerFactory.getLogger(getClass());
-
- /**
- * Echo check if consumer is still work
- * echo task may take a lot of time when consumer offline, create a new
ScheduledThreadPool
- */
- private final ScheduledExecutorService echoCheckExecutor =
Executors.newScheduledThreadPool(1, new
NamedThreadFactory("Dubbo-Registry-EchoCheck-Consumer"));
-
- // =================================== Provider side
=================================== //
- /**
- * Local {@link ServiceInstance} Metadata's revision
- */
- private String lastMetadataRevision;
-
- // =================================== Consumer side
=================================== //
-
- /**
- * Local Cache of {@link ServiceInstance} Metadata
- * <p>
- * Key - {@link ServiceInstance} ID ( usually ip + port )
- * Value - Json processed metadata string
- */
- private final ConcurrentHashMap<String, String> metadataMap = new
ConcurrentHashMap<>();
-
- /**
- * Local Cache of {@link ServiceInstance}
- * <p>
- * Key - Service Name
- * Value - List {@link ServiceInstance}
- */
- private final ConcurrentHashMap<String, List<ServiceInstance>>
cachedServiceInstances = new ConcurrentHashMap<>();
-
- private final MetadataServiceDelegation metadataService;
-
- public ConcurrentMap<String, MetadataService> metadataServiceProxies = new
ConcurrentHashMap<>();
-
- /**
- * Local Cache of Service's {@link ServiceInstance} list revision,
- * used to check if {@link ServiceInstance} list has been updated
- * <p>
- * Key - ServiceName
- * Value - a revision calculate from {@link List} of {@link
ServiceInstance}
- */
- private final ConcurrentHashMap<String, String> serviceInstanceRevisionMap
= new ConcurrentHashMap<>();
-
- public ReflectionBasedServiceDiscovery(ApplicationModel applicationModel,
URL registryURL) {
- super(applicationModel, registryURL);
- long echoPollingCycle =
registryURL.getParameter(Constants.ECHO_POLLING_CYCLE_KEY,
Constants.DEFAULT_ECHO_POLLING_CYCLE);
-
- this.metadataService =
applicationModel.getBeanFactory().getOrRegisterBean(MetadataServiceDelegation.class);
-
- // Echo check: test if consumer is offline, remove
MetadataChangeListener,
- // reduce the probability of failure when metadata update
- echoCheckExecutor.scheduleAtFixedRate(() -> {
- Map<String, InstanceMetadataChangedListener> listenerMap =
metadataService.getInstanceMetadataChangedListenerMap();
- Iterator<Map.Entry<String, InstanceMetadataChangedListener>>
iterator = listenerMap.entrySet().iterator();
-
- while (iterator.hasNext()) {
- Map.Entry<String, InstanceMetadataChangedListener> entry =
iterator.next();
- try {
- entry.getValue().echo(CommonConstants.DUBBO);
- } catch (RpcException e) {
- if (logger.isInfoEnabled()) {
- logger.info("Send echo message to consumer error.
Possible cause: consumer is offline.");
- }
- iterator.remove();
- }
- }
- }, echoPollingCycle, echoPollingCycle, TimeUnit.MILLISECONDS);
- }
-
- @Override
- public void doDestroy() throws Exception {
- metadataMap.clear();
- serviceInstanceRevisionMap.clear();
- echoCheckExecutor.shutdown();
- }
-
- private void updateInstanceMetadata(ServiceInstance serviceInstance) {
- String metadataString =
JSONObject.toJSONString(serviceInstance.getMetadata());
- String metadataRevision = RevisionResolver.calRevision(metadataString);
-
- // check if metadata updated
- if (!metadataRevision.equalsIgnoreCase(lastMetadataRevision)) {
- if (logger.isDebugEnabled()) {
- logger.debug("Update Service Instance Metadata of DNS
registry. Newer metadata: " + metadataString);
- }
-
- lastMetadataRevision = metadataRevision;
-
- // save newest metadata to local
- metadataService.exportInstanceMetadata(metadataString);
-
- // notify to consumer
- Map<String, InstanceMetadataChangedListener> listenerMap =
metadataService.getInstanceMetadataChangedListenerMap();
- Iterator<Map.Entry<String, InstanceMetadataChangedListener>>
iterator = listenerMap.entrySet().iterator();
-
- while (iterator.hasNext()) {
- Map.Entry<String, InstanceMetadataChangedListener> entry =
iterator.next();
- try {
- entry.getValue().onEvent(metadataString);
- } catch (RpcException e) {
- logger.warn("Notify to consumer error. Possible cause:
consumer is offline.");
- // remove listener if consumer is offline
- iterator.remove();
- }
- }
- }
- }
-
- @Override
- public void doRegister(ServiceInstance serviceInstance) throws
RuntimeException {
- updateInstanceMetadata(serviceInstance);
- }
-
- @Override
- public void doUpdate(ServiceInstance serviceInstance) throws
RuntimeException {
- updateInstanceMetadata(serviceInstance);
- }
-
- @Override
- public void doUnregister(ServiceInstance serviceInstance) throws
RuntimeException {
- doUnregister(serviceInstance);
- // notify empty message to consumer
- metadataService.exportInstanceMetadata("");
-
metadataService.getInstanceMetadataChangedListenerMap().forEach((consumerId,
listener) -> listener.onEvent(""));
- metadataService.getInstanceMetadataChangedListenerMap().clear();
- }
-
- @SuppressWarnings("unchecked")
- public final void fillServiceInstance(DefaultServiceInstance
serviceInstance) {
- String hostId = serviceInstance.getAddress();
- if (metadataMap.containsKey(hostId)) {
- // Use cached metadata.
- // Metadata will be updated by provider callback
-
- String metadataString = metadataMap.get(hostId);
- serviceInstance.setMetadata(JSONObject.parseObject(metadataString,
Map.class));
- } else {
- // refer from MetadataUtils, this proxy is different from the one
used to refer exportedURL
- MetadataService metadataService =
getMetadataServiceProxy(serviceInstance);
-
- String consumerId =
ScopeModelUtil.getApplicationModel(registryURL.getScopeModel()).getApplicationName()
+ NetUtils.getLocalHost();
- String metadata = metadataService.getAndListenInstanceMetadata(
- consumerId, metadataString -> {
- if (logger.isDebugEnabled()) {
- logger.debug("Receive callback: " + metadataString +
serviceInstance);
- }
- if (StringUtils.isEmpty(metadataString)) {
- // provider is shutdown
- metadataMap.remove(hostId);
- } else {
- metadataMap.put(hostId, metadataString);
- }
- });
- metadataMap.put(hostId, metadata);
- serviceInstance.setMetadata(JSONObject.parseObject(metadata,
Map.class));
- }
- }
-
- public final void notifyListener(String serviceName,
ServiceInstancesChangedListener listener, List<ServiceInstance> instances) {
- String serviceInstanceRevision =
RevisionResolver.calRevision(JSONObject.toJSONString(instances));
- boolean changed = !serviceInstanceRevision.equalsIgnoreCase(
- serviceInstanceRevisionMap.put(serviceName,
serviceInstanceRevision));
-
- if (logger.isDebugEnabled()) {
- logger.debug("Service changed event received (possibly because of
DNS polling). " +
- "Service Instance changed: " + changed + " Service Name: " +
serviceName);
- }
-
- if (changed) {
- List<ServiceInstance> oldServiceInstances =
cachedServiceInstances.getOrDefault(serviceName, new LinkedList<>());
-
- // remove expired invoker
- Set<ServiceInstance> allServiceInstances = new
HashSet<>(oldServiceInstances.size() + instances.size());
- allServiceInstances.addAll(oldServiceInstances);
- allServiceInstances.addAll(instances);
-
- allServiceInstances.removeAll(oldServiceInstances);
-
- allServiceInstances.forEach(removedServiceInstance -> {
- destroyMetadataServiceProxy(removedServiceInstance);
- });
-
- cachedServiceInstances.put(serviceName, instances);
- listener.onEvent(new ServiceInstancesChangedEvent(serviceName,
instances));
- }
- }
-
- @Override
- public Set<String> getServices() {
- return Collections.emptySet();
- }
-
- @Override
- public List<ServiceInstance> getInstances(String serviceName) throws
NullPointerException {
- return Collections.emptyList();
- }
-
- private String computeKey(ServiceInstance serviceInstance) {
- return serviceInstance.getServiceName() + "##" +
serviceInstance.getAddress() + "##" +
-
ServiceInstanceMetadataUtils.getExportedServicesRevision(serviceInstance);
- }
-
- private synchronized MetadataService
getMetadataServiceProxy(ServiceInstance instance) {
- return metadataServiceProxies.computeIfAbsent(computeKey(instance), k
-> MetadataUtils.referProxy(instance).getProxy());
- }
-
- private synchronized void destroyMetadataServiceProxy(ServiceInstance
instance) {
- String key = computeKey(instance);
- if (metadataServiceProxies.containsKey(key)) {
- Object metadataServiceProxy = metadataServiceProxies.remove(key);
- if (metadataServiceProxy instanceof Destroyable) {
- ((Destroyable) metadataServiceProxy).$destroy();
- }
- }
- }
-
- /**
- * UT used only
- */
- @Deprecated
- public final ConcurrentHashMap<String, List<ServiceInstance>>
getCachedServiceInstances() {
- return cachedServiceInstances;
- }
-}
diff --git
a/dubbo-rpc-extensions/dubbo-rpc-hessian/src/main/java/org/apache/dubbo/rpc/protocol/hessian/HessianProtocolFilter.java
b/dubbo-rpc-extensions/dubbo-rpc-hessian/src/main/java/org/apache/dubbo/rpc/protocol/hessian/HessianProtocolFilter.java
index 3102d5d..8ad6a20 100644
---
a/dubbo-rpc-extensions/dubbo-rpc-hessian/src/main/java/org/apache/dubbo/rpc/protocol/hessian/HessianProtocolFilter.java
+++
b/dubbo-rpc-extensions/dubbo-rpc-hessian/src/main/java/org/apache/dubbo/rpc/protocol/hessian/HessianProtocolFilter.java
@@ -27,7 +27,7 @@ import org.apache.dubbo.rpc.RpcException;
import java.util.Map;
-@Activate(group = CommonConstants.PROVIDER, order = Integer.MIN_VALUE + 10000)
+@Activate(group = CommonConstants.PROVIDER, order = Integer.MIN_VALUE, before
= "context")
public class HessianProtocolFilter implements Filter {
private final static InternalThreadLocal<Map<String, String>> attachments
= new InternalThreadLocal<>();