[ https://issues.apache.org/jira/browse/GEODE-3788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16260262#comment-16260262 ]
ASF GitHub Bot commented on GEODE-3788: --------------------------------------- jinmeiliao closed pull request #1081: GEODE-3788: add utility methods to get the async event queues in the … URL: https://github.com/apache/geode/pull/1081 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/geode-core/src/main/java/org/apache/geode/management/ManagementService.java b/geode-core/src/main/java/org/apache/geode/management/ManagementService.java index 887b04abed..9f3f5478ee 100755 --- a/geode-core/src/main/java/org/apache/geode/management/ManagementService.java +++ b/geode-core/src/main/java/org/apache/geode/management/ManagementService.java @@ -216,6 +216,11 @@ public abstract DistributedLockServiceMXBean getDistributedLockServiceMXBean( */ public abstract Set<ObjectName> queryMBeanNames(DistributedMember member); + /** + * Returns the ids of the async event queues on this member + */ + public abstract Set<String> getAsyncEventQueueIds(DistributedMember member); + /** * Returns an instance of an MBean. This is a reference to the MBean instance and not a * {@link ObjectInstance}. diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/SystemManagementService.java b/geode-core/src/main/java/org/apache/geode/management/internal/SystemManagementService.java index 60615db12c..d067fc1a91 100755 --- a/geode-core/src/main/java/org/apache/geode/management/internal/SystemManagementService.java +++ b/geode-core/src/main/java/org/apache/geode/management/internal/SystemManagementService.java @@ -18,6 +18,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.stream.Collectors; import javax.management.Notification; import javax.management.ObjectName; @@ -357,6 +358,13 @@ public MemberMXBean getMemberMXBean() { } } + @Override + public Set<String> getAsyncEventQueueIds(DistributedMember member) { + Set<ObjectName> mBeanNames = this.queryMBeanNames(member); + return mBeanNames.stream().filter(x -> "AsyncEventQueue".equals(x.getKeyProperty("service"))) + .map(x -> x.getKeyProperty("queue")).collect(Collectors.toSet()); + } + @Override public ObjectName registerMBean(Object object, ObjectName objectName) { verifyManagementService(); diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/CliUtil.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/CliUtil.java index 02eb8b81a6..686bbc5f83 100755 --- a/geode-core/src/main/java/org/apache/geode/management/internal/cli/CliUtil.java +++ b/geode-core/src/main/java/org/apache/geode/management/internal/cli/CliUtil.java @@ -60,6 +60,7 @@ import org.apache.geode.management.ManagementService; import org.apache.geode.management.cli.Result; import org.apache.geode.management.internal.MBeanJMXAdapter; +import org.apache.geode.management.internal.SystemManagementService; import org.apache.geode.management.internal.cli.exceptions.UserErrorException; import org.apache.geode.management.internal.cli.i18n.CliStrings; import org.apache.geode.management.internal.cli.result.ResultBuilder; @@ -406,6 +407,16 @@ public static Result getFunctionResult(ResultCollector<?, ?> rc, String commandN return result; } + public static Set<DistributedMember> getMembersWithAsyncEventQueue(InternalCache cache, + String queueId) { + SystemManagementService managementService = + (SystemManagementService) ManagementService.getExistingManagementService(cache); + Set<DistributedMember> members = findMembers(null, null); + return members.stream() + .filter(m -> managementService.getAsyncEventQueueIds(m).contains(queueId)) + .collect(Collectors.toSet()); + } + static class CustomFileFilter implements FileFilter { private String extensionWithDot; diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/GfshCommand.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/GfshCommand.java index 134a15300c..6a30378b3d 100644 --- a/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/GfshCommand.java +++ b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/GfshCommand.java @@ -183,4 +183,8 @@ default ManagementService getManagementService() { default Set<DistributedMember> findAnyMembersForRegion(InternalCache cache, String regionPath) { return CliUtil.getRegionAssociatedMembers(regionPath, cache, false); } + + default Set<DistributedMember> findMembersWithAsyncEventQueue(String queueId) { + return CliUtil.getMembersWithAsyncEventQueue(getCache(), queueId); + } } diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/cli/CliUtilDUnitTest.java b/geode-core/src/test/java/org/apache/geode/management/internal/cli/CliUtilDUnitTest.java index f1050d8423..d3b97b80da 100644 --- a/geode-core/src/test/java/org/apache/geode/management/internal/cli/CliUtilDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/management/internal/cli/CliUtilDUnitTest.java @@ -31,6 +31,7 @@ import org.apache.geode.distributed.ConfigurationProperties; import org.apache.geode.distributed.DistributedMember; import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.cache.wan.MyAsyncEventListener; import org.apache.geode.management.internal.cli.exceptions.UserErrorException; import org.apache.geode.test.dunit.rules.LocatorServerStartupRule; import org.apache.geode.test.dunit.rules.MemberVM; @@ -184,6 +185,35 @@ public void getMemberByNameOrId() throws Exception { } + @Test + public void getMembersWithQueueId() throws Exception { + gfsh.executeAndAssertThat("create async-event-queue --id=queue1 --group=group1 --listener=" + + MyAsyncEventListener.class.getName()).statusIsSuccess(); + gfsh.executeAndAssertThat("create async-event-queue --id=queue2 --group=group2 --listener=" + + MyAsyncEventListener.class.getName()).statusIsSuccess(); + gfsh.executeAndAssertThat( + "create async-event-queue --id=queue --listener=" + MyAsyncEventListener.class.getName()) + .statusIsSuccess(); + + locator.waitTillAsyncEventQueuesAreReadyOnServers("queue1", 2); + locator.waitTillAsyncEventQueuesAreReadyOnServers("queue2", 2); + locator.waitTillAsyncEventQueuesAreReadyOnServers("queue", 4); + + locator.invoke(() -> { + members = + CliUtil.getMembersWithAsyncEventQueue(LocatorServerStartupRule.getCache(), "queue1"); + assertThat(getNames(members)).containsExactlyInAnyOrder("member1", "member2"); + + members = + CliUtil.getMembersWithAsyncEventQueue(LocatorServerStartupRule.getCache(), "queue2"); + assertThat(getNames(members)).containsExactlyInAnyOrder("member3", "member4"); + + members = CliUtil.getMembersWithAsyncEventQueue(LocatorServerStartupRule.getCache(), "queue"); + assertThat(getNames(members)).containsExactlyInAnyOrder("member1", "member2", "member3", + "member4"); + }); + } + private static Set<String> getNames(Set<DistributedMember> members) { return members.stream().map(DistributedMember::getName).collect(Collectors.toSet()); } diff --git a/geode-core/src/test/java/org/apache/geode/test/dunit/rules/MemberVM.java b/geode-core/src/test/java/org/apache/geode/test/dunit/rules/MemberVM.java index 0e9cfbc323..4e04b8eb8b 100644 --- a/geode-core/src/test/java/org/apache/geode/test/dunit/rules/MemberVM.java +++ b/geode-core/src/test/java/org/apache/geode/test/dunit/rules/MemberVM.java @@ -137,4 +137,12 @@ public void waitTillDiskstoreIsReady(String diskstoreName, int serverCount) { vm.invoke(() -> LocatorServerStartupRule.memberStarter.waitTillDiskStoreIsReady(diskstoreName, serverCount)); } + + public void waitTillAsyncEventQueuesAreReadyOnServers(String queueId, int serverCount) { + vm.invoke(() -> { + LocatorServerStartupRule.memberStarter.waitTillAsyncEventQueuesAreReadyOnServers(queueId, + serverCount); + }); + } + } diff --git a/geode-core/src/test/java/org/apache/geode/test/junit/rules/MemberStarterRule.java b/geode-core/src/test/java/org/apache/geode/test/junit/rules/MemberStarterRule.java index da0f588ce1..e2dcc9cc68 100644 --- a/geode-core/src/test/java/org/apache/geode/test/junit/rules/MemberStarterRule.java +++ b/geode-core/src/test/java/org/apache/geode/test/junit/rules/MemberStarterRule.java @@ -43,6 +43,7 @@ import org.apache.geode.management.DistributedRegionMXBean; import org.apache.geode.management.DistributedSystemMXBean; import org.apache.geode.management.ManagementService; +import org.apache.geode.management.internal.cli.CliUtil; import org.apache.geode.security.SecurityManager; import org.apache.geode.test.junit.rules.serializable.SerializableExternalResource; @@ -205,9 +206,9 @@ protected void normalizeProperties() { if (properties.containsKey(NAME)) { name = properties.getProperty(NAME); } else { - if (this instanceof ServerStarterRule) + if (this instanceof ServerStarterRule) { name = "server"; - else { + } else { name = "locator"; } } @@ -263,6 +264,11 @@ public void waitTillDiskStoreIsReady(String diskstoreName, int serverCount) { .until(() -> getDiskStoreCount(diskstoreName) == serverCount); } + public void waitTillAsyncEventQueuesAreReadyOnServers(String queueId, int serverCount) { + await().atMost(2, TimeUnit.SECONDS).until( + () -> CliUtil.getMembersWithAsyncEventQueue(getCache(), queueId).size() == serverCount); + } + abstract void stopMember(); @Override ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > alter async event queue attributes > ---------------------------------- > > Key: GEODE-3788 > URL: https://issues.apache.org/jira/browse/GEODE-3788 > Project: Geode > Issue Type: Sub-task > Components: gfsh > Reporter: Swapnil Bawaskar > > We should add a new {{alter async-event-queue}} gfsh command that will allow > users to change the following attributes on the AsyncEventQueue: > - batch size > - batch time interval > - maximum queue memory > Attributes changed with this command should only be reflected in cluster > configuration. We will require users to do a rolling re-start of the servers > for the new settings to take effect. -- This message was sent by Atlassian JIRA (v6.4.14#64029)