This is an automated email from the ASF dual-hosted git repository.
zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 430de3bab5a refactor xa recovery id changed event (#18096)
430de3bab5a is described below
commit 430de3bab5a3c5b700d21a9883b237da4ea0ed49
Author: Haoran Meng <[email protected]>
AuthorDate: Tue May 31 16:41:22 2022 +0800
refactor xa recovery id changed event (#18096)
* refactor xa recovery id changed event
* refactor xa recovery id changed event
* refactor xa recovery id changed event
---
.../infra/instance/ComputeNodeInstance.java | 4 ++-
.../infra/instance/InstanceContext.java | 29 +++++++++++++++++-----
.../config/NarayanaConfigurationFileGenerator.java | 3 ++-
.../NarayanaConfigurationFileGeneratorTest.java | 3 ++-
.../cluster/ClusterContextManagerBuilder.java | 2 +-
.../ClusterContextManagerCoordinator.java | 21 +++++++++++++---
.../cluster/coordinator/RegistryCenter.java | 3 +--
...eryIdEvent.java => XaRecoveryIdAddedEvent.java} | 4 +--
...yIdEvent.java => XaRecoveryIdDeletedEvent.java} | 4 +--
.../compute/service/ComputeNodeStatusService.java | 13 +++++++---
.../watcher/ComputeNodeStateChangedWatcher.java | 9 +++++--
.../ClusterContextManagerCoordinatorTest.java | 6 ++---
.../ComputeNodeStateChangedWatcherTest.java | 16 ++++++++----
.../ral/common/queryable/ShowInstanceHandler.java | 3 ++-
14 files changed, 85 insertions(+), 35 deletions(-)
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
index 1508e73737d..79b6b2fd604 100644
---
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstance.java
@@ -25,6 +25,8 @@ import org.apache.shardingsphere.infra.state.StateContext;
import org.apache.shardingsphere.infra.state.StateType;
import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
/**
* Instance of compute node.
@@ -42,7 +44,7 @@ public final class ComputeNodeInstance {
private Long workerId;
- private String xaRecoveryId;
+ private List<String> xaRecoveryIds = new LinkedList<>();
/**
* Set labels.
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
index 011654d6ede..3e08608b5c0 100644
---
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
@@ -102,19 +102,36 @@ public final class InstanceContext {
}
/**
- * Update instance XA recovery id.
+ * Add instance XA recovery id.
*
* @param instanceId instance id
* @param xaRecoveryId XA recovery id
* @return true if current instance updated, else false
*/
- public boolean updateXaRecoveryId(final String instanceId, final String
xaRecoveryId) {
- if (instanceId.equals(instance.getCurrentInstanceId()) &&
!Objects.equals(xaRecoveryId, instance.getXaRecoveryId())) {
- instance.setXaRecoveryId(xaRecoveryId);
- computeNodeInstances.stream().filter(each ->
each.getInstanceDefinition().getInstanceId().equals(instanceId)).forEach(each
-> each.setXaRecoveryId(xaRecoveryId));
+ public boolean addXaRecoveryId(final String instanceId, final String
xaRecoveryId) {
+ if (instanceId.equals(instance.getCurrentInstanceId()) &&
!instance.getXaRecoveryIds().contains(xaRecoveryId)) {
+ instance.getXaRecoveryIds().add(xaRecoveryId);
+ computeNodeInstances.stream().filter(each ->
each.getInstanceDefinition().getInstanceId().equals(instanceId)).forEach(each
-> each.getXaRecoveryIds().add(xaRecoveryId));
return true;
}
- computeNodeInstances.stream().filter(each ->
each.getInstanceDefinition().getInstanceId().equals(instanceId)).forEach(each
-> each.setXaRecoveryId(xaRecoveryId));
+ computeNodeInstances.stream().filter(each ->
each.getInstanceDefinition().getInstanceId().equals(instanceId)).forEach(each
-> each.getXaRecoveryIds().add(xaRecoveryId));
+ return false;
+ }
+
+ /**
+ * Delete instance XA recovery id.
+ *
+ * @param instanceId instance id
+ * @param xaRecoveryId XA recovery id
+ * @return true if current instance updated, else false
+ */
+ public boolean deleteXaRecoveryId(final String instanceId, final String
xaRecoveryId) {
+ if (instanceId.equals(instance.getCurrentInstanceId()) &&
instance.getXaRecoveryIds().contains(xaRecoveryId)) {
+ instance.getXaRecoveryIds().remove(xaRecoveryId);
+ computeNodeInstances.stream().filter(each ->
each.getInstanceDefinition().getInstanceId().equals(instanceId)).forEach(each
-> each.getXaRecoveryIds().remove(xaRecoveryId));
+ return true;
+ }
+ computeNodeInstances.stream().filter(each ->
each.getInstanceDefinition().getInstanceId().equals(instanceId)).forEach(each
-> each.getXaRecoveryIds().remove(xaRecoveryId));
return false;
}
diff --git
a/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-type/shardingsphere-transaction-xa/shardingsphere-transaction-xa-provider/shardingsphere-transaction-xa-narayana/src/main/java/org/apache/shardingsphere/transaction/xa/narayana/config/NarayanaConfigurationFileGenerator.java
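b/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-type/shardingsphere-transaction-xa/shardingsphere-transaction-xa-provider/shardingsphere-transaction-xa-narayana/src/main/java/org/apache/shardingsphere/transaction/xa/narayana/config/NarayanaConfigurationFileGenerator.java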
index cffc6bf7955..5fde9892c20 100644
---
a/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-type/shardingsphere-transaction-xa/shardingsphere-transaction-xa-provider/shardingsphere-transaction-xa-narayana/src/main/java/org/apache/shardingsphere/transaction/xa/narayana/config/NarayanaConfigurationFileGenerator.java
+++
b/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-type/shardingsphere-transaction-xa/shardingsphere-transaction-xa-provider/shardingsphere-transaction-xa-narayana/src/main/java/org/apache/shardingsphere/transaction/xa/narayana/config/NarayanaConfigurationFileGenerator.java
@@ -25,6 +25,7 @@ import
com.arjuna.ats.internal.jta.recovery.arjunacore.JTAActionStatusServiceXAR
import
com.arjuna.ats.internal.jta.recovery.arjunacore.JTANodeNameXAResourceOrphanFilter;
import
com.arjuna.ats.internal.jta.recovery.arjunacore.JTATransactionLogXAResourceOrphanFilter;
import com.arjuna.ats.internal.jta.recovery.arjunacore.XARecoveryModule;
+import com.google.common.base.Joiner;
import org.apache.shardingsphere.infra.config.database.DatabaseConfiguration;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
@@ -55,7 +56,7 @@ public final class NarayanaConfigurationFileGenerator
implements TransactionConf
@Override
public void generateFile(final Properties transactionProps, final
InstanceContext instanceContext) {
String instanceId =
instanceContext.getInstance().getInstanceDefinition().getInstanceId();
- String recoveryId = null ==
instanceContext.getInstance().getXaRecoveryId() ? instanceId :
instanceContext.getInstance().getXaRecoveryId();
+ String recoveryId =
instanceContext.getInstance().getXaRecoveryIds().isEmpty() ? instanceId :
Joiner.on(",").join(instanceContext.getInstance().getXaRecoveryIds());
NarayanaConfiguration config = createDefaultConfiguration(instanceId,
recoveryId);
if (!transactionProps.isEmpty()) {
appendUserDefinedJdbcStoreConfiguration(transactionProps, config);
diff --git
a/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-type/shardingsphere-transaction-xa/shardingsphere-transaction-xa-provider/shardingsphere-transaction-xa-narayana/src/test/java/org/apache/shardingsphere/transaction/xa/narayana/config/NarayanaConfigurationFileGeneratorTest.java
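b/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-type/shardingsphere-transaction-xa/shardingsphere-transaction-xa-provider/shardingsphere-transaction-xa-narayana/src/test/java/org/apache/shardingsphere/transaction/xa/narayana/config/NarayanaConfigurationFileGeneratorTest.java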
index 7762c5063b1..d35b16eb519 100644
---
a/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-type/shardingsphere-transaction-xa/shardingsphere-transaction-xa-provider/shardingsphere-transaction-xa-narayana/src/test/java/org/apache/shardingsphere/transaction/xa/narayana/config/NarayanaConfigurationFileGeneratorTest.java
+++
b/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-type/shardingsphere-transaction-xa/shardingsphere-transaction-xa-provider/shardingsphere-transaction-xa-narayana/src/test/java/org/apache/shardingsphere/transaction/xa/narayana/config/NarayanaConfigurationFileGeneratorTest.java
@@ -41,6 +41,7 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.InputStream;
+import java.util.Arrays;
import java.util.Optional;
import java.util.Properties;
@@ -67,7 +68,7 @@ public final class NarayanaConfigurationFileGeneratorTest {
jdbcAccess =
"com.arjuna.ats.internal.arjuna.objectstore.jdbc.accessors.DynamicDataSourceJDBCAccess;ClassName=com.mysql.jdbc.jdbc2.optional.MysqlDataSource;"
+
"URL=jdbc:mysql://127.0.0.1:3306/jbossts;User=root;Password=12345678";
when(instanceContext.getInstance().getInstanceDefinition().getInstanceId()).thenReturn("127.0.0.1@3307");
-
when(instanceContext.getInstance().getXaRecoveryId()).thenReturn("127.0.0.1@3307");
+
when(instanceContext.getInstance().getXaRecoveryIds()).thenReturn(Arrays.asList("127.0.0.1@3307"));
}
private Properties createProperties() {
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
index e33a314f4f3..a2ba6b9a5f8 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
@@ -167,7 +167,7 @@ public final class ClusterContextManagerBuilder implements
ContextManagerBuilder
private void registerOnline(final MetaDataPersistService
metaDataPersistService, final ContextManagerBuilderParameter parameter, final
ContextManager contextManager,
final RegistryCenter registryCenter) {
String instanceId =
contextManager.getInstanceContext().getInstance().getCurrentInstanceId();
-
contextManager.getInstanceContext().getInstance().setXaRecoveryId(instanceId);
+
contextManager.getInstanceContext().getInstance().getXaRecoveryIds().add(instanceId);
contextManager.getInstanceContext().getInstance().setLabels(parameter.getLabels());
contextManager.getInstanceContext().getComputeNodeInstances().addAll(registryCenter.getComputeNodeStatusService().loadAllComputeNodeInstances());
new ClusterContextManagerCoordinator(metaDataPersistService,
contextManager, registryCenter);
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
index 3e66d535b0d..d61f2e13323 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
@@ -49,7 +49,8 @@ import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.statu
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListUnitCompleteEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.StateEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.WorkerIdEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.XaRecoveryIdEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.XaRecoveryIdAddedEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.XaRecoveryIdDeletedEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.DisabledStateChangedEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.PrimaryStateChangedEvent;
import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
@@ -245,11 +246,23 @@ public final class ClusterContextManagerCoordinator {
/**
* Renew instance xa recovery id event.
*
- * @param event xa recovery id event
+ * @param event xa recovery id added event
*/
@Subscribe
- public synchronized void renew(final XaRecoveryIdEvent event) {
- if
(contextManager.getInstanceContext().updateXaRecoveryId(event.getInstanceId(),
event.getXaRecoveryId())) {
+ public synchronized void renew(final XaRecoveryIdAddedEvent event) {
+ if
(contextManager.getInstanceContext().addXaRecoveryId(event.getInstanceId(),
event.getXaRecoveryId())) {
+ contextManager.renewAllTransactionContext();
+ }
+ }
+
+ /**
+ * Renew instance xa recovery id event.
+ *
+ * @param event xa recovery id deleted event
+ */
+ @Subscribe
+ public synchronized void renew(final XaRecoveryIdDeletedEvent event) {
+ if
(contextManager.getInstanceContext().deleteXaRecoveryId(event.getInstanceId(),
event.getXaRecoveryId())) {
contextManager.renewAllTransactionContext();
}
}
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
index e55832efc88..3d4b07c61d4 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
@@ -30,7 +30,6 @@ import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.statu
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.service.StorageNodeStatusService;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.subscriber.StorageNodeStatusSubscriber;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
-import java.util.Arrays;
/**
* Registry center.
@@ -76,7 +75,7 @@ public final class RegistryCenter {
public void onlineInstance(final ComputeNodeInstance computeNodeInstance) {
computeNodeStatusService.registerOnline(computeNodeInstance.getInstanceDefinition());
computeNodeStatusService.persistInstanceLabels(computeNodeInstance.getCurrentInstanceId(),
computeNodeInstance.getLabels());
-
computeNodeStatusService.persistInstanceXaRecoveryId(computeNodeInstance.getCurrentInstanceId(),
Arrays.asList(computeNodeInstance.getXaRecoveryId()));
+
computeNodeStatusService.persistInstanceXaRecoveryId(computeNodeInstance.getCurrentInstanceId(),
computeNodeInstance.getXaRecoveryIds());
listenerFactory.watchListeners();
}
}
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/XaRecoveryIdEvent.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/XaRecoveryIdAddedEvent.java
similarity index 92%
copy from
shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/XaRecoveryIdEvent.java
copy to
shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/XaRecoveryIdAddedEvent.java
index 2cad86b500c..fe64ec5d9ff 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/XaRecoveryIdEvent.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/XaRecoveryIdAddedEvent.java
@@ -22,11 +22,11 @@ import lombok.RequiredArgsConstructor;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
/**
- * Xa recovery id event.
+ * Xa recovery id added event.
*/
@RequiredArgsConstructor
@Getter
-public final class XaRecoveryIdEvent implements GovernanceEvent {
+public final class XaRecoveryIdAddedEvent implements GovernanceEvent {
private final String instanceId;
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/XaRecoveryIdEvent.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/XaRecoveryIdDeletedEvent.java
similarity index 91%
rename from
shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/XaRecoveryIdEvent.java
rename to
shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/XaRecoveryIdDeletedEvent.java
index 2cad86b500c..e52ecb4e4d4 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/XaRecoveryIdEvent.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/XaRecoveryIdDeletedEvent.java
@@ -22,11 +22,11 @@ import lombok.RequiredArgsConstructor;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
/**
- * Xa recovery id event.
+ * Xa recovery id deleted event.
*/
@RequiredArgsConstructor
@Getter
-public final class XaRecoveryIdEvent implements GovernanceEvent {
+public final class XaRecoveryIdDeletedEvent implements GovernanceEvent {
private final String instanceId;
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
index 103ab18b392..22d76c70bf7 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
@@ -82,9 +82,14 @@ public final class ComputeNodeStatusService {
* @param xaRecoveryIds collection of xa recovery id
*/
public void persistInstanceXaRecoveryId(final String instanceId, final
Collection<String> xaRecoveryIds) {
- loadXaRecoveryIds(instanceId).forEach(each ->
repository.delete(ComputeNode.getInstanceXaRecoveryIdNodePath(each,
instanceId)));
- for (String each : xaRecoveryIds) {
-
repository.persistEphemeral(ComputeNode.getInstanceXaRecoveryIdNodePath(each,
instanceId), "");
+ Collection<String> originalXaRecoveryIds =
loadXaRecoveryIds(instanceId);
+ if (originalXaRecoveryIds.isEmpty()) {
+ xaRecoveryIds.forEach(each ->
repository.persistEphemeral(ComputeNode.getInstanceXaRecoveryIdNodePath(each,
instanceId), ""));
+ } else {
+ originalXaRecoveryIds.stream().filter(each ->
!xaRecoveryIds.contains(each)).forEach(each -> repository.delete(ComputeNode
+ .getInstanceXaRecoveryIdNodePath(each, instanceId)));
+ xaRecoveryIds.stream().filter(each ->
!originalXaRecoveryIds.contains(each)).forEach(each ->
repository.persistEphemeral(ComputeNode
+ .getInstanceXaRecoveryIdNodePath(each, instanceId), ""));
}
}
@@ -173,8 +178,8 @@ public final class ComputeNodeStatusService {
ComputeNodeInstance result = new
ComputeNodeInstance(instanceDefinition);
result.setLabels(loadInstanceLabels(instanceDefinition.getInstanceId()));
result.switchState(loadInstanceStatus(instanceDefinition.getInstanceId()));
+
result.getXaRecoveryIds().addAll(loadXaRecoveryIds(instanceDefinition.getInstanceId()));
loadInstanceWorkerId(instanceDefinition.getInstanceId()).ifPresent(result::setWorkerId);
- // TODO load xa recovery id list
return result;
}
}
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
index e7419e769c2..b8c492dcc1d 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
@@ -29,7 +29,8 @@ import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.statu
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListUnitCompleteEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.StateEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.WorkerIdEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.XaRecoveryIdEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.XaRecoveryIdAddedEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.XaRecoveryIdDeletedEvent;
import org.apache.shardingsphere.mode.metadata.persist.node.ComputeNode;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
@@ -111,7 +112,11 @@ public final class ComputeNodeStateChangedWatcher
implements GovernanceWatcher<G
private Optional<GovernanceEvent> createXaRecoveryIdEvent(final
DataChangedEvent event) {
Matcher matcher =
Pattern.compile(ComputeNode.getXaRecoveryIdNodePath() + "/([\\S]+)/([\\S]+)$",
Pattern.CASE_INSENSITIVE).matcher(event.getKey());
if (matcher.find()) {
- return Optional.of(new XaRecoveryIdEvent(matcher.group(2),
matcher.group(1)));
+ if (Type.ADDED == event.getType()) {
+ return Optional.of(new
XaRecoveryIdAddedEvent(matcher.group(2), matcher.group(1)));
+ } else if (Type.DELETED == event.getType()) {
+ return Optional.of(new
XaRecoveryIdDeletedEvent(matcher.group(2), matcher.group(1)));
+ }
}
return Optional.empty();
}
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinatorTest.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinatorTest.java
index bc74826d63b..755b5cf0039 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinatorTest.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinatorTest.java
@@ -67,7 +67,7 @@ import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.statu
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListUnitCompleteEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.StateEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.WorkerIdEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.XaRecoveryIdEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.XaRecoveryIdAddedEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.DisabledStateChangedEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.PrimaryStateChangedEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.util.ReflectionUtil;
@@ -309,8 +309,8 @@ public final class ClusterContextManagerCoordinatorTest {
@Test
public void assertRenewXaRecoveryIdEvent() {
- coordinator.renew(new
XaRecoveryIdEvent(contextManager.getInstanceContext().getInstance().getInstanceDefinition().getInstanceId(),
"127.0.0.100@3306"));
-
assertThat(contextManager.getInstanceContext().getInstance().getXaRecoveryId(),
is("127.0.0.100@3306"));
+ coordinator.renew(new
XaRecoveryIdAddedEvent(contextManager.getInstanceContext().getInstance().getInstanceDefinition().getInstanceId(),
"127.0.0.100@3306"));
+
assertTrue(contextManager.getInstanceContext().getInstance().getXaRecoveryIds().contains("127.0.0.100@3306"));
}
@Test
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcherTest.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcherTest.java
index fb5cfc86f31..914fb10f91d 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcherTest.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcherTest.java
@@ -17,13 +17,14 @@
package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.watcher;
+import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.ComputeNodeStatus;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.StateEvent;
-import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.ComputeNodeStatus;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.WorkerIdEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.XaRecoveryIdEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.XaRecoveryIdAddedEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.XaRecoveryIdDeletedEvent;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
import org.junit.Test;
@@ -98,7 +99,12 @@ public final class ComputeNodeStateChangedWatcherTest {
Optional<GovernanceEvent> actual = new ComputeNodeStateChangedWatcher()
.createGovernanceEvent(new
DataChangedEvent("/nodes/compute_nodes/xa_recovery_id/127.0.0.1@3307/127.0.0.1@3307",
"", Type.ADDED));
assertTrue(actual.isPresent());
- assertThat(((XaRecoveryIdEvent) actual.get()).getInstanceId(),
is("127.0.0.1@3307"));
- assertThat(((XaRecoveryIdEvent) actual.get()).getXaRecoveryId(),
is("127.0.0.1@3307"));
+ assertThat(((XaRecoveryIdAddedEvent) actual.get()).getInstanceId(),
is("127.0.0.1@3307"));
+ assertThat(((XaRecoveryIdAddedEvent) actual.get()).getXaRecoveryId(),
is("127.0.0.1@3307"));
+ actual = new ComputeNodeStateChangedWatcher()
+ .createGovernanceEvent(new
DataChangedEvent("/nodes/compute_nodes/xa_recovery_id/127.0.0.1@3307/127.0.0.1@3307",
"", Type.DELETED));
+ assertTrue(actual.isPresent());
+ assertThat(((XaRecoveryIdDeletedEvent) actual.get()).getInstanceId(),
is("127.0.0.1@3307"));
+ assertThat(((XaRecoveryIdDeletedEvent)
actual.get()).getXaRecoveryId(), is("127.0.0.1@3307"));
}
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/queryable/ShowInstanceHandler.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/queryable/ShowInstanceHandler.java
index 912ed527f4f..b9223a36282 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/queryable/ShowInstanceHandler.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/ral/common/queryable/ShowInstanceHandler.java
@@ -17,6 +17,7 @@
package
org.apache.shardingsphere.proxy.backend.text.distsql.ral.common.queryable;
+import com.google.common.base.Joiner;
import
org.apache.shardingsphere.distsql.parser.statement.ral.common.queryable.ShowInstanceStatement;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
import org.apache.shardingsphere.infra.instance.definition.InstanceDefinition;
@@ -75,7 +76,7 @@ public final class ShowInstanceHandler extends
QueryableRALBackendHandler<ShowIn
}
private List<Object> buildRow(final ComputeNodeInstance instance, final
String modeType) {
- return buildRow(instance.getInstanceDefinition(),
instance.getState().getCurrentState().name(), modeType, instance.getLabels(),
instance.getXaRecoveryId());
+ return buildRow(instance.getInstanceDefinition(),
instance.getState().getCurrentState().name(), modeType, instance.getLabels(),
Joiner.on(",").join(instance.getXaRecoveryIds()));
}
private List<Object> buildRow(final InstanceDefinition instanceDefinition,
final String status, final String modeType, final Collection<String>
instanceLabels, final String xaRecoveryId) {