This is an automated email from the ASF dual-hosted git repository. jbarrett pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
commit b92a0f6bc4d606193407c3c3ad9b085e28ce3685 Author: Jacob Barrett <jbarr...@pivotal.io> AuthorDate: Thu May 20 14:53:09 2021 -0700 GEODE-6588: Cleanup GatewayReceiverCommand --- .../sockets/command/GatewayReceiverCommand.java | 95 ++++++---------- .../internal/cache/wan/GatewayReceiverStats.java | 124 ++++++++------------- 2 files changed, 85 insertions(+), 134 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java index 861e71e..1a15cd3 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java @@ -14,6 +14,8 @@ */ package org.apache.geode.internal.cache.tier.sockets.command; +import static java.lang.String.format; + import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -70,7 +72,7 @@ public class GatewayReceiverCommand extends BaseCommand { if (cache != null && cache.isCacheAtShutdownAll()) { throw cache.getCacheClosedException("Shutdown occurred during message processing"); } - String reason = String.format("Region %s was not found during batch create request %s", + String reason = format("Region %s was not found during batch create request %s", regionName, batchId); throw new RegionDestroyedException(reason, regionName); } @@ -171,7 +173,7 @@ public class GatewayReceiverCommand extends BaseCommand { try { possibleDuplicatePartBytes = (byte[]) possibleDuplicatePart.getObject(); } catch (Exception e) { - logger.warn(String.format( + logger.warn(format( "%s: Caught exception processing batch request %s containing %s events", serverConnection.getName(), batchId, numberOfEvents), e); handleException(removeOnException, stats, e); @@ -179,14 +181,9 @@ public class GatewayReceiverCommand extends BaseCommand { } boolean possibleDuplicate = possibleDuplicatePartBytes[0] == 0x01; - // Make sure instance variables are null before each iteration - String regionName = null; - Object key = null; - Object callbackArg = null; - // Retrieve the region name from the message parts Part regionNamePart = clientMessage.getPart(partNumber + 2); - regionName = regionNamePart.getCachedString(); + String regionName = regionNamePart.getCachedString(); if (regionName.equals(PeerTypeRegistration.REGION_FULL_PATH)) { indexWithoutPDXEvent--; isPdxEvent = true; @@ -204,7 +201,7 @@ public class GatewayReceiverCommand extends BaseCommand { try { eventId = (EventID) eventIdPart.getObject(); } catch (Exception e) { - logger.warn(String.format( + logger.warn(format( "%s: Caught exception processing batch request %s containing %s events", serverConnection.getName(), batchId, numberOfEvents), e); handleException(removeOnException, stats, e); @@ -213,10 +210,11 @@ public class GatewayReceiverCommand extends BaseCommand { // Retrieve the key from the message parts Part keyPart = clientMessage.getPart(partNumber + 4); + Object key; try { key = keyPart.getStringOrObject(); } catch (Exception e) { - logger.warn(String.format( + logger.warn(format( "%s: Caught exception processing batch request %s containing %s events", serverConnection.getName(), batchId, numberOfEvents), e); handleException(removeOnException, stats, e); @@ -228,24 +226,12 @@ public class GatewayReceiverCommand extends BaseCommand { long versionTimeStamp; Part callbackArgExistsPart; LocalRegion region; + Object callbackArg = null; switch (actionType) { case 0: // Create try { - - /* - * CLIENT EXCEPTION HANDLING TESTING CODE String keySt = (String) key; - * System.out.println("Processing new key: " + key); if - * (keySt.startsWith("failure")) { throw new Exception(LocalizedStrings - * .ProcessBatch_THIS_EXCEPTION_REPRESENTS_A_FAILURE_ON_THE_SERVER - * )); } - */ - // Retrieve the value from the message parts (do not deserialize it) valuePart = clientMessage.getPart(partNumber + 5); - // try { - // logger.warn(getName() + ": Creating key " + key + " value " + - // valuePart.getObject()); - // } catch (Exception e) {} // Retrieve the callbackArg from the message parts if necessary index = partNumber + 6; @@ -260,7 +246,7 @@ public class GatewayReceiverCommand extends BaseCommand { callbackArg = callbackArgPart.getObject(); } catch (Exception e) { logger - .warn(String.format( + .warn(format( "%s: Caught exception processing batch create request %s for %s events", serverConnection.getName(), batchId, numberOfEvents), e); @@ -283,7 +269,7 @@ public class GatewayReceiverCommand extends BaseCommand { if (regionName == null) { message = "%s: The input region name for the batch create request %s is null"; } - String s = String.format(message, serverConnection.getName(), batchId); + String s = format(message, serverConnection.getName(), batchId); logger.warn(s); throw new Exception(s); } @@ -303,8 +289,7 @@ public class GatewayReceiverCommand extends BaseCommand { handleMessageRetry(region, clientEvent); byte[] value = valuePart.getSerializedForm(); boolean isObject = valuePart.isObject(); - // [sumedh] This should be done on client while sending - // since that is the WAN gateway + // This should be done on client while sending since that is the WAN gateway AuthorizeRequest authzRequest = serverConnection.getAuthzRequest(); if (authzRequest != null) { PutOperationContext putContext = @@ -313,7 +298,7 @@ public class GatewayReceiverCommand extends BaseCommand { isObject = putContext.isObject(); } // Attempt to create the entry - boolean result = false; + boolean result; if (isPdxEvent) { result = addPdxType(crHelper, key, value); } else { @@ -334,13 +319,13 @@ public class GatewayReceiverCommand extends BaseCommand { } else { // This exception will be logged in the catch block below throw new Exception( - String.format( + format( "%s: Failed to create or update entry for region %s key %s value %s callbackArg %s", serverConnection.getName(), regionName, key, valuePart, callbackArg)); } } } catch (Exception e) { - logger.warn(String.format( + logger.warn(format( "%s: Caught exception processing batch create request %s for %s events", serverConnection.getName(), batchId, numberOfEvents), e); handleException(removeOnException, stats, e); @@ -351,10 +336,6 @@ public class GatewayReceiverCommand extends BaseCommand { try { // Retrieve the value from the message parts (do not deserialize it) valuePart = clientMessage.getPart(partNumber + 5); - // try { - // logger.warn(getName() + ": Updating key " + key + " value " + - // valuePart.getObject()); - // } catch (Exception e) {} // Retrieve the callbackArg from the message parts if necessary index = partNumber + 6; @@ -370,7 +351,7 @@ public class GatewayReceiverCommand extends BaseCommand { } catch (Exception e) { logger .warn( - String.format( + format( "%s: Caught exception processing batch update request %s containing %s events", serverConnection.getName(), batchId, numberOfEvents), e); @@ -393,7 +374,7 @@ public class GatewayReceiverCommand extends BaseCommand { if (regionName == null) { message = "%s: The input region name for the batch update request %s is null"; } - String s = String.format(message, serverConnection.getName(), batchId); + String s = format(message, serverConnection.getName(), batchId); logger.warn(s); throw new Exception(s); } @@ -420,7 +401,7 @@ public class GatewayReceiverCommand extends BaseCommand { value = putContext.getSerializedValue(); isObject = putContext.isObject(); } - boolean result = false; + final boolean result; if (isPdxEvent) { result = addPdxType(crHelper, key, value); } else { @@ -434,7 +415,7 @@ public class GatewayReceiverCommand extends BaseCommand { } else { final String message = "%s: Failed to update entry for region %s, key %s, value %s, and callbackArg %s"; - String s = String.format(message, serverConnection.getName(), regionName, + String s = format(message, serverConnection.getName(), regionName, key, valuePart, callbackArg); logger.info(s); throw new Exception(s); @@ -442,7 +423,7 @@ public class GatewayReceiverCommand extends BaseCommand { } } catch (Exception e) { // Preserve the connection under all circumstances - logger.warn(String.format( + logger.warn(format( "%s: Caught exception processing batch update request %s containing %s events", serverConnection.getName(), batchId, numberOfEvents), e); handleException(removeOnException, stats, e); @@ -465,7 +446,7 @@ public class GatewayReceiverCommand extends BaseCommand { } catch (Exception e) { logger .warn( - String.format( + format( "%s: Caught exception processing batch destroy request %s containing %s events", serverConnection.getName(), batchId, numberOfEvents), e); @@ -491,7 +472,7 @@ public class GatewayReceiverCommand extends BaseCommand { message = "%s: The input region name for the batch destroy request %s is null"; } - String s = String.format(message, serverConnection.getName(), batchId); + String s = format(message, serverConnection.getName(), batchId); logger.warn(s); throw new Exception(s); } @@ -527,7 +508,7 @@ public class GatewayReceiverCommand extends BaseCommand { retry = false; } } catch (Exception e) { - logger.warn(String.format( + logger.warn(format( "%s: Caught exception processing batch destroy request %s containing %s events", serverConnection.getName(), batchId, numberOfEvents), e); @@ -573,7 +554,7 @@ public class GatewayReceiverCommand extends BaseCommand { String message = "%s: Caught exception processing batch update version request request %s containing %s events"; - String s = String.format(message, serverConnection.getName(), + String s = format(message, serverConnection.getName(), batchId, numberOfEvents); logger.warn(s); throw new Exception(s); @@ -608,7 +589,7 @@ public class GatewayReceiverCommand extends BaseCommand { } } } catch (Exception e) { - logger.warn(String.format( + logger.warn(format( "%s: Caught exception processing batch update version request request %s containing %s events", serverConnection.getName(), batchId, numberOfEvents), e); handleException(removeOnException, stats, e); @@ -638,7 +619,7 @@ public class GatewayReceiverCommand extends BaseCommand { // If we have an issue with the PDX registry, stop processing more data if (e.getCause() instanceof PdxRegistryMismatchException) { fatalException = e.getCause(); - logger.fatal(String.format( + logger.fatal(format( "This gateway receiver has received a PDX type from %s that does match the existing PDX type. This gateway receiver will not process any more events, in order to prevent receiving objects which may not be deserializable.", serverConnection.getMembershipID()), e.getCause()); break; @@ -647,7 +628,7 @@ public class GatewayReceiverCommand extends BaseCommand { // Increment the batch id unless the received batch id is -1 (a // failover batch) DistributedSystem ds = crHelper.getCacheForGatewayCommand().getDistributedSystem(); - String exceptionMessage = String.format( + String exceptionMessage = format( "Exception occurred while processing a batch on the receiver running on DistributedSystem with Id: %s, DistributedMember on which the receiver is running: %s", ((InternalDistributedSystem) ds).getDistributionManager().getDistributedSystemId(), ds.getDistributedMember()); @@ -685,11 +666,11 @@ public class GatewayReceiverCommand extends BaseCommand { } if (fatalException != null) { serverConnection.incrementLatestBatchIdReplied(batchId); - writeFatalException(clientMessage, fatalException, serverConnection, batchId); + writeFatalException(clientMessage, fatalException, serverConnection); serverConnection.setAsTrue(RESPONDED); } else if (!exceptions.isEmpty()) { serverConnection.incrementLatestBatchIdReplied(batchId); - writeBatchException(clientMessage, exceptions, serverConnection, batchId); + writeBatchException(clientMessage, exceptions, serverConnection); serverConnection.setAsTrue(RESPONDED); } else { // Increment the batch id unless the received batch id is -1 (a failover @@ -725,7 +706,7 @@ public class GatewayReceiverCommand extends BaseCommand { private void handleException(boolean removeOnException, GatewayReceiverStats stats, Exception e) throws Exception { - if (shouldThrowException(removeOnException, e)) { + if (shouldThrowException(removeOnException)) { throw e; } else { stats.incEventsRetried(); @@ -733,9 +714,9 @@ public class GatewayReceiverCommand extends BaseCommand { } } - private boolean shouldThrowException(boolean removeOnException, Exception e) { + private boolean shouldThrowException(boolean removeOnException) { // Split out in case specific exceptions would short-circuit retry logic. - // Currently it just considers the boolean. + // Currently, it just considers the boolean. return removeOnException; } @@ -770,18 +751,15 @@ public class GatewayReceiverCommand extends BaseCommand { } private static void writeBatchException(Message origMsg, List<BatchException70> exceptions, - ServerConnection servConn, int batchId) throws IOException { + ServerConnection servConn) throws IOException { Message errorMsg = servConn.getErrorResponseMessage(); errorMsg.setMessageType(MessageType.EXCEPTION); errorMsg.setNumberOfParts(2); errorMsg.setTransactionId(origMsg.getTransactionId()); - errorMsg.addObjPart(exceptions); - // errorMsg.addStringPart(be.toString()); errorMsg.send(servConn); - for (Exception e : exceptions) { - ((GatewayReceiverStats) servConn.getCacheServerStats()).incExceptionsOccurred(); - } + ((GatewayReceiverStats) servConn.getCacheServerStats()) + .incExceptionsOccurred(exceptions.size()); for (Exception be : exceptions) { if (logger.isWarnEnabled()) { logger.warn(servConn.getName() + ": Wrote batch exception: ", @@ -791,13 +769,12 @@ public class GatewayReceiverCommand extends BaseCommand { } private static void writeFatalException(Message origMsg, Throwable exception, - ServerConnection servConn, int batchId) throws IOException { + ServerConnection servConn) throws IOException { Message errorMsg = servConn.getErrorResponseMessage(); errorMsg.setMessageType(MessageType.EXCEPTION); errorMsg.setNumberOfParts(2); errorMsg.setTransactionId(origMsg.getTransactionId()); errorMsg.addObjPart(exception); - // errorMsg.addStringPart(be.toString()); errorMsg.send(servConn); logger.warn(servConn.getName() + ": Wrote batch exception: ", exception); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverStats.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverStats.java index 4184b16..73af7b0 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverStats.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverStats.java @@ -27,11 +27,6 @@ public class GatewayReceiverStats extends CacheServerStats { private static final String typeName = "GatewayReceiverStatistics"; - // ////////////////// Statistic "Id" Fields //////////////////// - - // /** Name of the events queued statistic */ - // private static final String FAILOVER_BATCHES_RECEIVED = "failoverBatchesReceived"; - /** * Name of the events not queued because conflated statistic */ @@ -89,22 +84,18 @@ public class GatewayReceiverStats extends CacheServerStats { /** * Id of the events not queued because conflated statistic */ - private int duplicateBatchesReceivedId; + private final int duplicateBatchesReceivedId; /** * Id of the event queue time statistic */ - private int outoforderBatchesReceivedId; + private final int outoforderBatchesReceivedId; /** * Id of the event queue size statistic */ - private int earlyAcksId; + private final int earlyAcksId; - /** - * Id of the events distributed statistic - */ - private int eventsReceivedId; private final Counter eventsReceivedCounter; private static final String EVENTS_RECEIVED_COUNTER_NAME = "geode.gateway.receiver.events"; @@ -115,59 +106,59 @@ public class GatewayReceiverStats extends CacheServerStats { /** * Id of the events exceeding alert threshold statistic */ - private int createRequestId; + private final int createRequestId; /** * Id of the batch distribution time statistic */ - private int updateRequestId; + private final int updateRequestId; /** * Id of the batches distributed statistic */ - private int destroyRequestId; + private final int destroyRequestId; /** * Id of the batches redistributed statistic */ - private int unknowsOperationsReceivedId; + private final int unknowsOperationsReceivedId; /** * Id of the unprocessed events added by primary statistic */ - private int exceptionsOccurredId; + private final int exceptionsOccurredId; /** * Id of the events retried statistic */ - private int eventsRetriedId; + private final int eventsRetriedId; // ///////////////////// Constructors /////////////////////// public static GatewayReceiverStats createGatewayReceiverStats(StatisticsFactory f, String ownerName, MeterRegistry meterRegistry) { StatisticDescriptor[] descriptors = new StatisticDescriptor[] { - f.createIntCounter(DUPLICATE_BATCHES_RECEIVED, + f.createLongCounter(DUPLICATE_BATCHES_RECEIVED, "number of batches which have already been seen by this GatewayReceiver", "nanoseconds"), - f.createIntCounter(OUT_OF_ORDER_BATCHES_RECEIVED, + f.createLongCounter(OUT_OF_ORDER_BATCHES_RECEIVED, "number of batches which are out of order on this GatewayReceiver", "operations"), - f.createIntCounter(EARLY_ACKS, "number of early acknowledgements sent to gatewaySenders", + f.createLongCounter(EARLY_ACKS, "number of early acknowledgements sent to gatewaySenders", "operations"), f.createLongCounter(EVENTS_RECEIVED, EVENTS_RECEIVED_COUNTER_DESCRIPTION, EVENTS_RECEIVED_COUNTER_UNITS), - f.createIntCounter(CREAT_REQUESTS, + f.createLongCounter(CREAT_REQUESTS, "total number of create operations received by this GatewayReceiver", "operations"), - f.createIntCounter(UPDATE_REQUESTS, + f.createLongCounter(UPDATE_REQUESTS, "total number of update operations received by this GatewayReceiver", "operations"), - f.createIntCounter(DESTROY_REQUESTS, + f.createLongCounter(DESTROY_REQUESTS, "total number of destroy operations received by this GatewayReceiver", "operations"), - f.createIntCounter(UNKNOWN_OPERATIONS_RECEIVED, + f.createLongCounter(UNKNOWN_OPERATIONS_RECEIVED, "total number of unknown operations received by this GatewayReceiver", "operations"), - f.createIntCounter(EXCEPTIONS_OCCURRED, + f.createLongCounter(EXCEPTIONS_OCCURRED, "number of exceptions occurred while porcessing the batches", "operations"), - f.createIntCounter(EVENTS_RETRIED, + f.createLongCounter(EVENTS_RETRIED, "total number events retried by this GatewayReceiver due to exceptions", "operations")}; return new GatewayReceiverStats(f, ownerName, typeName, descriptors, meterRegistry); @@ -177,11 +168,10 @@ public class GatewayReceiverStats extends CacheServerStats { StatisticDescriptor[] descriptiors, MeterRegistry meterRegistry) { super(f, ownerName, typeName, descriptiors); // Initialize id fields - // failoverBatchesReceivedId = statType.nameToId(FAILOVER_BATCHES_RECEIVED); duplicateBatchesReceivedId = statType.nameToId(DUPLICATE_BATCHES_RECEIVED); outoforderBatchesReceivedId = statType.nameToId(OUT_OF_ORDER_BATCHES_RECEIVED); earlyAcksId = statType.nameToId(EARLY_ACKS); - eventsReceivedId = statType.nameToId(EVENTS_RECEIVED); + final int eventsReceivedId = statType.nameToId(EVENTS_RECEIVED); createRequestId = statType.nameToId(CREAT_REQUESTS); updateRequestId = statType.nameToId(UPDATE_REQUESTS); destroyRequestId = statType.nameToId(DESTROY_REQUESTS); @@ -197,50 +187,37 @@ public class GatewayReceiverStats extends CacheServerStats { .register(meterRegistry); } - // /////////////////// Instance Methods ///////////////////// - - // /** - // * Increments the number of failover batches received by 1. - // */ - // public void incFailoverBatchesReceived() { - // this.stats.incInt(failoverBatchesReceivedId, 1); - // } - // - // public int getFailoverBatchesReceived() { - // return this.stats.getInt(failoverBatchesReceivedId); - // } - /** * Increments the number of duplicate batches received by 1. */ public void incDuplicateBatchesReceived() { - this.stats.incInt(duplicateBatchesReceivedId, 1); + stats.incLong(duplicateBatchesReceivedId, 1); } - public int getDuplicateBatchesReceived() { - return this.stats.getInt(duplicateBatchesReceivedId); + public long getDuplicateBatchesReceived() { + return stats.getLong(duplicateBatchesReceivedId); } /** * Increments the number of out of order batches received by 1. */ public void incOutoforderBatchesReceived() { - this.stats.incInt(outoforderBatchesReceivedId, 1); + stats.incLong(outoforderBatchesReceivedId, 1); } - public int getOutoforderBatchesReceived() { - return this.stats.getInt(outoforderBatchesReceivedId); + public long getOutoforderBatchesReceived() { + return stats.getLong(outoforderBatchesReceivedId); } /** * Increments the number of early acks by 1. */ public void incEarlyAcks() { - this.stats.incInt(earlyAcksId, 1); + stats.incLong(earlyAcksId, 1); } - public int getEarlyAcks() { - return this.stats.getInt(earlyAcksId); + public long getEarlyAcks() { + return stats.getLong(earlyAcksId); } /** @@ -250,74 +227,71 @@ public class GatewayReceiverStats extends CacheServerStats { eventsReceivedCounter.increment(delta); } - public int getEventsReceived() { - return (int) eventsReceivedCounter.count(); + public long getEventsReceived() { + return (long) eventsReceivedCounter.count(); } /** * Increments the number of create requests by 1. */ public void incCreateRequest() { - this.stats.incInt(createRequestId, 1); + stats.incLong(createRequestId, 1); } - public int getCreateRequest() { - return this.stats.getInt(createRequestId); + public long getCreateRequest() { + return stats.getLong(createRequestId); } /** * Increments the number of update requests by 1. */ public void incUpdateRequest() { - this.stats.incInt(updateRequestId, 1); + stats.incLong(updateRequestId, 1); } - public int getUpdateRequest() { - return this.stats.getInt(updateRequestId); + public long getUpdateRequest() { + return stats.getLong(updateRequestId); } /** * Increments the number of destroy request received by 1. */ public void incDestroyRequest() { - this.stats.incInt(destroyRequestId, 1); + stats.incLong(destroyRequestId, 1); } - public int getDestroyRequest() { - return this.stats.getInt(destroyRequestId); + public long getDestroyRequest() { + return stats.getLong(destroyRequestId); } /** * Increments the number of unknown operations received by 1. */ public void incUnknowsOperationsReceived() { - this.stats.incInt(unknowsOperationsReceivedId, 1); + stats.incLong(unknowsOperationsReceivedId, 1); } - public int getUnknowsOperationsReceived() { - return this.stats.getInt(unknowsOperationsReceivedId); + public long getUnknowsOperationsReceived() { + return stats.getLong(unknowsOperationsReceivedId); } - /** - * Increments the number of exceptions occurred by 1. - */ - public void incExceptionsOccurred() { - this.stats.incInt(exceptionsOccurredId, 1); + public void incExceptionsOccurred(int delta) { + stats.incLong(exceptionsOccurredId, delta); } - public int getExceptionsOccurred() { - return this.stats.getInt(exceptionsOccurredId); + public long getExceptionsOccurred() { + return stats.getLong(exceptionsOccurredId); } /** * Increments the number of events received by 1. */ public void incEventsRetried() { - this.stats.incInt(eventsRetriedId, 1); + stats.incLong(eventsRetriedId, 1); } - public int getEventsRetried() { - return this.stats.getInt(eventsRetriedId); + public long getEventsRetried() { + return stats.getLong(eventsRetriedId); } /**