This is an automated email from the ASF dual-hosted git repository. jinmeiliao pushed a commit to branch feature/GEODE-7665 in repository https://gitbox.apache.org/repos/asf/geode.git
commit f0062cf2168686f46508631123ec825fb485799e Author: Kirk Lund <kl...@apache.org> AuthorDate: Fri Apr 9 17:24:26 2021 -0700 GEODE-9132: Cleanup PartitionedRegionClearMessage * Use descriptive names for variables and methods * Use Objects.requireNonNull instead of Assert.assertTrue * Remove unnecessary uses of final, this, and super * Use static logger * Reformat some lines with weird formatting --- .../partitioned/PRClearCreateIndexDUnitTest.java | 5 +- ...ionedRegionAfterClearNotificationDUnitTest.java | 4 +- ...itionedRegionClearWithAlterRegionDUnitTest.java | 2 +- ...gionClearWithConcurrentOperationsDUnitTest.java | 2 +- .../cache/PartitionedRegionClearMessage.java | 225 ++++++++++----------- .../internal/cache/PartitionRegionClearHATest.java | 2 +- 6 files changed, 118 insertions(+), 122 deletions(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/partitioned/PRClearCreateIndexDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/partitioned/PRClearCreateIndexDUnitTest.java index 1c94c2dec7..423932d6d4 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/partitioned/PRClearCreateIndexDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/partitioned/PRClearCreateIndexDUnitTest.java @@ -236,10 +236,11 @@ public class PRClearCreateIndexDUnitTest implements Serializable { if (message instanceof PartitionedRegionClearMessage) { PartitionedRegionClearMessage clearMessage = (PartitionedRegionClearMessage) message; if (clearMessage - .getOp() == PartitionedRegionClearMessage.OperationType.OP_LOCK_FOR_PR_CLEAR) { + .getOperationType() == PartitionedRegionClearMessage.OperationType.OP_LOCK_FOR_PR_CLEAR) { lock_others = true; } - if (clearMessage.getOp() == PartitionedRegionClearMessage.OperationType.OP_PR_CLEAR) { + if (clearMessage + .getOperationType() == PartitionedRegionClearMessage.OperationType.OP_PR_CLEAR) { clear_others = true; } } diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionAfterClearNotificationDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionAfterClearNotificationDUnitTest.java index 237b6a8171..7979cfaa16 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionAfterClearNotificationDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionAfterClearNotificationDUnitTest.java @@ -326,7 +326,7 @@ public class PartitionedRegionAfterClearNotificationDUnitTest implements Seriali public void beforeProcessMessage(ClusterDistributionManager dm, DistributionMessage message) { if (message instanceof PartitionedRegionClearMessage) { if (((PartitionedRegionClearMessage) message) - .getOp() == PartitionedRegionClearMessage.OperationType.OP_LOCK_FOR_PR_CLEAR) { + .getOperationType() == PartitionedRegionClearMessage.OperationType.OP_LOCK_FOR_PR_CLEAR) { DistributionMessageObserver.setInstance(null); getBlackboard().signalGate("CLOSE_CACHE"); try { @@ -348,7 +348,7 @@ public class PartitionedRegionAfterClearNotificationDUnitTest implements Seriali public void afterProcessMessage(ClusterDistributionManager dm, DistributionMessage message) { if (message instanceof PartitionedRegionClearMessage) { if (((PartitionedRegionClearMessage) message) - .getOp() == PartitionedRegionClearMessage.OperationType.OP_LOCK_FOR_PR_CLEAR) { + .getOperationType() == PartitionedRegionClearMessage.OperationType.OP_LOCK_FOR_PR_CLEAR) { DistributionMessageObserver.setInstance(null); getBlackboard().signalGate("CLOSE_CACHE"); try { diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithAlterRegionDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithAlterRegionDUnitTest.java index fb74eb32a4..564706eba8 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithAlterRegionDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithAlterRegionDUnitTest.java @@ -759,7 +759,7 @@ public class PartitionedRegionClearWithAlterRegionDUnitTest implements Serializa private void shutdownMember(DistributionMessage message) { if (message instanceof PartitionedRegionClearMessage) { if (((PartitionedRegionClearMessage) message) - .getOp() == PartitionedRegionClearMessage.OperationType.OP_PR_CLEAR) { + .getOperationType() == PartitionedRegionClearMessage.OperationType.OP_PR_CLEAR) { DistributionMessageObserver.setInstance(null); InternalDistributedSystem.getConnectedInstance().stopReconnectingNoDisconnect(); MembershipManagerHelper diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java index fdb91c7005..77537cbda3 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java @@ -703,7 +703,7 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements private void shutdownMember(DistributionMessage message) { if (message instanceof PartitionedRegionClearMessage) { if (((PartitionedRegionClearMessage) message) - .getOp() == PartitionedRegionClearMessage.OperationType.OP_PR_CLEAR) { + .getOperationType() == PartitionedRegionClearMessage.OperationType.OP_PR_CLEAR) { DistributionMessageObserver.setInstance(null); InternalDistributedSystem.getConnectedInstance().stopReconnectingNoDisconnect(); MembershipManagerHelper diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClearMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClearMessage.java index 724256b365..36cdcb6b3d 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClearMessage.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClearMessage.java @@ -12,14 +12,16 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ - package org.apache.geode.internal.cache; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.Objects; import java.util.Set; +import org.apache.logging.log4j.Logger; + import org.apache.geode.DataSerializer; import org.apache.geode.cache.CacheException; import org.apache.geode.cache.Operation; @@ -32,7 +34,6 @@ import org.apache.geode.distributed.internal.ReplyMessage; import org.apache.geode.distributed.internal.ReplyProcessor21; import org.apache.geode.distributed.internal.ReplySender; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; -import org.apache.geode.internal.Assert; import org.apache.geode.internal.CopyOnWriteHashSet; import org.apache.geode.internal.NanoTimer; import org.apache.geode.internal.cache.partitioned.PartitionMessage; @@ -41,96 +42,92 @@ import org.apache.geode.internal.serialization.DeserializationContext; import org.apache.geode.internal.serialization.SerializationContext; import org.apache.geode.logging.internal.log4j.api.LogService; -/** - * this message is for operations no the partition region level, could be sent by any originating - * member to the other members hosting this partition region - */ public class PartitionedRegionClearMessage extends PartitionMessage { + private static final Logger logger = LogService.getLogger(); public enum OperationType { OP_LOCK_FOR_PR_CLEAR, OP_UNLOCK_FOR_PR_CLEAR, OP_PR_CLEAR, } - private Object cbArg; - - private OperationType op; - - private EventID eventID; - + private Object callbackArgument; + private OperationType operationType; + private EventID eventId; private PartitionedRegion partitionedRegion; - private Set<Integer> bucketsCleared; - @Override - public EventID getEventID() { - return eventID; + public PartitionedRegionClearMessage() { + // nothing } - public PartitionedRegionClearMessage() {} + PartitionedRegionClearMessage(Set<InternalDistributedMember> recipients, + PartitionedRegion partitionedRegion, ReplyProcessor21 replyProcessor21, + PartitionedRegionClearMessage.OperationType operationType, RegionEventImpl regionEvent) { + super(recipients, partitionedRegion.getPRId(), replyProcessor21); + this.partitionedRegion = partitionedRegion; + this.operationType = operationType; + callbackArgument = regionEvent.getRawCallbackArgument(); + eventId = regionEvent.getEventId(); + } - PartitionedRegionClearMessage(Set<InternalDistributedMember> recipients, PartitionedRegion region, - ReplyProcessor21 processor, PartitionedRegionClearMessage.OperationType operationType, - final RegionEventImpl event) { - super(recipients, region.getPRId(), processor); - partitionedRegion = region; - op = operationType; - cbArg = event.getRawCallbackArgument(); - eventID = event.getEventId(); + @Override + public EventID getEventID() { + return eventId; } - public OperationType getOp() { - return op; + public OperationType getOperationType() { + return operationType; } public void send() { - Assert.assertTrue(getRecipients() != null, "ClearMessage NULL recipients set"); + Objects.requireNonNull(getRecipients(), "ClearMessage NULL recipients set"); + setTransactionDistributed(partitionedRegion.getCache().getTxManager().isDistributed()); partitionedRegion.getDistributionManager().putOutgoing(this); } @Override - protected Throwable processCheckForPR(PartitionedRegion pr, + protected Throwable processCheckForPR(PartitionedRegion partitionedRegion, DistributionManager distributionManager) { - if (pr != null && !pr.getDistributionAdvisor().isInitialized()) { + if (partitionedRegion != null && !partitionedRegion.getDistributionAdvisor().isInitialized()) { return new ForceReattemptException( String.format("%s : could not find partitioned region with Id %s", distributionManager.getDistributionManagerId(), - pr.getRegionIdentifier())); + partitionedRegion.getRegionIdentifier())); } return null; } @Override - protected boolean operateOnPartitionedRegion(ClusterDistributionManager dm, - PartitionedRegion partitionedRegion, - long startTime) throws CacheException { - + protected boolean operateOnPartitionedRegion(ClusterDistributionManager distributionManager, + PartitionedRegion partitionedRegion, long startTime) throws CacheException { if (partitionedRegion == null) { return true; } - if (partitionedRegion.isDestroyed()) { return true; } - if (op == OperationType.OP_LOCK_FOR_PR_CLEAR) { + if (operationType == OperationType.OP_LOCK_FOR_PR_CLEAR) { partitionedRegion.getPartitionedRegionClear().obtainClearLockLocal(getSender()); - } else if (op == OperationType.OP_UNLOCK_FOR_PR_CLEAR) { + } else if (operationType == OperationType.OP_UNLOCK_FOR_PR_CLEAR) { partitionedRegion.getPartitionedRegionClear().releaseClearLockLocal(); } else { RegionEventImpl event = - new RegionEventImpl(partitionedRegion, Operation.REGION_CLEAR, this.cbArg, true, - partitionedRegion.getMyId(), - getEventID()); + new RegionEventImpl(partitionedRegion, Operation.REGION_CLEAR, callbackArgument, true, + partitionedRegion.getMyId(), getEventID()); bucketsCleared = partitionedRegion.getPartitionedRegionClear().clearRegionLocal(event); } return true; } @Override - protected void appendFields(StringBuilder buff) { - super.appendFields(buff); - buff.append(" cbArg=").append(this.cbArg).append(" op=").append(this.op); + protected void appendFields(StringBuilder stringBuilder) { + super.appendFields(stringBuilder); + stringBuilder + .append(" cbArg=") + .append(callbackArgument) + .append(" op=") + .append(operationType); } @Override @@ -139,21 +136,32 @@ public class PartitionedRegionClearMessage extends PartitionMessage { } @Override - public void fromData(DataInput in, - DeserializationContext context) throws IOException, ClassNotFoundException { + public void fromData(DataInput in, DeserializationContext context) + throws IOException, ClassNotFoundException { super.fromData(in, context); - this.cbArg = DataSerializer.readObject(in); - op = PartitionedRegionClearMessage.OperationType.values()[in.readByte()]; - eventID = DataSerializer.readObject(in); + callbackArgument = DataSerializer.readObject(in); + operationType = PartitionedRegionClearMessage.OperationType.values()[in.readByte()]; + eventId = DataSerializer.readObject(in); } @Override - public void toData(DataOutput out, - SerializationContext context) throws IOException { + public void toData(DataOutput out, SerializationContext context) throws IOException { super.toData(out, context); - DataSerializer.writeObject(this.cbArg, out); - out.writeByte(op.ordinal()); - DataSerializer.writeObject(eventID, out); + DataSerializer.writeObject(callbackArgument, out); + out.writeByte(operationType.ordinal()); + DataSerializer.writeObject(eventId, out); + } + + @Override + protected void sendReply(InternalDistributedMember recipient, int processorId, + DistributionManager distributionManager, ReplyException replyException, + PartitionedRegion partitionedRegion, long startTime) { + if (partitionedRegion != null && startTime > 0) { + partitionedRegion.getPrStats().endPartitionMessagesProcessing(startTime); + } + PartitionedRegionClearMessage.PartitionedRegionClearReplyMessage + .send(recipient, processorId, getReplySender(distributionManager), operationType, + bucketsCleared, replyException); } /** @@ -161,97 +169,85 @@ public class PartitionedRegionClearMessage extends PartitionMessage { * received from the "far side" */ public static class PartitionedRegionClearResponse extends ReplyProcessor21 { + CopyOnWriteHashSet<Integer> bucketsCleared = new CopyOnWriteHashSet<>(); public PartitionedRegionClearResponse(InternalDistributedSystem system, - Set<InternalDistributedMember> initMembers) { - super(system, initMembers); + Set<InternalDistributedMember> recipients) { + super(system, recipients); } @Override - public void process(DistributionMessage msg) { - if (msg instanceof PartitionedRegionClearReplyMessage) { - Set<Integer> buckets = ((PartitionedRegionClearReplyMessage) msg).bucketsCleared; + public void process(DistributionMessage message) { + if (message instanceof PartitionedRegionClearReplyMessage) { + Set<Integer> buckets = ((PartitionedRegionClearReplyMessage) message).bucketsCleared; if (buckets != null) { bucketsCleared.addAll(buckets); } } - super.process(msg, true); - } - } - - @Override - protected void sendReply(InternalDistributedMember member, int processorId, - DistributionManager distributionManager, ReplyException ex, - PartitionedRegion partitionedRegion, long startTime) { - if (partitionedRegion != null) { - if (startTime > 0) { - partitionedRegion.getPrStats().endPartitionMessagesProcessing(startTime); - } + process(message, true); } - PartitionedRegionClearMessage.PartitionedRegionClearReplyMessage - .send(member, processorId, getReplySender(distributionManager), op, bucketsCleared, - ex); } public static class PartitionedRegionClearReplyMessage extends ReplyMessage { private Set<Integer> bucketsCleared; - private OperationType op; + private OperationType operationType; @Override public boolean getInlineProcess() { return true; } + public static void send(InternalDistributedMember recipient, int processorId, + ReplySender replySender, OperationType operationType, Set<Integer> bucketsCleared, + ReplyException replyException) { + Objects.requireNonNull(recipient, "partitionedRegionClearReplyMessage NULL reply message"); + + PartitionedRegionClearMessage.PartitionedRegionClearReplyMessage replyMessage = + new PartitionedRegionClearMessage.PartitionedRegionClearReplyMessage(processorId, + operationType, bucketsCleared, replyException); + + replyMessage.setRecipient(recipient); + replySender.putOutgoing(replyMessage); + } + /** * Empty constructor to conform to DataSerializable interface */ - public PartitionedRegionClearReplyMessage() {} + public PartitionedRegionClearReplyMessage() { + // Empty constructor to conform to DataSerializable interface + } - private PartitionedRegionClearReplyMessage(int processorId, OperationType op, - Set<Integer> bucketsCleared, ReplyException ex) { - super(); + private PartitionedRegionClearReplyMessage(int processorId, OperationType operationType, + Set<Integer> bucketsCleared, ReplyException replyException) { this.bucketsCleared = bucketsCleared; - this.op = op; + this.operationType = operationType; setProcessorId(processorId); - setException(ex); - } - - /** Send an ack */ - public static void send(InternalDistributedMember recipient, int processorId, ReplySender dm, - OperationType op, Set<Integer> bucketsCleared, ReplyException ex) { - - Assert.assertTrue(recipient != null, "partitionedRegionClearReplyMessage NULL reply message"); - - PartitionedRegionClearMessage.PartitionedRegionClearReplyMessage m = - new PartitionedRegionClearMessage.PartitionedRegionClearReplyMessage(processorId, op, - bucketsCleared, ex); - - m.setRecipient(recipient); - dm.putOutgoing(m); + setException(replyException); } /** * Processes this message. This method is invoked by the receiver of the message. * - * @param dm the distribution manager that is processing the message. + * @param distributionManager the distribution manager that is processing the message. */ @Override - public void process(final DistributionManager dm, final ReplyProcessor21 rp) { - final long startTime = getTimestamp(); + public void process(DistributionManager distributionManager, + ReplyProcessor21 replyProcessor21) { + long startTime = getTimestamp(); - if (rp == null) { - if (LogService.getLogger().isTraceEnabled(LogMarker.DM_VERBOSE)) { - LogService.getLogger().trace(LogMarker.DM_VERBOSE, "{}: processor not found", this); + if (replyProcessor21 == null) { + if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) { + logger.trace(LogMarker.DM_VERBOSE, "{}: processor not found", this); } return; } - rp.process(this); + replyProcessor21.process(this); - dm.getStats().incReplyMessageTime(NanoTimer.getTime() - startTime); + distributionManager.getStats().incReplyMessageTime(NanoTimer.getTime() - startTime); } @Override @@ -260,30 +256,29 @@ public class PartitionedRegionClearMessage extends PartitionMessage { } @Override - public void fromData(DataInput in, - DeserializationContext context) throws IOException, ClassNotFoundException { + public void fromData(DataInput in, DeserializationContext context) + throws IOException, ClassNotFoundException { super.fromData(in, context); - op = PartitionedRegionClearMessage.OperationType.values()[in.readByte()]; + operationType = PartitionedRegionClearMessage.OperationType.values()[in.readByte()]; bucketsCleared = DataSerializer.readObject(in); } @Override - public void toData(DataOutput out, - SerializationContext context) throws IOException { + public void toData(DataOutput out, SerializationContext context) throws IOException { super.toData(out, context); - out.writeByte(op.ordinal()); + out.writeByte(operationType.ordinal()); DataSerializer.writeObject(bucketsCleared, out); } @Override public String toString() { - StringBuffer sb = new StringBuffer(); - sb.append("PartitionedRegionClearReplyMessage ") - .append("processorId=").append(this.processorId) + return new StringBuilder() + .append("PartitionedRegionClearReplyMessage ") + .append("processorId=").append(processorId) .append(" sender=").append(sender) - .append(" bucketsCleared ").append(this.bucketsCleared) - .append(" exception=").append(getException()); - return sb.toString(); + .append(" bucketsCleared ").append(bucketsCleared) + .append(" exception=").append(getException()) + .toString(); } } } diff --git a/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/PartitionRegionClearHATest.java b/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/PartitionRegionClearHATest.java index 7d0db0a17e..5497028f1e 100644 --- a/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/PartitionRegionClearHATest.java +++ b/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/PartitionRegionClearHATest.java @@ -220,7 +220,7 @@ public class PartitionRegionClearHATest implements Serializable { if (message instanceof PartitionedRegionClearMessage) { PartitionedRegionClearMessage clearMessage = (PartitionedRegionClearMessage) message; if (clearMessage - .getOp() == PartitionedRegionClearMessage.OperationType.OP_UNLOCK_FOR_PR_CLEAR) { + .getOperationType() == PartitionedRegionClearMessage.OperationType.OP_UNLOCK_FOR_PR_CLEAR) { try { // count down to 1 so that we can go ahead and restart the server latch.countDown();