This is an automated email from the ASF dual-hosted git repository. jbarrett pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
commit cc1841a59dbb34c7a07ab9c685f7c8f05e169e5e Author: Jacob Barrett <jbarr...@pivotal.io> AuthorDate: Fri Jan 22 07:39:38 2021 -0800 GEODE-6588: Static analyzer cleanup --- .../geode/internal/cache/PartitionedRegion.java | 2 +- .../internal/cache/tier/sockets/BaseCommand.java | 145 +++++++-------------- .../internal/cache/tier/sockets/command/Get70.java | 109 +++++----------- .../cache/tier/sockets/command/PutAll80.java | 77 ++++------- .../cache/tier/sockets/command/RemoveAll.java | 54 +++----- 5 files changed, 127 insertions(+), 260 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java index 4cb63d3..cd89363 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java @@ -4534,7 +4534,7 @@ public class PartitionedRegion extends LocalRegion /** * Fetches entries from local and remote nodes and appends these to register-interest response. */ - public void fetchEntries(HashMap<Integer, HashSet> bucketKeys, VersionedObjectList values, + public void fetchEntries(HashMap<Integer, HashSet<Object>> bucketKeys, VersionedObjectList values, ServerConnection servConn) throws IOException { int retryAttempts = calcRetry(); RetryTimeKeeper retryTime = null; diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java index d418616..8000d1e 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java @@ -14,6 +14,8 @@ */ package org.apache.geode.internal.cache.tier.sockets; +import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast; + import java.io.EOFException; import java.io.IOException; import java.io.InterruptedIOException; @@ -161,7 +163,7 @@ public abstract class BaseCommand implements Command { SecurityService securityService) { // Read the request and update the statistics long start = DistributionStats.getStatTime(); - if (EntryLogger.isEnabled() && serverConnection != null) { + if (EntryLogger.isEnabled()) { EntryLogger.setSource(serverConnection.getMembershipID(), "c2s"); } boolean shouldMasquerade = shouldMasqueradeForTx(clientMessage); @@ -227,7 +229,7 @@ public abstract class BaseCommand implements Command { */ public boolean recoverVersionTagForRetriedOperation(EntryEventImpl clientEvent) { InternalRegion r = clientEvent.getRegion(); - VersionTag tag = r.findVersionTagForEvent(clientEvent.getEventId()); + VersionTag<?> tag = r.findVersionTagForEvent(clientEvent.getEventId()); if (tag == null) { if (r instanceof DistributedRegion || r instanceof PartitionedRegion) { // TODO this could be optimized for partitioned regions by sending the key @@ -254,8 +256,8 @@ public abstract class BaseCommand implements Command { * The client event should have the event identifier from the client and the region affected by * the operation. */ - protected VersionTag findVersionTagsForRetriedBulkOp(LocalRegion region, EventID eventID) { - VersionTag tag = region.findVersionTagForClientBulkOp(eventID); + protected VersionTag<?> findVersionTagsForRetriedBulkOp(LocalRegion region, EventID eventID) { + VersionTag<?> tag = region.findVersionTagForClientBulkOp(eventID); if (tag != null) { if (logger.isDebugEnabled()) { logger.debug("recovered version tag {} for replayed bulk operation {}", tag, eventID); @@ -292,22 +294,6 @@ public abstract class BaseCommand implements Command { } } - protected void writeReplyWithRefreshMetadata(Message origMsg, ServerConnection serverConnection, - PartitionedRegion pr, byte nwHop) throws IOException { - Message replyMsg = serverConnection.getReplyMessage(); - serverConnection.getCache().getCancelCriterion().checkCancelInProgress(null); - replyMsg.setMessageType(MessageType.REPLY); - replyMsg.setNumberOfParts(1); - replyMsg.setTransactionId(origMsg.getTransactionId()); - replyMsg.addBytesPart(new byte[] {pr.getMetadataVersion(), nwHop}); - replyMsg.send(serverConnection); - pr.getPrStats().incPRMetaDataSentCount(); - if (logger.isTraceEnabled()) { - logger.trace("{}: rpl with REFRESH_METADATA tx: {}", serverConnection.getName(), - origMsg.getTransactionId()); - } - } - private static void handleEOFException(Message msg, ServerConnection serverConnection, Exception eof) { CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper(); @@ -360,8 +346,8 @@ public abstract class BaseCommand implements Command { int transId = msg != null ? msg.getTransactionId() : Integer.MIN_VALUE; logger.warn(String.format( "%s: Unexpected IOException during operation for region: %s key: %s messId: %s", - new Object[] {serverConnection.getName(), serverConnection.getModRegion(), - serverConnection.getModKey(), transId}), + serverConnection.getName(), serverConnection.getModRegion(), + serverConnection.getModKey(), transId), e); } else { logger.warn(String.format("%s: Unexpected IOException: ", @@ -383,8 +369,8 @@ public abstract class BaseCommand implements Command { int transId = msg != null ? msg.getTransactionId() : Integer.MIN_VALUE; logger.warn(String.format( "%s: Unexpected ShutdownException during operation on region: %s key: %s messageId: %s", - new Object[] {serverConnection.getName(), serverConnection.getModRegion(), - serverConnection.getModKey(), transId}), + serverConnection.getName(), serverConnection.getModRegion(), + serverConnection.getModKey(), transId), e); } else { logger.warn(String.format("%s: Unexpected ShutdownException: ", @@ -421,8 +407,8 @@ public abstract class BaseCommand implements Command { if (!wroteExceptionResponse) { logger.warn(String.format( "%s: Unexpected Exception during operation on region: %s key: %s messageId: %s", - new Object[] {serverConnection.getName(), serverConnection.getModRegion(), - serverConnection.getModKey(), transId}), + serverConnection.getName(), serverConnection.getModRegion(), + serverConnection.getModKey(), transId), e); } else { if (logger.isDebugEnabled()) { @@ -478,8 +464,8 @@ public abstract class BaseCommand implements Command { int transId = msg != null ? msg.getTransactionId() : Integer.MIN_VALUE; logger.warn(String.format( "%s: Unexpected Exception during operation on region: %s key: %s messageId: %s", - new Object[] {serverConnection.getName(), serverConnection.getModRegion(), - serverConnection.getModKey(), transId}), + serverConnection.getName(), serverConnection.getModRegion(), + serverConnection.getModKey(), transId), th); } else { logger.warn(String.format("%s: Unexpected Exception", @@ -652,34 +638,6 @@ public abstract class BaseCommand implements Command { origMsg.clearParts(); } - protected static void writeResponseWithRefreshMetadata(Object data, Object callbackArg, - Message origMsg, boolean isObject, ServerConnection serverConnection, PartitionedRegion pr, - byte nwHop) throws IOException { - Message responseMsg = serverConnection.getResponseMessage(); - responseMsg.setMessageType(MessageType.RESPONSE); - responseMsg.setTransactionId(origMsg.getTransactionId()); - - if (callbackArg == null) { - responseMsg.setNumberOfParts(2); - } else { - responseMsg.setNumberOfParts(3); - } - - if (data instanceof byte[]) { - responseMsg.addRawPart((byte[]) data, isObject); - } else { - Assert.assertTrue(isObject, "isObject should be true when value is not a byte[]"); - responseMsg.addObjPart(data, false); - } - if (callbackArg != null) { - responseMsg.addObjPart(callbackArg); - } - responseMsg.addBytesPart(new byte[] {pr.getMetadataVersion(), nwHop}); - serverConnection.getCache().getCancelCriterion().checkCancelInProgress(null); - responseMsg.send(serverConnection); - origMsg.clearParts(); - } - protected static void writeResponseWithFunctionAttribute(byte[] data, Message origMsg, ServerConnection serverConnection) throws IOException { Message responseMsg = serverConnection.getResponseMessage(); @@ -895,7 +853,7 @@ public abstract class BaseCommand implements Command { ServerConnection servConn) throws IOException { // Client is not interested. if (policy.isNone()) { - sendRegisterInterestResponseChunk(region, riKey, new ArrayList(), true, servConn); + sendRegisterInterestResponseChunk(region, riKey, new ArrayList<>(), true, servConn); return; } if (policy.isKeysValues() && servConn.getClientVersion().isNotOlderThan(KnownVersion.GFE_80)) { @@ -903,7 +861,7 @@ public abstract class BaseCommand implements Command { return; } if (riKey instanceof List) { - handleList(region, (List) riKey, policy, servConn); + handleList(region, (List<?>) riKey, policy, servConn); return; } if (!(riKey instanceof String)) { @@ -947,7 +905,7 @@ public abstract class BaseCommand implements Command { private static void handleKeysValuesPolicy(LocalRegion region, Object riKey, int interestType, boolean serializeValues, ServerConnection servConn) throws IOException { if (riKey instanceof List) { - handleKVList(region, (List) riKey, serializeValues, servConn); + handleKVList(region, (List<?>) riKey, serializeValues, servConn); return; } if (!(riKey instanceof String)) { @@ -990,7 +948,8 @@ public abstract class BaseCommand implements Command { /** * @param list is a List of entry keys */ - private static void sendRegisterInterestResponseChunk(Region region, Object riKey, List list, + private static void sendRegisterInterestResponseChunk(Region<?, ?> region, Object riKey, + List<?> list, boolean lastChunk, ServerConnection servConn) throws IOException { ChunkedMessage chunkedResponseMsg = servConn.getRegisterInterestResponseMessage(); chunkedResponseMsg.setNumberOfParts(1); @@ -1025,14 +984,14 @@ public abstract class BaseCommand implements Command { * @param keyList the list of keys * @param policy the policy */ - private static void handleList(LocalRegion region, List keyList, InterestResultPolicy policy, + private static void handleList(LocalRegion region, List<?> keyList, InterestResultPolicy policy, ServerConnection servConn) throws IOException { if (region instanceof PartitionedRegion) { // too bad java doesn't provide another way to do this... handleListPR((PartitionedRegion) region, keyList, policy, servConn); return; } - List newKeyList = new ArrayList(MAXIMUM_CHUNK_SIZE); + List<Object> newKeyList = new ArrayList<>(MAXIMUM_CHUNK_SIZE); // Handle list of keys if (region != null) { for (Object entryKey : keyList) { @@ -1064,7 +1023,7 @@ public abstract class BaseCommand implements Command { ClientProxyMembershipID id = servConn == null ? null : servConn.getProxyID(); // From Get70.getValueAndIsObject() Object data = region.get(entryKey, null, true, true, true, id, versionHolder, true); - VersionTag vt = versionHolder.getVersionTag(); + VersionTag<?> vt = versionHolder.getVersionTag(); updateValues(values, entryKey, data, vt); } @@ -1083,7 +1042,7 @@ public abstract class BaseCommand implements Command { */ private static void handleSingleton(LocalRegion region, Object entryKey, InterestResultPolicy policy, ServerConnection servConn) throws IOException { - List keyList = new ArrayList(1); + List<Object> keyList = new ArrayList<>(1); if (region != null) { if (region.containsKey(entryKey) || sendTombstonesInRIResults(servConn, policy) && region.containsTombstone(entryKey)) { @@ -1103,7 +1062,7 @@ public abstract class BaseCommand implements Command { */ private static void handleAllKeys(LocalRegion region, InterestResultPolicy policy, ServerConnection servConn) throws IOException { - List keyList = new ArrayList(MAXIMUM_CHUNK_SIZE); + List<Object> keyList = new ArrayList<>(MAXIMUM_CHUNK_SIZE); if (region != null) { for (Object entryKey : region.keySet(sendTombstonesInRIResults(servConn, policy))) { appendInterestResponseKey(region, "ALL_KEYS", entryKey, keyList, servConn); @@ -1148,7 +1107,7 @@ public abstract class BaseCommand implements Command { ClientProxyMembershipID id = servConn == null ? null : servConn.getProxyID(); Object data = region.get(key, null, true, true, true, id, versionHolder, true); - VersionTag versionTag = versionHolder.getVersionTag(); + VersionTag<?> versionTag = versionHolder.getVersionTag(); updateValues(values, key, data, versionTag); if (values.size() == MAXIMUM_CHUNK_SIZE) { @@ -1172,8 +1131,8 @@ public abstract class BaseCommand implements Command { region.getConcurrencyChecksEnabled(), serializeValues); if (keyInfo instanceof List) { - HashMap<Integer, HashSet> bucketKeys = new HashMap<>(); - for (Object key : (List) keyInfo) { + HashMap<Integer, HashSet<Object>> bucketKeys = new HashMap<>(); + for (Object key : (List<?>) keyInfo) { int id = PartitionedRegionHelper.getHashKey(region, null, key, null, null); if (bucketKeys.containsKey(id)) { bucketKeys.get(id).add(key); @@ -1199,7 +1158,7 @@ public abstract class BaseCommand implements Command { * of copying it here?) */ private static void updateValues(VersionedObjectList values, Object key, Object value, - VersionTag versionTag) { + VersionTag<?> versionTag) { boolean isObject = true; // If the value in the VM is a CachedDeserializable, @@ -1233,7 +1192,7 @@ public abstract class BaseCommand implements Command { } public static void appendNewRegisterInterestResponseChunkFromLocal(LocalRegion region, - VersionedObjectList values, Object riKeys, Set keySet, ServerConnection servConn) + VersionedObjectList values, Object riKeys, Set<?> keySet, ServerConnection servConn) throws IOException { ClientProxyMembershipID requestingClient = servConn == null ? null : servConn.getProxyID(); for (Object key : keySet) { @@ -1254,11 +1213,12 @@ public abstract class BaseCommand implements Command { } public static void appendNewRegisterInterestResponseChunk(LocalRegion region, - VersionedObjectList values, Object riKeys, Set<Map.Entry> set, ServerConnection servConn) + VersionedObjectList values, Object riKeys, Set<Map.Entry<?, ?>> set, + ServerConnection servConn) throws IOException { - for (Entry entry : set) { + for (Entry<?, ?> entry : set) { if (entry instanceof Region.Entry) { // local entries - VersionTag vt; + VersionTag<?> vt; Object key; Object value; if (entry instanceof EntrySnapshot) { @@ -1267,7 +1227,7 @@ public abstract class BaseCommand implements Command { value = ((EntrySnapshot) entry).getRegionEntry().getValue(null); updateValues(values, key, value, vt); } else { - VersionStamp vs = ((NonTXEntry) entry).getRegionEntry().getVersionStamp(); + VersionStamp<?> vs = ((NonTXEntry) entry).getRegionEntry().getVersionStamp(); vt = vs == null ? null : vs.asVersionTag(); key = entry.getKey(); value = ((NonTXEntry) entry).getRegionEntry().getValueRetain(region, true); @@ -1278,9 +1238,9 @@ public abstract class BaseCommand implements Command { } } } else { // Map.Entry (remote entries) - List list = (List) entry.getValue(); + List<?> list = (List<?>) entry.getValue(); Object value = list.get(0); - VersionTag tag = (VersionTag) list.get(1); + VersionTag<?> tag = (VersionTag<?>) list.get(1); updateValues(values, entry.getKey(), value, tag); } if (values.size() == MAXIMUM_CHUNK_SIZE) { @@ -1317,7 +1277,7 @@ public abstract class BaseCommand implements Command { handleRegExPR((PartitionedRegion) region, regex, policy, servConn); return; } - List keyList = new ArrayList(MAXIMUM_CHUNK_SIZE); + List<Object> keyList = new ArrayList<>(MAXIMUM_CHUNK_SIZE); // Handle the regex pattern if (region != null) { Pattern keyPattern = Pattern.compile(regex); @@ -1344,14 +1304,10 @@ public abstract class BaseCommand implements Command { */ private static void handleRegExPR(final PartitionedRegion region, final String regex, final InterestResultPolicy policy, final ServerConnection servConn) throws IOException { - final List keyList = new ArrayList(MAXIMUM_CHUNK_SIZE); + final List<Object> keyList = new ArrayList<>(MAXIMUM_CHUNK_SIZE); region.getKeysWithRegEx(regex, sendTombstonesInRIResults(servConn, policy), - new PartitionedRegion.SetCollector() { - @Override - public void receiveSet(Set theSet) throws IOException { - appendInterestResponseKeys(region, regex, theSet, keyList, servConn); - } - }); + theSet -> appendInterestResponseKeys(region, regex, uncheckedCast(theSet), keyList, + servConn)); // Send the last chunk (the only chunk for individual and list keys) // always send it back, even if the list is of zero size. sendRegisterInterestResponseChunk(region, regex, keyList, true, servConn); @@ -1360,22 +1316,18 @@ public abstract class BaseCommand implements Command { /** * Process an interest request involving a list of keys */ - private static void handleListPR(final PartitionedRegion region, final List keyList, + private static void handleListPR(final PartitionedRegion region, final List<?> keyList, final InterestResultPolicy policy, final ServerConnection servConn) throws IOException { - final List newKeyList = new ArrayList(MAXIMUM_CHUNK_SIZE); + final List<Object> newKeyList = new ArrayList<>(MAXIMUM_CHUNK_SIZE); region.getKeysWithList(keyList, sendTombstonesInRIResults(servConn, policy), - new PartitionedRegion.SetCollector() { - @Override - public void receiveSet(Set theSet) throws IOException { - appendInterestResponseKeys(region, keyList, theSet, newKeyList, servConn); - } - }); + theSet -> appendInterestResponseKeys(region, keyList, uncheckedCast(theSet), newKeyList, + servConn)); // Send the last chunk (the only chunk for individual and list keys) // always send it back, even if the list is of zero size. sendRegisterInterestResponseChunk(region, keyList, newKeyList, true, servConn); } - private static void handleKVList(final LocalRegion region, final List keyList, + private static void handleKVList(final LocalRegion region, final List<?> keyList, boolean serializeValues, final ServerConnection servConn) throws IOException { if (region instanceof PartitionedRegion) { @@ -1394,7 +1346,7 @@ public abstract class BaseCommand implements Command { ClientProxyMembershipID id = servConn == null ? null : servConn.getProxyID(); Object data = region.get(key, null, true, true, true, id, versionHolder, true); - VersionTag versionTag = versionHolder.getVersionTag(); + VersionTag<?> versionTag = versionHolder.getVersionTag(); updateValues(values, key, data, versionTag); if (values.size() == MAXIMUM_CHUNK_SIZE) { @@ -1426,7 +1378,7 @@ public abstract class BaseCommand implements Command { * @param list list to append to */ private static void appendInterestResponseKey(LocalRegion region, Object riKey, Object entryKey, - List list, ServerConnection servConn) throws IOException { + List<Object> list, ServerConnection servConn) throws IOException { list.add(entryKey); if (logger.isDebugEnabled()) { logger.debug("{}: appendInterestResponseKey <{}>; list size was {}; region: {}", @@ -1440,7 +1392,8 @@ public abstract class BaseCommand implements Command { } private static void appendInterestResponseKeys(LocalRegion region, Object riKey, - Collection entryKeys, List collector, ServerConnection servConn) throws IOException { + Collection<Object> entryKeys, List<Object> collector, ServerConnection servConn) + throws IOException { for (final Object entryKey : entryKeys) { appendInterestResponseKey(region, riKey, entryKey, collector, servConn); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Get70.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Get70.java index 895dd22..e538c93 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Get70.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Get70.java @@ -29,7 +29,6 @@ import org.apache.geode.internal.cache.LocalRegion; import org.apache.geode.internal.cache.PartitionedRegion; import org.apache.geode.internal.cache.Token; import org.apache.geode.internal.cache.VersionTagHolder; -import org.apache.geode.internal.cache.tier.CachedRegionHelper; import org.apache.geode.internal.cache.tier.Command; import org.apache.geode.internal.cache.tier.MessageType; import org.apache.geode.internal.cache.tier.sockets.BaseCommand; @@ -62,27 +61,22 @@ public class Get70 extends BaseCommand { public void cmdExecute(final Message clientMessage, final ServerConnection serverConnection, final SecurityService securityService, long startparam) throws IOException { long start = startparam; - Part regionNamePart = null, keyPart = null, valuePart = null; - String regionName = null; - Object callbackArg = null, key = null; - CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper(); - CacheServerStats stats = serverConnection.getCacheServerStats(); - String errMessage = null; + final CacheServerStats stats = serverConnection.getCacheServerStats(); serverConnection.setAsTrue(REQUIRES_RESPONSE); - // requiresResponse = true; { long oldStart = start; start = DistributionStats.getStatTime(); stats.incReadGetRequestTime(start - oldStart); } // Retrieve the data from the message parts - int parts = clientMessage.getNumberOfParts(); - regionNamePart = clientMessage.getPart(0); - keyPart = clientMessage.getPart(1); + final int parts = clientMessage.getNumberOfParts(); + final Part regionNamePart = clientMessage.getPart(0); + final Part keyPart = clientMessage.getPart(1); // valuePart = null; (redundant assignment) + Object callbackArg = null; if (parts > 2) { - valuePart = clientMessage.getPart(2); + Part valuePart = clientMessage.getPart(2); try { callbackArg = valuePart.getObject(); } catch (Exception e) { @@ -92,7 +86,8 @@ public class Get70 extends BaseCommand { return; } } - regionName = regionNamePart.getCachedString(); + final String regionName = regionNamePart.getCachedString(); + final Object key; try { key = keyPart.getStringOrObject(); } catch (Exception e) { @@ -109,12 +104,13 @@ public class Get70 extends BaseCommand { // Process the get request if (key == null || regionName == null) { + final String errMessage; if ((key == null) && (regionName == null)) { errMessage = "The input region name and key for the get request are null."; } else if (key == null) { errMessage = "The input key for the get request is null."; - } else if (regionName == null) { + } else { errMessage = "The input region name for the get request is null."; } logger.warn("{}: {}", serverConnection.getName(), errMessage); @@ -123,7 +119,7 @@ public class Get70 extends BaseCommand { return; } - Region region = serverConnection.getCache().getRegion(regionName); + final Region<?, ?> region = serverConnection.getCache().getRegion(regionName); if (region == null) { String reason = " was not found during get request"; writeRegionDestroyedEx(clientMessage, regionName, reason, serverConnection); @@ -163,8 +159,8 @@ public class Get70 extends BaseCommand { Object data = originalData; try { boolean isObject = entry.isObject; - VersionTag versionTag = entry.versionTag; - boolean keyNotPresent = entry.keyNotPresent; + final VersionTag<?> versionTag = entry.versionTag; + final boolean keyNotPresent = entry.keyNotPresent; try { AuthorizeRequestPP postAuthzRequest = serverConnection.getPostAuthzRequest(); @@ -234,7 +230,7 @@ public class Get70 extends BaseCommand { * code needs to call getValueAndIsObject then this method can go away. */ @Retained - protected Entry getEntry(Region region, Object key, Object callbackArg, + protected Entry getEntry(Region<?, ?> region, Object key, Object callbackArg, ServerConnection servConn) { return getEntryRetained(region, key, callbackArg, servConn); } @@ -243,7 +239,7 @@ public class Get70 extends BaseCommand { // returning as the result to avoid creating the array repeatedly // for large number of entries like in getAll. Third element added in // 7.0 for retrieving version information - public Entry getValueAndIsObject(Region region, Object key, Object callbackArg, + public Entry getValueAndIsObject(Region<?, ?> region, Object key, Object callbackArg, ServerConnection servConn) { // Region.Entry entry; @@ -251,40 +247,12 @@ public class Get70 extends BaseCommand { if (servConn != null) { servConn.setModificationInfo(true, regionName, key); } - VersionTag versionTag = null; - // LocalRegion lregion = (LocalRegion)region; - - // entry = lregion.getEntry(key, true); - boolean isObject = true; - Object data = null; - - - // if (entry != null && region.getAttributes().getConcurrencyChecksEnabled()) { - // RegionEntry re; - // if (entry instanceof NonTXEntry) { - // re = ((NonTXEntry)entry).getRegionEntry(); - // } else if (entry instanceof EntrySnapshot) { - // re = ((EntrySnapshot)entry).getRegionEntry(); - // } else if (entry instanceof TXEntry) { - // re = null; // versioning not supported in tx yet - // data = entry.getValue(); // can I get a serialized form?? - // } else { - // re = (RegionEntry)entry; - // } - // if (re != null) { - // data = re.getValueInVM(); - // VersionStamp stamp = re.getVersionStamp(); - // if (stamp != null) { - // versionHolder.setVersionTag(stamp.asVersionTag()); - // } - // } - // } else { - ClientProxyMembershipID id = servConn == null ? null : servConn.getProxyID(); - VersionTagHolder versionHolder = new VersionTagHolder(); - data = ((LocalRegion) region).get(key, callbackArg, true, true, true, id, versionHolder, true); - // } - versionTag = versionHolder.getVersionTag(); + final ClientProxyMembershipID id = servConn == null ? null : servConn.getProxyID(); + final VersionTagHolder versionHolder = new VersionTagHolder(); + Object data = + ((LocalRegion) region).get(key, callbackArg, true, true, true, id, versionHolder, true); + final VersionTag<?> versionTag = versionHolder.getVersionTag(); // If the value in the VM is a CachedDeserializable, // get its value. If it is Token.REMOVED, Token.DESTROYED, @@ -292,6 +260,7 @@ public class Get70 extends BaseCommand { // set it to null. If it is NOT_AVAILABLE, get the value from // disk. If it is already a byte[], set isObject to false. boolean wasInvalid = false; + boolean isObject = true; if (data instanceof CachedDeserializable) { CachedDeserializable cd = (CachedDeserializable) data; if (!cd.isSerialized()) { @@ -318,7 +287,7 @@ public class Get70 extends BaseCommand { * Same as getValueAndIsObject but the returned value can be a retained off-heap reference. */ @Retained - public Entry getEntryRetained(Region region, Object key, Object callbackArg, + public Entry getEntryRetained(Region<?, ?> region, Object key, Object callbackArg, ServerConnection servConn) { // Region.Entry entry; @@ -326,26 +295,21 @@ public class Get70 extends BaseCommand { if (servConn != null) { servConn.setModificationInfo(true, regionName, key); } - VersionTag versionTag = null; - // LocalRegion lregion = (LocalRegion)region; - - // entry = lregion.getEntry(key, true); - - boolean isObject = true; - @Retained - Object data = null; ClientProxyMembershipID id = servConn == null ? null : servConn.getProxyID(); VersionTagHolder versionHolder = new VersionTagHolder(); - data = + + @Retained + Object data = ((LocalRegion) region).getRetained(key, callbackArg, true, true, id, versionHolder, true); - versionTag = versionHolder.getVersionTag(); + final VersionTag<?> versionTag = versionHolder.getVersionTag(); // If it is Token.REMOVED, Token.DESTROYED, // Token.INVALID, or Token.LOCAL_INVALID // set it to null. If it is NOT_AVAILABLE, get the value from // disk. If it is already a byte[], set isObject to false. boolean wasInvalid = false; + boolean isObject = true; if (data == Token.REMOVED_PHASE1 || data == Token.REMOVED_PHASE2 || data == Token.DESTROYED) { data = null; } else if (data == Token.INVALID || data == Token.LOCAL_INVALID) { @@ -369,9 +333,9 @@ public class Get70 extends BaseCommand { public final Object value; public final boolean isObject; public final boolean keyNotPresent; - public final VersionTag versionTag; + public final VersionTag<?> versionTag; - public Entry(Object value, boolean isObject, boolean keyNotPresent, VersionTag versionTag) { + public Entry(Object value, boolean isObject, boolean keyNotPresent, VersionTag<?> versionTag) { this.value = value; this.isObject = isObject; this.keyNotPresent = keyNotPresent; @@ -390,14 +354,8 @@ public class Get70 extends BaseCommand { throw new UnsupportedOperationException(); } - @Override - protected void writeReplyWithRefreshMetadata(Message origMsg, ServerConnection serverConnection, - PartitionedRegion pr, byte nwHop) throws IOException { - throw new UnsupportedOperationException(); - } - private void writeResponse(@Unretained Object data, Object callbackArg, Message origMsg, - boolean isObject, VersionTag versionTag, boolean keyNotPresent, ServerConnection servConn) + boolean isObject, VersionTag<?> versionTag, boolean keyNotPresent, ServerConnection servConn) throws IOException { Message responseMsg = servConn.getResponseMessage(); responseMsg.setMessageType(MessageType.RESPONSE); @@ -439,14 +397,9 @@ public class Get70 extends BaseCommand { origMsg.clearParts(); } - protected static void writeResponse(Object data, Object callbackArg, Message origMsg, - boolean isObject, ServerConnection servConn) throws IOException { - throw new UnsupportedOperationException(); - } - private void writeResponseWithRefreshMetadata(@Unretained Object data, Object callbackArg, Message origMsg, boolean isObject, ServerConnection servConn, PartitionedRegion pr, - byte nwHop, VersionTag versionTag, boolean keyNotPresent) throws IOException { + byte nwHop, VersionTag<?> versionTag, boolean keyNotPresent) throws IOException { Message responseMsg = servConn.getResponseMessage(); responseMsg.setMessageType(MessageType.RESPONSE); responseMsg.setTransactionId(origMsg.getTransactionId()); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/PutAll80.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/PutAll80.java index 247c95a..e6d0a4c 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/PutAll80.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/PutAll80.java @@ -14,6 +14,8 @@ */ package org.apache.geode.internal.cache.tier.sockets.command; +import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast; + import java.io.IOException; import java.nio.ByteBuffer; import java.util.LinkedHashMap; @@ -78,13 +80,6 @@ public class PutAll80 extends BaseCommand { public void cmdExecute(final Message clientMessage, final ServerConnection serverConnection, final SecurityService securityService, long startp) throws IOException, InterruptedException { long start = startp; // copy this since we need to modify it - Part regionNamePart = null, numberOfKeysPart = null, keyPart = null, valuePart = null; - String regionName = null; - int numberOfKeys = 0; - Object key = null; - Part eventPart = null; - boolean replyWithMetaData = false; - VersionedObjectList response = null; StringBuilder errMessage = new StringBuilder(); CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper(); @@ -99,10 +94,13 @@ public class PutAll80 extends BaseCommand { stats.incReadPutAllRequestTime(start - oldStart); } + final String regionName; + boolean replyWithMetaData = false; + VersionedObjectList response; try { // Retrieve the data from the message parts // part 0: region name - regionNamePart = clientMessage.getPart(0); + Part regionNamePart = clientMessage.getPart(0); regionName = regionNamePart.getCachedString(); if (regionName == null) { @@ -127,7 +125,7 @@ public class PutAll80 extends BaseCommand { final int BASE_PART_COUNT = getBasePartCount(); // part 1: eventID - eventPart = clientMessage.getPart(1); + Part eventPart = clientMessage.getPart(1); ByteBuffer eventIdPartsBuffer = ByteBuffer.wrap(eventPart.getSerializedForm()); long threadId = EventID.readEventIdPartsFromOptmizedByteArray(eventIdPartsBuffer); long sequenceId = EventID.readEventIdPartsFromOptmizedByteArray(eventIdPartsBuffer); @@ -138,7 +136,7 @@ public class PutAll80 extends BaseCommand { // part 2: invoke callbacks (used by import) Part callbacksPart = clientMessage.getPart(2); - boolean skipCallbacks = callbacksPart.getInt() == 1 ? true : false; + boolean skipCallbacks = callbacksPart.getInt() == 1; // part 3: flags int flags = clientMessage.getPart(3).getInt(); @@ -146,26 +144,27 @@ public class PutAll80 extends BaseCommand { boolean clientHasCCEnabled = (flags & PutAllOp.FLAG_CONCURRENCY_CHECKS) != 0; // part 4: number of keys - numberOfKeysPart = clientMessage.getPart(4); - numberOfKeys = numberOfKeysPart.getInt(); + Part numberOfKeysPart = clientMessage.getPart(4); + int numberOfKeys = numberOfKeysPart.getInt(); Object callbackArg = getOptionalCallbackArg(clientMessage); if (logger.isDebugEnabled()) { - StringBuilder buffer = new StringBuilder(); - buffer.append(serverConnection.getName()).append(": Received ") - .append(this.putAllClassName()).append(" request from ") - .append(serverConnection.getSocketString()).append(" for region ").append(regionName) - .append(callbackArg != null ? (" callbackArg " + callbackArg) : "").append(" with ") - .append(numberOfKeys).append(" entries."); - logger.debug(buffer.toString()); + final String buffer = serverConnection.getName() + ": Received " + + putAllClassName() + " request from " + + serverConnection.getSocketString() + " for region " + regionName + + (callbackArg != null ? (" callbackArg " + callbackArg) : "") + " with " + + numberOfKeys + " entries."; + logger.debug(buffer); } // building the map - Map map = new LinkedHashMap(); - Map<Object, VersionTag> retryVersions = new LinkedHashMap<Object, VersionTag>(); + Map<Object, Object> map = new LinkedHashMap<>(); + final Map<Object, VersionTag<?>> retryVersions = new LinkedHashMap<>(); // Map isObjectMap = new LinkedHashMap(); + Part valuePart; + Object key; for (int i = 0; i < numberOfKeys; i++) { - keyPart = clientMessage.getPart(BASE_PART_COUNT + i * 2); + Part keyPart = clientMessage.getPart(BASE_PART_COUNT + i * 2); key = keyPart.getStringOrObject(); if (key == null) { String putAllMsg = @@ -224,7 +223,7 @@ public class PutAll80 extends BaseCommand { entryEventId.getSequenceID()); } - VersionTag tag = findVersionTagsForRetriedBulkOp(region, entryEventId); + VersionTag<?> tag = findVersionTagsForRetriedBulkOp(region, entryEventId); if (tag != null) { retryVersions.put(key, tag); } @@ -254,23 +253,15 @@ public class PutAll80 extends BaseCommand { authzRequest.putAllAuthorize(regionName, map, callbackArg); map = putAllContext.getMap(); if (map instanceof UpdateOnlyMap) { - map = ((UpdateOnlyMap) map).getInternalMap(); + map = uncheckedCast(((UpdateOnlyMap) map).getInternalMap()); } callbackArg = putAllContext.getCallbackArg(); } - } else { - // no auth, so update the map based on isObjectMap here - /* - * Collection entries = map.entrySet(); Iterator iterator = entries.iterator(); Map.Entry - * mapEntry = null; while (iterator.hasNext()) { mapEntry = (Map.Entry)iterator.next(); - * Object currkey = mapEntry.getKey(); byte[] serializedValue = (byte[])mapEntry.getValue(); - * boolean isObject = ((Boolean)isObjectMap.get(currkey)).booleanValue(); if (isObject) { - * map.put(currkey, CachedDeserializableFactory.create(serializedValue)); } } - */ } - response = region.basicBridgePutAll(map, retryVersions, serverConnection.getProxyID(), - eventId, skipCallbacks, callbackArg); + response = + region.basicBridgePutAll(map, uncheckedCast(retryVersions), serverConnection.getProxyID(), + eventId, skipCallbacks, callbackArg); if (!region.getConcurrencyChecksEnabled() || clientIsEmpty || !clientHasCCEnabled) { // the client only needs this if versioning is being used and the client // has storage @@ -291,18 +282,10 @@ public class PutAll80 extends BaseCommand { replyWithMetaData = true; } } - } catch (RegionDestroyedException rde) { + } catch (RegionDestroyedException | ResourceException | PutAllPartialResultException rde) { writeChunkedException(clientMessage, rde, serverConnection); serverConnection.setAsTrue(RESPONDED); return; - } catch (ResourceException re) { - writeChunkedException(clientMessage, re, serverConnection); - serverConnection.setAsTrue(RESPONDED); - return; - } catch (PutAllPartialResultException pre) { - writeChunkedException(clientMessage, pre, serverConnection); - serverConnection.setAsTrue(RESPONDED); - return; } catch (Exception ce) { // If an interrupted exception is thrown , rethrow it checkForInterrupt(serverConnection, ce); @@ -385,12 +368,6 @@ public class PutAll80 extends BaseCommand { } } - @Override - protected void writeReplyWithRefreshMetadata(Message origMsg, ServerConnection serverConnection, - PartitionedRegion pr, byte nwHop) throws IOException { - throw new UnsupportedOperationException(); - } - private void writeReplyWithRefreshMetadata(Message origMsg, VersionedObjectList response, ServerConnection servConn, PartitionedRegion pr, byte nwHop) throws IOException { servConn.getCache().getCancelCriterion().checkCancelInProgress(null); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RemoveAll.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RemoveAll.java index 1f0ea4a..c4b74a3 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RemoveAll.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/RemoveAll.java @@ -14,6 +14,8 @@ */ package org.apache.geode.internal.cache.tier.sockets.command; +import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast; + import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -30,7 +32,6 @@ import org.apache.geode.internal.cache.LocalRegion; import org.apache.geode.internal.cache.PartitionedRegion; import org.apache.geode.internal.cache.PutAllPartialResultException; import org.apache.geode.internal.cache.ha.ThreadIdentifier; -import org.apache.geode.internal.cache.tier.CachedRegionHelper; import org.apache.geode.internal.cache.tier.Command; import org.apache.geode.internal.cache.tier.MessageType; import org.apache.geode.internal.cache.tier.sockets.BaseCommand; @@ -62,17 +63,8 @@ public class RemoveAll extends BaseCommand { public void cmdExecute(final Message clientMessage, final ServerConnection serverConnection, final SecurityService securityService, long startp) throws IOException, InterruptedException { long start = startp; // copy this since we need to modify it - Part regionNamePart = null, numberOfKeysPart = null, keyPart = null; - String regionName = null; - int numberOfKeys = 0; - Object key = null; - Part eventPart = null; - boolean replyWithMetaData = false; - VersionedObjectList response = null; - StringBuilder errMessage = new StringBuilder(); - CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper(); - CacheServerStats stats = serverConnection.getCacheServerStats(); + final CacheServerStats stats = serverConnection.getCacheServerStats(); serverConnection.setAsTrue(REQUIRES_RESPONSE); serverConnection.setAsTrue(REQUIRES_CHUNKED_RESPONSE); @@ -82,12 +74,16 @@ public class RemoveAll extends BaseCommand { stats.incReadRemoveAllRequestTime(start - oldStart); } + final String regionName; + VersionedObjectList response; + boolean replyWithMetaData = false; try { // Retrieve the data from the message parts // part 0: region name - regionNamePart = clientMessage.getPart(0); + Part regionNamePart = clientMessage.getPart(0); regionName = regionNamePart.getCachedString(); + StringBuilder errMessage = new StringBuilder(); if (regionName == null) { String txt = "The input region name for the removeAll request is null"; @@ -108,7 +104,7 @@ public class RemoveAll extends BaseCommand { } // part 1: eventID - eventPart = clientMessage.getPart(1); + Part eventPart = clientMessage.getPart(1); ByteBuffer eventIdPartsBuffer = ByteBuffer.wrap(eventPart.getSerializedForm()); long threadId = EventID.readEventIdPartsFromOptmizedByteArray(eventIdPartsBuffer); long sequenceId = EventID.readEventIdPartsFromOptmizedByteArray(eventIdPartsBuffer); @@ -126,8 +122,8 @@ public class RemoveAll extends BaseCommand { Object callbackArg = clientMessage.getPart(3).getObject(); // part 4: number of keys - numberOfKeysPart = clientMessage.getPart(4); - numberOfKeys = numberOfKeysPart.getInt(); + final Part numberOfKeysPart = clientMessage.getPart(4); + final int numberOfKeys = numberOfKeysPart.getInt(); if (logger.isDebugEnabled()) { StringBuilder buffer = new StringBuilder(); @@ -137,10 +133,11 @@ public class RemoveAll extends BaseCommand { .append(numberOfKeys).append(" keys."); logger.debug(buffer); } - ArrayList<Object> keys = new ArrayList<Object>(numberOfKeys); - ArrayList<VersionTag> retryVersions = new ArrayList<VersionTag>(numberOfKeys); + final ArrayList<Object> keys = new ArrayList<>(numberOfKeys); + final ArrayList<VersionTag<?>> retryVersions = new ArrayList<>(numberOfKeys); + Object key; for (int i = 0; i < numberOfKeys; i++) { - keyPart = clientMessage.getPart(5 + i); + final Part keyPart = clientMessage.getPart(5 + i); key = keyPart.getStringOrObject(); if (key == null) { String txt = @@ -169,7 +166,7 @@ public class RemoveAll extends BaseCommand { entryEventId.getSequenceID()); } - VersionTag tag = findVersionTagsForRetriedBulkOp(region, entryEventId); + VersionTag<?> tag = findVersionTagsForRetriedBulkOp(region, entryEventId); retryVersions.add(tag); // FIND THE VERSION TAG FOR THIS KEY - but how? all we have is the // removeAll eventId, not individual eventIds for entries, right? @@ -199,7 +196,8 @@ public class RemoveAll extends BaseCommand { } } - response = region.basicBridgeRemoveAll(keys, retryVersions, serverConnection.getProxyID(), + response = region.basicBridgeRemoveAll(keys, uncheckedCast(retryVersions), + serverConnection.getProxyID(), eventId, callbackArg); if (!region.getConcurrencyChecksEnabled() || clientIsEmpty || !clientHasCCEnabled) { // the client only needs this if versioning is being used and the client @@ -221,18 +219,10 @@ public class RemoveAll extends BaseCommand { replyWithMetaData = true; } } - } catch (RegionDestroyedException rde) { + } catch (RegionDestroyedException | ResourceException | PutAllPartialResultException rde) { writeChunkedException(clientMessage, rde, serverConnection); serverConnection.setAsTrue(RESPONDED); return; - } catch (ResourceException re) { - writeChunkedException(clientMessage, re, serverConnection); - serverConnection.setAsTrue(RESPONDED); - return; - } catch (PutAllPartialResultException pre) { - writeChunkedException(clientMessage, pre, serverConnection); - serverConnection.setAsTrue(RESPONDED); - return; } catch (Exception ce) { // If an interrupted exception is thrown , rethrow it checkForInterrupt(serverConnection, ce); @@ -317,12 +307,6 @@ public class RemoveAll extends BaseCommand { } } - @Override - protected void writeReplyWithRefreshMetadata(Message origMsg, ServerConnection serverConnection, - PartitionedRegion pr, byte nwHop) throws IOException { - throw new UnsupportedOperationException(); - } - private void writeReplyWithRefreshMetadata(Message origMsg, VersionedObjectList response, ServerConnection servConn, PartitionedRegion pr, byte nwHop) throws IOException { servConn.getCache().getCancelCriterion().checkCancelInProgress(null);