This is an automated email from the ASF dual-hosted git repository.
terrymanu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new d6c8c142aee Fix: modify the logic error of EventChangedType in
EtcdRepository.java (#38561)
d6c8c142aee is described below
commit d6c8c142aee23b8556ae24fb67a0f374abcc9a04
Author: KazenkE <[email protected]>
AuthorDate: Thu May 28 11:43:20 2026 +0800
Fix: modify the logic error of EventChangedType in EtcdRepository.java
(#38561)
* Fix a logic error in the getEventChangedType function in
EtcdRepository.java and add tests.
* Delete the unused help.md that I had added myself.
---
.../repository/cluster/etcd/EtcdRepository.java | 5 +-
.../cluster/etcd/EtcdRepositoryTest.java | 66 ++++++++++++++--------
2 files changed, 42 insertions(+), 29 deletions(-)
diff --git
a/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java
b/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java
index 59394a6bdf1..28d9fe5a7b4 100644
---
a/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java
+++
b/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java
@@ -176,12 +176,9 @@ public final class EtcdRepository implements
ClusterPersistRepository {
}
private Type getEventChangedType(final WatchEvent event) {
- if (1 == event.getKeyValue().getVersion()) {
- return Type.ADDED;
- }
switch (event.getEventType()) {
case PUT:
- return Type.UPDATED;
+ return 1 == event.getKeyValue().getVersion() ? Type.ADDED :
Type.UPDATED;
case DELETE:
return Type.DELETED;
default:
diff --git
a/mode/type/cluster/repository/provider/etcd/src/test/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepositoryTest.java
b/mode/type/cluster/repository/provider/etcd/src/test/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepositoryTest.java
index daa1e6f87f1..4c0e77028c0 100644
---
a/mode/type/cluster/repository/provider/etcd/src/test/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepositoryTest.java
+++
b/mode/type/cluster/repository/provider/etcd/src/test/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepositoryTest.java
@@ -34,9 +34,13 @@ import io.etcd.jetcd.watch.WatchEvent;
import io.etcd.jetcd.watch.WatchResponse;
import io.grpc.stub.StreamObserver;
import lombok.SneakyThrows;
+import org.apache.shardingsphere.mode.event.DataChangedEvent;
+import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
import
org.apache.shardingsphere.mode.repository.cluster.etcd.props.EtcdProperties;
+import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.internal.configuration.plugins.Plugins;
@@ -52,10 +56,13 @@ import java.util.List;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.isA;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doAnswer;
@@ -66,6 +73,7 @@ import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class EtcdRepositoryTest {
private final EtcdRepository repository = new EtcdRepository();
@@ -168,38 +176,33 @@ class EtcdRepositoryTest {
}
@Test
- void assertWatchUpdate() {
- doAnswer(invocationOnMock -> {
- Watch.Listener listener = (Watch.Listener)
invocationOnMock.getArguments()[2];
- listener.onNext(buildWatchResponse(WatchEvent.EventType.PUT));
- return mock(Watch.Watcher.class);
- }).when(watch).watch(any(ByteSequence.class), any(WatchOption.class),
any(Watch.Listener.class));
- repository.watch("key1", event -> {
- });
- verify(watch).watch(any(ByteSequence.class), any(WatchOption.class),
any(Watch.Listener.class));
+ void assertWatchWithPutAndVersionOne() throws ExecutionException,
InterruptedException, TimeoutException {
+ DataChangedEvent actual = assertWatch(WatchEvent.EventType.PUT, 1L);
+ assertThat(actual.getType(), is(Type.ADDED));
}
@Test
- void assertWatchDelete() {
- doAnswer(invocationOnMock -> {
- Watch.Listener listener = (Watch.Listener)
invocationOnMock.getArguments()[2];
- listener.onNext(buildWatchResponse(WatchEvent.EventType.DELETE));
- return mock(Watch.Watcher.class);
- }).when(watch).watch(any(ByteSequence.class), any(WatchOption.class),
any(Watch.Listener.class));
- repository.watch("key1", event -> {
- });
- verify(watch).watch(any(ByteSequence.class), any(WatchOption.class),
any(Watch.Listener.class));
+ void assertWatchWithPutAndVersionMoreThanOne() throws ExecutionException,
InterruptedException, TimeoutException {
+ DataChangedEvent actual = assertWatch(WatchEvent.EventType.PUT, 2L);
+ assertThat(actual.getType(), is(Type.UPDATED));
+ }
+
+ @Test
+ void assertWatchWithDeleteAndVersionOne() throws ExecutionException,
InterruptedException, TimeoutException {
+ DataChangedEvent actual = assertWatch(WatchEvent.EventType.DELETE, 1L);
+ assertThat(actual.getType(), is(Type.DELETED));
}
@Test
void assertWatchIgnored() {
doAnswer(invocationOnMock -> {
Watch.Listener listener = (Watch.Listener)
invocationOnMock.getArguments()[2];
-
listener.onNext(buildWatchResponse(WatchEvent.EventType.UNRECOGNIZED));
+
listener.onNext(buildWatchResponse(WatchEvent.EventType.UNRECOGNIZED, 1L));
return mock(Watch.Watcher.class);
}).when(watch).watch(any(ByteSequence.class), any(WatchOption.class),
any(Watch.Listener.class));
- repository.watch("key1", event -> {
- });
+ CompletableFuture<DataChangedEvent> actual = new CompletableFuture<>();
+ repository.watch("key1", actual::complete);
+ assertFalse(actual.isDone());
verify(watch).watch(any(ByteSequence.class), any(WatchOption.class),
any(Watch.Listener.class));
}
@@ -215,8 +218,8 @@ class EtcdRepositoryTest {
verify(kv).put(any(ByteSequence.class), any(ByteSequence.class));
}
- @Test
- void assertClose() {
+ @AfterAll
+ void closeRepositoryAfterAllTests() {
repository.close();
verify(client).close();
}
@@ -269,13 +272,26 @@ class EtcdRepositoryTest {
}
}
+ private DataChangedEvent assertWatch(final WatchEvent.EventType eventType,
final long version) throws ExecutionException, InterruptedException,
TimeoutException {
+ doAnswer(invocationOnMock -> {
+ Watch.Listener listener = (Watch.Listener)
invocationOnMock.getArguments()[2];
+ listener.onNext(buildWatchResponse(eventType, version));
+ return mock(Watch.Watcher.class);
+ }).when(watch).watch(any(ByteSequence.class), any(WatchOption.class),
any(Watch.Listener.class));
+ CompletableFuture<DataChangedEvent> changedEventFuture = new
CompletableFuture<>();
+ repository.watch("key1", changedEventFuture::complete);
+ verify(watch).watch(any(ByteSequence.class), any(WatchOption.class),
any(Watch.Listener.class));
+ return changedEventFuture.get(5L, TimeUnit.SECONDS);
+ }
+
@SneakyThrows({NoSuchFieldException.class, SecurityException.class,
IllegalAccessException.class})
- private WatchResponse buildWatchResponse(final WatchEvent.EventType
eventType) {
+ private WatchResponse buildWatchResponse(final WatchEvent.EventType
eventType, final long version) {
WatchResponse result = new
WatchResponse(mock(io.etcd.jetcd.api.WatchResponse.class), ByteSequence.EMPTY);
List<WatchEvent> events = new LinkedList<>();
io.etcd.jetcd.api.KeyValue keyValue1 =
io.etcd.jetcd.api.KeyValue.newBuilder()
.setKey(ByteString.copyFromUtf8("key1"))
- .setValue(ByteString.copyFromUtf8("value1")).build();
+ .setValue(ByteString.copyFromUtf8("value1"))
+ .setVersion(version).build();
KeyValue keyValue = new KeyValue(keyValue1, ByteSequence.EMPTY);
events.add(new WatchEvent(keyValue, mock(KeyValue.class), eventType));
Plugins.getMemberAccessor().set(WatchResponse.class.getDeclaredField("events"),
result, events);