This is an automated email from the ASF dual-hosted git repository.

zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 430de3bab5a refactor xa recovery id changed event (#18096)
430de3bab5a is described below

commit 430de3bab5a3c5b700d21a9883b237da4ea0ed49
Author: Haoran Meng <[email protected]>
AuthorDate: Tue May 31 16:41:22 2022 +0800

    refactor xa recovery id changed event (#18096)
    
    * refactor xa recovery id changed event
    
    * refactor xa recovery id changed event
    
    * refactor xa recovery id changed event
---
 .../infra/instance/ComputeNodeInstance.java        |  4 ++-
 .../infra/instance/InstanceContext.java            | 29 +++++++++++++++++-----
 .../config/NarayanaConfigurationFileGenerator.java |  3 ++-
 .../NarayanaConfigurationFileGeneratorTest.java    |  3 ++-
 .../cluster/ClusterContextManagerBuilder.java      |  2 +-
 .../ClusterContextManagerCoordinator.java          | 21 +++++++++++++---
 .../cluster/coordinator/RegistryCenter.java        |  3 +--
 ...eryIdEvent.java => XaRecoveryIdAddedEvent.java} |  4 +--
 ...yIdEvent.java => XaRecoveryIdDeletedEvent.java} |  4 +--
 .../compute/service/ComputeNodeStatusService.java  | 13 +++++++---
 .../watcher/ComputeNodeStateChangedWatcher.java    |  9 +++++--
 .../ClusterContextManagerCoordinatorTest.java      |  6 ++---
 .../ComputeNodeStateChangedWatcherTest.java        | 16 ++++++++----
 .../ral/common/queryable/ShowInstanceHandler.java  |  3 ++-
 14 files changed, 85 insertions(+), 35 deletions(-)

diff --git 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
 
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
index 1508e73737d..79b6b2fd604 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
@@ -25,6 +25,8 @@ import org.apache.shardingsphere.infra.state.StateContext;
 import org.apache.shardingsphere.infra.state.StateType;
 
 import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
 
 /**
  * Instance of compute node.
@@ -42,7 +44,7 @@ public final class ComputeNodeInstance {
     
     private Long workerId;
     
-    private String xaRecoveryId;
+    private List<String> xaRecoveryIds = new LinkedList<>();
     
     /**
      * Set labels.
diff --git 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
 
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
index 011654d6ede..3e08608b5c0 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
@@ -102,19 +102,36 @@ public final class InstanceContext {
     }
     
     /**
-     * Update instance XA recovery id.
+     * Add instance XA recovery id.
      *
      * @param instanceId instance id
      * @param xaRecoveryId XA recovery id
      * @return true if current instance updated, else false                    
      */
-    public boolean updateXaRecoveryId(final String instanceId, final String 
xaRecoveryId) {
-        if (instanceId.equals(instance.getCurrentInstanceId()) && 
!Objects.equals(xaRecoveryId, instance.getXaRecoveryId())) {
-            instance.setXaRecoveryId(xaRecoveryId);
-            computeNodeInstances.stream().filter(each -> 
each.getInstanceDefinition().getInstanceId().equals(instanceId)).forEach(each 
-> each.setXaRecoveryId(xaRecoveryId));
+    public boolean addXaRecoveryId(final String instanceId, final String 
xaRecoveryId) {
+        if (instanceId.equals(instance.getCurrentInstanceId()) && 
!instance.getXaRecoveryIds().contains(xaRecoveryId)) {
+            instance.getXaRecoveryIds().add(xaRecoveryId);
+            computeNodeInstances.stream().filter(each -> 
each.getInstanceDefinition().getInstanceId().equals(instanceId)).forEach(each 
-> each.getXaRecoveryIds().add(xaRecoveryId));
             return true;
         }
-        computeNodeInstances.stream().filter(each -> 
each.getInstanceDefinition().getInstanceId().equals(instanceId)).forEach(each 
-> each.setXaRecoveryId(xaRecoveryId));
+        computeNodeInstances.stream().filter(each -> 
each.getInstanceDefinition().getInstanceId().equals(instanceId)).forEach(each 
-> each.getXaRecoveryIds().add(xaRecoveryId));
+        return false;
+    }
+    
+    /**
+     * Delete instance XA recovery id.
+     *
+     * @param instanceId instance id
+     * @param xaRecoveryId XA recovery id
+     * @return true if current instance updated, else false                    
+     */
+    public boolean deleteXaRecoveryId(final String instanceId, final String 
xaRecoveryId) {
+        if (instanceId.equals(instance.getCurrentInstanceId()) && 
instance.getXaRecoveryIds().contains(xaRecoveryId)) {
+            instance.getXaRecoveryIds().remove(xaRecoveryId);
+            computeNodeInstances.stream().filter(each -> 
each.getInstanceDefinition().getInstanceId().equals(instanceId)).forEach(each 
-> each.getXaRecoveryIds().remove(xaRecoveryId));
+            return true;
+        }
+        computeNodeInstances.stream().filter(each -> 
each.getInstanceDefinition().getInstanceId().equals(instanceId)).forEach(each 
-> each.getXaRecoveryIds().remove(xaRecoveryId));
         return false;
     }
     
diff --git 
a/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-type/shardingsphere-transaction-xa/shardingsphere-transaction-xa-provider/shardingsphere-transaction-xa-narayana/src/main/java/org/apache/shardingsphere/transaction/xa/narayana/config/NarayanaConfigurationFileGenerator.java
 
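b/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-type/shardingsphere-transaction-xa/shardingsphere-transaction-xa-provider/shardingsphere-transaction-xa-narayana/src/main/java/org/apache/shardingsphere/transaction/xa/narayana/config/NarayanaConfigurationFileGenerator.java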
index cffc6bf7955..5fde9892c20 100644
--- 
a/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-type/shardingsphere-transaction-xa/shardingsphere-transaction-xa-provider/shardingsphere-transaction-xa-narayana/src/main/java/org/apache/shardingsphere/transaction/xa/narayana/config/NarayanaConfigurationFileGenerator.java
+++ 
b/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-type/shardingsphere-transaction-xa/shardingsphere-transaction-xa-provider/shardingsphere-transaction-xa-narayana/src/main/java/org/apache/shardingsphere/transaction/xa/narayana/config/NarayanaConfigurationFileGenerator.java
@@ -25,6 +25,7 @@ import 
com.arjuna.ats.internal.jta.recovery.arjunacore.JTAActionStatusServiceXAR
 import 
com.arjuna.ats.internal.jta.recovery.arjunacore.JTANodeNameXAResourceOrphanFilter;
 import 
com.arjuna.ats.internal.jta.recovery.arjunacore.JTATransactionLogXAResourceOrphanFilter;
 import com.arjuna.ats.internal.jta.recovery.arjunacore.XARecoveryModule;
+import com.google.common.base.Joiner;
 import org.apache.shardingsphere.infra.config.database.DatabaseConfiguration;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
@@ -55,7 +56,7 @@ public final class NarayanaConfigurationFileGenerator 
implements TransactionConf
     @Override
     public void generateFile(final Properties transactionProps, final 
InstanceContext instanceContext) {
         String instanceId = 
instanceContext.getInstance().getInstanceDefinition().getInstanceId();
-        String recoveryId = null == 
instanceContext.getInstance().getXaRecoveryId() ? instanceId : 
instanceContext.getInstance().getXaRecoveryId();
+        String recoveryId = 
instanceContext.getInstance().getXaRecoveryIds().isEmpty() ? instanceId : 
Joiner.on(",").join(instanceContext.getInstance().getXaRecoveryIds());
         NarayanaConfiguration config = createDefaultConfiguration(instanceId, 
recoveryId);
         if (!transactionProps.isEmpty()) {
             appendUserDefinedJdbcStoreConfiguration(transactionProps, config);
diff --git 
a/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-type/shardingsphere-transaction-xa/shardingsphere-transaction-xa-provider/shardingsphere-transaction-xa-narayana/src/test/java/org/apache/shardingsphere/transaction/xa/narayana/config/NarayanaConfigurationFileGeneratorTest.java
 
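b/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-type/shardingsphere-transaction-xa/shardingsphere-transaction-xa-provider/shardingsphere-transaction-xa-narayana/src/test/java/org/apache/shardingsphere/transaction/xa/narayana/config/NarayanaConfigurationFileGeneratorTest.java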
index 7762c5063b1..d35b16eb519 100644
--- 
a/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-type/shardingsphere-transaction-xa/shardingsphere-transaction-xa-provider/shardingsphere-transaction-xa-narayana/src/test/java/org/apache/shardingsphere/transaction/xa/narayana/config/NarayanaConfigurationFileGeneratorTest.java
+++ 
b/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-type/shardingsphere-transaction-xa/shardingsphere-transaction-xa-provider/shardingsphere-transaction-xa-narayana/src/test/java/org/apache/shardingsphere/transaction/xa/narayana/config/NarayanaConfigurationFileGeneratorTest.java
@@ -41,6 +41,7 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.InputStream;
+import java.util.Arrays;
 import java.util.Optional;
 import java.util.Properties;
 
@@ -67,7 +68,7 @@ public final class NarayanaConfigurationFileGeneratorTest {
         jdbcAccess = 
"com.arjuna.ats.internal.arjuna.objectstore.jdbc.accessors.DynamicDataSourceJDBCAccess;ClassName=com.mysql.jdbc.jdbc2.optional.MysqlDataSource;"
                 + 
"URL=jdbc:mysql://127.0.0.1:3306/jbossts;User=root;Password=12345678";
         
when(instanceContext.getInstance().getInstanceDefinition().getInstanceId()).thenReturn("127.0.0.1@3307");
-        
when(instanceContext.getInstance().getXaRecoveryId()).thenReturn("127.0.0.1@3307");
+        
when(instanceContext.getInstance().getXaRecoveryIds()).thenReturn(Arrays.asList("127.0.0.1@3307"));
     }
     
     private Properties createProperties() {
diff --git 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
index e33a314f4f3..a2ba6b9a5f8 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
@@ -167,7 +167,7 @@ public final class ClusterContextManagerBuilder implements 
ContextManagerBuilder
     private void registerOnline(final MetaDataPersistService 
metaDataPersistService, final ContextManagerBuilderParameter parameter, final 
ContextManager contextManager,
                                 final RegistryCenter registryCenter) {
         String instanceId = 
contextManager.getInstanceContext().getInstance().getCurrentInstanceId();
-        
contextManager.getInstanceContext().getInstance().setXaRecoveryId(instanceId);
+        
contextManager.getInstanceContext().getInstance().getXaRecoveryIds().add(instanceId);
         
contextManager.getInstanceContext().getInstance().setLabels(parameter.getLabels());
         
contextManager.getInstanceContext().getComputeNodeInstances().addAll(registryCenter.getComputeNodeStatusService().loadAllComputeNodeInstances());
         new ClusterContextManagerCoordinator(metaDataPersistService, 
contextManager, registryCenter);
diff --git 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
index 3e66d535b0d..d61f2e13323 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
@@ -49,7 +49,8 @@ import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.statu
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListUnitCompleteEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.StateEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.WorkerIdEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.XaRecoveryIdEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.XaRecoveryIdAddedEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.XaRecoveryIdDeletedEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.DisabledStateChangedEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.PrimaryStateChangedEvent;
 import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
@@ -245,11 +246,23 @@ public final class ClusterContextManagerCoordinator {
     /**
      * Renew instance xa recovery id event.
      *
-     * @param event xa recovery id event
+     * @param event xa recovery id added event
      */
     @Subscribe
-    public synchronized void renew(final XaRecoveryIdEvent event) {
-        if 
(contextManager.getInstanceContext().updateXaRecoveryId(event.getInstanceId(), 
event.getXaRecoveryId())) {
+    public synchronized void renew(final XaRecoveryIdAddedEvent event) {
+        if 
(contextManager.getInstanceContext().addXaRecoveryId(event.getInstanceId(), 
event.getXaRecoveryId())) {
+            contextManager.renewAllTransactionContext();
+        }
+    }
+    
+    /**
+     * Renew instance xa recovery id event.
+     *
+     * @param event xa recovery id deleted event
+     */
+    @Subscribe
+    public synchronized void renew(final XaRecoveryIdDeletedEvent event) {
+        if 
(contextManager.getInstanceContext().deleteXaRecoveryId(event.getInstanceId(), 
event.getXaRecoveryId())) {
             contextManager.renewAllTransactionContext();
         }
     }
diff --git 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
index e55832efc88..3d4b07c61d4 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
@@ -30,7 +30,6 @@ import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.statu
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.service.StorageNodeStatusService;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.subscriber.StorageNodeStatusSubscriber;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
-import java.util.Arrays;
 
 /**
  * Registry center.
@@ -76,7 +75,7 @@ public final class RegistryCenter {
     public void onlineInstance(final ComputeNodeInstance computeNodeInstance) {
         
computeNodeStatusService.registerOnline(computeNodeInstance.getInstanceDefinition());
         
computeNodeStatusService.persistInstanceLabels(computeNodeInstance.getCurrentInstanceId(),
 computeNodeInstance.getLabels());
-        
computeNodeStatusService.persistInstanceXaRecoveryId(computeNodeInstance.getCurrentInstanceId(),
 Arrays.asList(computeNodeInstance.getXaRecoveryId()));
+        
computeNodeStatusService.persistInstanceXaRecoveryId(computeNodeInstance.getCurrentInstanceId(),
 computeNodeInstance.getXaRecoveryIds());
         listenerFactory.watchListeners();
     }
 }
diff --git 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/XaRecoveryIdEvent.java
 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/XaRecoveryIdAddedEvent.java
similarity index 92%
copy from 
shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/XaRecoveryIdEvent.java
copy to 
shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/XaRecoveryIdAddedEvent.java
index 2cad86b500c..fe64ec5d9ff 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/XaRecoveryIdEvent.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/XaRecoveryIdAddedEvent.java
@@ -22,11 +22,11 @@ import lombok.RequiredArgsConstructor;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
 
 /**
- * Xa recovery id event.
+ * Xa recovery id added event.
  */
 @RequiredArgsConstructor
 @Getter
-public final class XaRecoveryIdEvent implements GovernanceEvent {
+public final class XaRecoveryIdAddedEvent implements GovernanceEvent {
     
     private final String instanceId;
     
diff --git 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/XaRecoveryIdEvent.java
 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/XaRecoveryIdDeletedEvent.java
similarity index 91%
rename from 
shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/XaRecoveryIdEvent.java
rename to 
shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/XaRecoveryIdDeletedEvent.java
index 2cad86b500c..e52ecb4e4d4 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/XaRecoveryIdEvent.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/XaRecoveryIdDeletedEvent.java
@@ -22,11 +22,11 @@ import lombok.RequiredArgsConstructor;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
 
 /**
- * Xa recovery id event.
+ * Xa recovery id deleted event.
  */
 @RequiredArgsConstructor
 @Getter
-public final class XaRecoveryIdEvent implements GovernanceEvent {
+public final class XaRecoveryIdDeletedEvent implements GovernanceEvent {
     
     private final String instanceId;
     
diff --git 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
index 103ab18b392..22d76c70bf7 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
@@ -82,9 +82,14 @@ public final class ComputeNodeStatusService {
      * @param xaRecoveryIds collection of xa recovery id
      */
     public void persistInstanceXaRecoveryId(final String instanceId, final 
Collection<String> xaRecoveryIds) {
-        loadXaRecoveryIds(instanceId).forEach(each -> 
repository.delete(ComputeNode.getInstanceXaRecoveryIdNodePath(each, 
instanceId)));
-        for (String each : xaRecoveryIds) {
-            
repository.persistEphemeral(ComputeNode.getInstanceXaRecoveryIdNodePath(each, 
instanceId), "");
+        Collection<String> originalXaRecoveryIds = 
loadXaRecoveryIds(instanceId);
+        if (originalXaRecoveryIds.isEmpty()) {
+            xaRecoveryIds.forEach(each -> 
repository.persistEphemeral(ComputeNode.getInstanceXaRecoveryIdNodePath(each, 
instanceId), ""));
+        } else {
+            originalXaRecoveryIds.stream().filter(each -> 
!xaRecoveryIds.contains(each)).forEach(each -> repository.delete(ComputeNode
+                    .getInstanceXaRecoveryIdNodePath(each, instanceId)));
+            xaRecoveryIds.stream().filter(each -> 
!originalXaRecoveryIds.contains(each)).forEach(each -> 
repository.persistEphemeral(ComputeNode
+                    .getInstanceXaRecoveryIdNodePath(each, instanceId), ""));
         }
     }
     
@@ -173,8 +178,8 @@ public final class ComputeNodeStatusService {
         ComputeNodeInstance result = new 
ComputeNodeInstance(instanceDefinition);
         
result.setLabels(loadInstanceLabels(instanceDefinition.getInstanceId()));
         
result.switchState(loadInstanceStatus(instanceDefinition.getInstanceId()));
+        
result.getXaRecoveryIds().addAll(loadXaRecoveryIds(instanceDefinition.getInstanceId()));
         
loadInstanceWorkerId(instanceDefinition.getInstanceId()).ifPresent(result::setWorkerId);
-        // TODO load xa recovery id list
         return result;
     }
 }
diff --git 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
index e7419e769c2..b8c492dcc1d 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
@@ -29,7 +29,8 @@ import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.statu
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListUnitCompleteEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.StateEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.WorkerIdEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.XaRecoveryIdEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.XaRecoveryIdAddedEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.XaRecoveryIdDeletedEvent;
 import org.apache.shardingsphere.mode.metadata.persist.node.ComputeNode;
 import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
 import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
@@ -111,7 +112,11 @@ public final class ComputeNodeStateChangedWatcher 
implements GovernanceWatcher<G
     private Optional<GovernanceEvent> createXaRecoveryIdEvent(final 
DataChangedEvent event) {
         Matcher matcher = 
Pattern.compile(ComputeNode.getXaRecoveryIdNodePath() + "/([\\S]+)/([\\S]+)$", 
Pattern.CASE_INSENSITIVE).matcher(event.getKey());
         if (matcher.find()) {
-            return Optional.of(new XaRecoveryIdEvent(matcher.group(2), 
matcher.group(1)));
+            if (Type.ADDED == event.getType()) {
+                return Optional.of(new 
XaRecoveryIdAddedEvent(matcher.group(2), matcher.group(1)));
+            } else if (Type.DELETED == event.getType()) {
+                return Optional.of(new 
XaRecoveryIdDeletedEvent(matcher.group(2), matcher.group(1)));
+            }
         }
         return Optional.empty();
     }
diff --git 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinatorTest.java
 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinatorTest.java
index bc74826d63b..755b5cf0039 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinatorTest.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinatorTest.java
@@ -67,7 +67,7 @@ import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.statu
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListUnitCompleteEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.StateEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.WorkerIdEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.XaRecoveryIdEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.XaRecoveryIdAddedEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.DisabledStateChangedEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.PrimaryStateChangedEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.util.ReflectionUtil;
@@ -309,8 +309,8 @@ public final class ClusterContextManagerCoordinatorTest {
     
     @Test
     public void assertRenewXaRecoveryIdEvent() {
-        coordinator.renew(new 
XaRecoveryIdEvent(contextManager.getInstanceContext().getInstance().getInstanceDefinition().getInstanceId(),
 "127.0.0.100@3306"));
-        
assertThat(contextManager.getInstanceContext().getInstance().getXaRecoveryId(), 
is("127.0.0.100@3306"));
+        coordinator.renew(new 
XaRecoveryIdAddedEvent(contextManager.getInstanceContext().getInstance().getInstanceDefinition().getInstanceId(),
 "127.0.0.100@3306"));
+        
assertTrue(contextManager.getInstanceContext().getInstance().getXaRecoveryIds().contains("127.0.0.100@3306"));
     }
     
     @Test
diff --git 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcherTest.java
 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcherTest.java
index fb5cfc86f31..914fb10f91d 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcherTest.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcherTest.java
@@ -17,13 +17,14 @@
 
 package 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.watcher;
 
+import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.ComputeNodeStatus;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.StateEvent;
-import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.ComputeNodeStatus;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.WorkerIdEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.XaRecoveryIdEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.XaRecoveryIdAddedEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.XaRecoveryIdDeletedEvent;
 import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
 import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
 import org.junit.Test;
@@ -98,7 +99,12 @@ public final class ComputeNodeStateChangedWatcherTest {
         Optional<GovernanceEvent> actual = new ComputeNodeStateChangedWatcher()
                 .createGovernanceEvent(new 
DataChangedEvent("/nodes/compute_nodes/xa_recovery_id/127.0.0.1@3307/127.0.0.1@3307",
 "", Type.ADDED));
         assertTrue(actual.isPresent());
-        assertThat(((XaRecoveryIdEvent) actual.get()).getInstanceId(), 
is("127.0.0.1@3307"));
-        assertThat(((XaRecoveryIdEvent) actual.get()).getXaRecoveryId(), 
is("127.0.0.1@3307"));
+        assertThat(((XaRecoveryIdAddedEvent) actual.get()).getInstanceId(), 
is("127.0.0.1@3307"));
+        assertThat(((XaRecoveryIdAddedEvent) actual.get()).getXaRecoveryId(), 
is("127.0.0.1@3307"));
+        actual = new ComputeNodeStateChangedWatcher()
+                .createGovernanceEvent(new 
DataChangedEvent("/nodes/compute_nodes/xa_recovery_id/127.0.0.1@3307/127.0.0.1@3307",
 "", Type.DELETED));
+        assertTrue(actual.isPresent());
+        assertThat(((XaRecoveryIdDeletedEvent) actual.get()).getInstanceId(), 
is("127.0.0.1@3307"));
+        assertThat(((XaRecoveryIdDeletedEvent) 
actual.get()).getXaRecoveryId(), is("127.0.0.1@3307"));
     }
 }
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/queryable/ShowInstanceHandler.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/queryable/ShowInstanceHandler.java
index 912ed527f4f..b9223a36282 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/queryable/ShowInstanceHandler.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/queryable/ShowInstanceHandler.java
@@ -17,6 +17,7 @@
 
 package 
org.apache.shardingsphere.proxy.backend.text.distsql.ral.common.queryable;
 
+import com.google.common.base.Joiner;
 import 
org.apache.shardingsphere.distsql.parser.statement.ral.common.queryable.ShowInstanceStatement;
 import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
 import org.apache.shardingsphere.infra.instance.definition.InstanceDefinition;
@@ -75,7 +76,7 @@ public final class ShowInstanceHandler extends 
QueryableRALBackendHandler<ShowIn
     }
     
     private List<Object> buildRow(final ComputeNodeInstance instance, final 
String modeType) {
-        return buildRow(instance.getInstanceDefinition(), 
instance.getState().getCurrentState().name(), modeType, instance.getLabels(), 
instance.getXaRecoveryId());
+        return buildRow(instance.getInstanceDefinition(), 
instance.getState().getCurrentState().name(), modeType, instance.getLabels(), 
Joiner.on(",").join(instance.getXaRecoveryIds()));
     }
     
     private List<Object> buildRow(final InstanceDefinition instanceDefinition, 
final String status, final String modeType, final Collection<String> 
instanceLabels, final String xaRecoveryId) {

Reply via email to