This is an automated email from the ASF dual-hosted git repository. jbarrett pushed a commit to branch support/1.14 in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/support/1.14 by this push: new 3a7b7ba GEODE-9625: Only serialize transaction metadata when grouping enabled. (#6984) 3a7b7ba is described below commit 3a7b7bae47b454b8c9f47f80be3a0a90a4877343 Author: Jacob Barrett <jabarr...@vmware.com> AuthorDate: Wed Oct 13 10:47:48 2021 -0700 GEODE-9625: Only serialize transaction metadata when grouping enabled. (#6984) (cherry picked from commit ab651eba9558752fe1336919e462350f4581a22a) --- .../apache/geode/codeAnalysis/excludedClasses.txt | 3 +- .../wan/AbstractGatewaySenderEventProcessor.java | 18 ++++- .../internal/cache/wan/GatewaySenderEventImpl.java | 68 ++++++++++------- .../ParallelGatewaySenderEventProcessor.java | 2 +- .../serial/SerialGatewaySenderEventProcessor.java | 8 +- .../AbstractGatewaySenderEventProcessorTest.java | 32 +++++++- .../cache/wan/GatewaySenderEventImplTest.java | 86 ++++++++++++++++++++++ .../wan/parallel/ParallelGatewaySenderHelper.java | 3 +- ...SerialGatewaySenderEventProcessorJUnitTest.java | 2 +- 9 files changed, 187 insertions(+), 35 deletions(-) diff --git a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt index 99ab950..675e272 100644 --- a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt +++ b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt @@ -105,4 +105,5 @@ org/apache/geode/cache/query/internal/xml/ElementType org/apache/geode/cache/query/internal/xml/ElementType$1 org/apache/geode/cache/query/internal/xml/ElementType$2 org/apache/geode/cache/query/internal/xml/ElementType$3 -org/apache/geode/internal/net/ByteBufferVendor$OpenAttemptTimedOut \ No newline at end of file +org/apache/geode/internal/net/ByteBufferVendor$OpenAttemptTimedOut +org/apache/geode/internal/cache/wan/GatewaySenderEventImpl$TransactionMetadataDisposition diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java index 8545ab9..4c82b66 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java @@ -14,6 +14,10 @@ */ package org.apache.geode.internal.cache.wan; +import static org.apache.geode.internal.cache.wan.GatewaySenderEventImpl.TransactionMetadataDisposition.EXCLUDE; +import static org.apache.geode.internal.cache.wan.GatewaySenderEventImpl.TransactionMetadataDisposition.INCLUDE; +import static org.apache.geode.internal.cache.wan.GatewaySenderEventImpl.TransactionMetadataDisposition.INCLUDE_LAST_EVENT; + import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -886,7 +890,7 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread event.setCallbackArgument(geCallbackArg); // OFFHEAP: event for pdx type meta data so it should never be off-heap GatewaySenderEventImpl pdxSenderEvent = - new GatewaySenderEventImpl(EnumListenerEvent.AFTER_UPDATE, event, null, false); + new GatewaySenderEventImpl(EnumListenerEvent.AFTER_UPDATE, event, null); pdxEventsMap.put(typeEntry.getKey(), pdxSenderEvent); pdxSenderEventsList.add(pdxSenderEvent); @@ -1309,6 +1313,18 @@ public abstract class AbstractGatewaySenderEventProcessor extends LoggingThread } + protected GatewaySenderEventImpl.TransactionMetadataDisposition getTransactionMetadataDisposition( + final boolean isLastEventInTransaction) { + if (getSender().mustGroupTransactionEvents()) { + if (isLastEventInTransaction) { + return INCLUDE_LAST_EVENT; + } + return INCLUDE; + } else { + return EXCLUDE; + } + } + /** * Logs a batch of events. * diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java index e4a8b75..47ea4c3 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java @@ -15,6 +15,9 @@ package org.apache.geode.internal.cache.wan; +import static org.apache.geode.internal.cache.wan.GatewaySenderEventImpl.TransactionMetadataDisposition.EXCLUDE; +import static org.apache.geode.internal.cache.wan.GatewaySenderEventImpl.TransactionMetadataDisposition.INCLUDE_LAST_EVENT; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -181,7 +184,8 @@ public class GatewaySenderEventImpl private short version; - private boolean isLastEventInTransaction = true; + private boolean isLastEventInTransaction = false; + private TransactionId transactionId = null; @@ -221,58 +225,54 @@ public class GatewaySenderEventImpl private volatile int serializedValueSize = DEFAULT_SERIALIZED_VALUE_SIZE; - // /** - // * Is this thread in the process of deserializing this event? - // */ - // public static final ThreadLocal isDeserializingValue = new ThreadLocal() { - // @Override - // protected Object initialValue() { - // return Boolean.FALSE; - // } - // }; + /** - * Constructor. No-arg constructor for data serialization. + * No-arg constructor for data serialization. * * @see DataSerializer */ public GatewaySenderEventImpl() {} /** - * Constructor. Creates an initialized <code>GatewayEventImpl</code> - * * @param operation The operation for this event (e.g. AFTER_CREATE) * @param event The <code>CacheEvent</code> on which this <code>GatewayEventImpl</code> is based * @param substituteValue The value to be enqueued instead of the value in the event. - * @param isLastEventInTransaction true if the event is the last in the transaction + * @param transactionMetadataDisposition indicating the inclusion of transaction metadata. * */ @Retained public GatewaySenderEventImpl(EnumListenerEvent operation, CacheEvent event, - Object substituteValue, boolean isLastEventInTransaction) throws IOException { - this(operation, event, substituteValue, true, isLastEventInTransaction); + Object substituteValue, final TransactionMetadataDisposition transactionMetadataDisposition) + throws IOException { + this(operation, event, substituteValue, true, transactionMetadataDisposition); + } + + @Retained + public GatewaySenderEventImpl(EnumListenerEvent operation, CacheEvent<?, ?> event, + Object substituteValue) throws IOException { + this(operation, event, substituteValue, true, EXCLUDE); } @Retained public GatewaySenderEventImpl(EnumListenerEvent operation, CacheEvent event, Object substituteValue, boolean initialize, int bucketId, - boolean isLastEventInTransaction) throws IOException { - this(operation, event, substituteValue, initialize, isLastEventInTransaction); + final TransactionMetadataDisposition transactionMetadataDisposition) throws IOException { + this(operation, event, substituteValue, initialize, transactionMetadataDisposition); this.bucketId = bucketId; } /** - * Constructor. - * * @param operation The operation for this event (e.g. AFTER_CREATE) * @param ce The <code>CacheEvent</code> on which this <code>GatewayEventImpl</code> is based * @param substituteValue The value to be enqueued instead of the value in the event. * @param initialize Whether to initialize this instance - * + * @param transactionMetadataDisposition indicating the inclusion of transaction metadata. */ @Retained public GatewaySenderEventImpl(EnumListenerEvent operation, CacheEvent ce, Object substituteValue, - boolean initialize, boolean isLastEventInTransaction) throws IOException { + boolean initialize, + final TransactionMetadataDisposition transactionMetadataDisposition) throws IOException { // Set the operation and event final EntryEventImpl event = (EntryEventImpl) ce; this.operation = operation; @@ -328,9 +328,11 @@ public class GatewaySenderEventImpl } this.isConcurrencyConflict = event.isConcurrencyConflict(); - this.transactionId = event.getTransactionId(); - this.isLastEventInTransaction = isLastEventInTransaction; - + if (transactionMetadataDisposition != EXCLUDE) { + transactionId = event.getTransactionId(); + isLastEventInTransaction = + transactionMetadataDisposition == INCLUDE_LAST_EVENT && null != transactionId; + } } /** @@ -1370,4 +1372,20 @@ public class GatewaySenderEventImpl this.value = getSerializedValue(); } } + + public enum TransactionMetadataDisposition { + /** + * Transaction metadata should be excluded from the event. + */ + EXCLUDE, + /** + * Transaction metadata should be included in the event. + */ + INCLUDE, + /** + * Transaction metadata should be included in the event and this is the last event in the + * transaction. + */ + INCLUDE_LAST_EVENT, + } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java index 65567f0..5c08283 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java @@ -123,7 +123,7 @@ public class ParallelGatewaySenderEventProcessor extends AbstractGatewaySenderEv // change 42466). bucketID is merged with eventID.getBucketID gatewayQueueEvent = new GatewaySenderEventImpl(operation, event, substituteValue, true, eventID.getBucketID(), - isLastEventInTransaction); + getTransactionMetadataDisposition(isLastEventInTransaction)); enqueueEvent(gatewayQueueEvent); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java index 69142ce..cec2b86 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java @@ -428,7 +428,7 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven if (!(isUpdateVersionStamp || isCME_And_NotOriginRemote)) { senderEvent = new GatewaySenderEventImpl(operation, event, substituteValue, false, - isLastEventInTransaction); + getTransactionMetadataDisposition(isLastEventInTransaction)); handleSecondaryEvent(senderEvent); } } @@ -442,9 +442,9 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven waitForFailoverCompletion(); } // If it is, create and enqueue an initialized GatewayEventImpl - senderEvent = - new GatewaySenderEventImpl(operation, event, substituteValue, isLastEventInTransaction); // OFFHEAP - // ok + // OFFHEAP ok + senderEvent = new GatewaySenderEventImpl(operation, event, substituteValue, + getTransactionMetadataDisposition(isLastEventInTransaction)); boolean queuedEvent = false; try { diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessorTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessorTest.java index e9c83d6..ad99248 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessorTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessorTest.java @@ -14,7 +14,11 @@ */ package org.apache.geode.internal.cache.wan; +import static org.apache.geode.internal.cache.wan.GatewaySenderEventImpl.TransactionMetadataDisposition.EXCLUDE; +import static org.apache.geode.internal.cache.wan.GatewaySenderEventImpl.TransactionMetadataDisposition.INCLUDE; +import static org.apache.geode.internal.cache.wan.GatewaySenderEventImpl.TransactionMetadataDisposition.INCLUDE_LAST_EVENT; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -27,7 +31,7 @@ import org.apache.geode.internal.cache.RegionQueue; public class AbstractGatewaySenderEventProcessorTest { - private RegionQueue queue = mock(RegionQueue.class); + private final RegionQueue queue = mock(RegionQueue.class); @Test public void eventQueueSizeReturnsQueueSize() { @@ -49,4 +53,30 @@ public class AbstractGatewaySenderEventProcessorTest { verify(queue, never()).size(); } + + @Test + public void getTransactionMetadataDispositionIncludedWhenSenderMustGroupTransactionEventsTrue() { + final AbstractGatewaySenderEventProcessor processor = + mock(AbstractGatewaySenderEventProcessor.class); + final AbstractGatewaySender sender = mock(AbstractGatewaySender.class); + when(processor.getSender()).thenReturn(sender); + when(sender.mustGroupTransactionEvents()).thenReturn(true); + when(processor.getTransactionMetadataDisposition(anyBoolean())).thenCallRealMethod(); + + assertThat(processor.getTransactionMetadataDisposition(false)).isEqualTo(INCLUDE); + assertThat(processor.getTransactionMetadataDisposition(true)).isEqualTo(INCLUDE_LAST_EVENT); + } + + @Test + public void getTransactionMetadataDispositionExcludedWhenSenderMustGroupTransactionEventsFalse() { + final AbstractGatewaySenderEventProcessor processor = + mock(AbstractGatewaySenderEventProcessor.class); + final AbstractGatewaySender sender = mock(AbstractGatewaySender.class); + when(sender.mustGroupTransactionEvents()).thenReturn(false); + when(processor.getSender()).thenReturn(sender); + when(processor.getTransactionMetadataDisposition(anyBoolean())).thenCallRealMethod(); + + assertThat(processor.getTransactionMetadataDisposition(false)).isEqualTo(EXCLUDE); + assertThat(processor.getTransactionMetadataDisposition(true)).isEqualTo(EXCLUDE); + } } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImplTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImplTest.java index 3add1a1..fcd7376 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImplTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImplTest.java @@ -14,6 +14,9 @@ */ package org.apache.geode.internal.cache.wan; +import static org.apache.geode.internal.cache.wan.GatewaySenderEventImpl.TransactionMetadataDisposition.EXCLUDE; +import static org.apache.geode.internal.cache.wan.GatewaySenderEventImpl.TransactionMetadataDisposition.INCLUDE; +import static org.apache.geode.internal.cache.wan.GatewaySenderEventImpl.TransactionMetadataDisposition.INCLUDE_LAST_EVENT; import static org.apache.geode.internal.serialization.KnownVersion.GEODE_1_13_0; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; @@ -38,6 +41,8 @@ import org.apache.geode.cache.Operation; import org.apache.geode.cache.TransactionId; import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.internal.InternalDataSerializer; +import org.apache.geode.internal.cache.EntryEventImpl; +import org.apache.geode.internal.cache.EnumListenerEvent; import org.apache.geode.internal.cache.EventID; import org.apache.geode.internal.cache.GemFireCacheImpl; import org.apache.geode.internal.cache.LocalRegion; @@ -185,4 +190,85 @@ public class GatewaySenderEventImplTest { assertThat(event).isNotEqualTo(eventDifferentRegion); } + @Test + public void constructsWithTransactionMetadataWhenInclude() throws IOException { + final EntryEventImpl cacheEvent = mockEntryEventImpl(mock(TransactionId.class)); + + final GatewaySenderEventImpl gatewaySenderEvent = + new GatewaySenderEventImpl(EnumListenerEvent.AFTER_CREATE, cacheEvent, null, INCLUDE); + + assertThat(gatewaySenderEvent.getTransactionId()).isNotNull(); + assertThat(gatewaySenderEvent.isLastEventInTransaction()).isFalse(); + } + + @Test + public void constructsWithTransactionMetadataWhenIncludedLastEvent() throws IOException { + final EntryEventImpl cacheEvent = mockEntryEventImpl(mock(TransactionId.class)); + + final GatewaySenderEventImpl gatewaySenderEvent = + new GatewaySenderEventImpl(EnumListenerEvent.AFTER_CREATE, cacheEvent, null, + INCLUDE_LAST_EVENT); + + assertThat(gatewaySenderEvent.getTransactionId()).isNotNull(); + assertThat(gatewaySenderEvent.isLastEventInTransaction()).isTrue(); + } + + @Test + public void constructsWithoutTransactionMetadataWhenExcluded() throws IOException { + final EntryEventImpl cacheEvent = mockEntryEventImpl(mock(TransactionId.class)); + + final GatewaySenderEventImpl gatewaySenderEvent = + new GatewaySenderEventImpl(EnumListenerEvent.AFTER_CREATE, cacheEvent, null, EXCLUDE); + + assertThat(gatewaySenderEvent.getTransactionId()).isNull(); + assertThat(gatewaySenderEvent.isLastEventInTransaction()).isFalse(); + } + + @Test + public void constructsWithoutTransactionMetadataWhenIncludedButNotTransactionEvent() + throws IOException { + final EntryEventImpl cacheEvent = mockEntryEventImpl(null); + + final GatewaySenderEventImpl gatewaySenderEvent = + new GatewaySenderEventImpl(EnumListenerEvent.AFTER_CREATE, cacheEvent, null, INCLUDE); + + assertThat(gatewaySenderEvent.getTransactionId()).isNull(); + assertThat(gatewaySenderEvent.isLastEventInTransaction()).isFalse(); + } + + @Test + public void constructsWithoutTransactionMetadataWhenIncludedLastEventButNotTransactionEvent() + throws IOException { + final EntryEventImpl cacheEvent = mockEntryEventImpl(null); + + final GatewaySenderEventImpl gatewaySenderEvent = + new GatewaySenderEventImpl(EnumListenerEvent.AFTER_CREATE, cacheEvent, null, + INCLUDE_LAST_EVENT); + + assertThat(gatewaySenderEvent.getTransactionId()).isNull(); + assertThat(gatewaySenderEvent.isLastEventInTransaction()).isFalse(); + } + + @Test + public void constructsWithoutTransactionMetadataWhenExcludedButNotTransactionEvent() + throws IOException { + final EntryEventImpl cacheEvent = mockEntryEventImpl(null); + + final GatewaySenderEventImpl gatewaySenderEvent = + new GatewaySenderEventImpl(EnumListenerEvent.AFTER_CREATE, cacheEvent, null, EXCLUDE); + + assertThat(gatewaySenderEvent.getTransactionId()).isNull(); + assertThat(gatewaySenderEvent.isLastEventInTransaction()).isFalse(); + } + + private EntryEventImpl mockEntryEventImpl(final TransactionId transactionId) { + final EntryEventImpl cacheEvent = mock(EntryEventImpl.class); + when(cacheEvent.getEventId()).thenReturn(mock(EventID.class)); + when(cacheEvent.getOperation()).thenReturn(Operation.CREATE); + when(cacheEvent.getTransactionId()).thenReturn(transactionId); + final LocalRegion region = mock(LocalRegion.class); + when(cacheEvent.getRegion()).thenReturn(region); + return cacheEvent; + } + } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderHelper.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderHelper.java index 37629fc..d06a29f 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderHelper.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderHelper.java @@ -15,6 +15,7 @@ package org.apache.geode.internal.cache.wan.parallel; import static org.apache.geode.cache.Region.SEPARATOR; +import static org.apache.geode.internal.cache.wan.GatewaySenderEventImpl.TransactionMetadataDisposition.EXCLUDE; import static org.apache.geode.internal.statistics.StatisticsClockFactory.disabledClock; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; @@ -94,7 +95,7 @@ public class ParallelGatewaySenderHelper { eei.setEventId(new EventID(new byte[16], threadId, sequenceId, bucketId)); GatewaySenderEventImpl gsei = new GatewaySenderEventImpl(getEnumListenerEvent(operation), eei, null, true, bucketId, - false); + EXCLUDE); gsei.setShadowKey(shadowKey); return gsei; } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessorJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessorJUnitTest.java index 142342a..5d46530 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessorJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessorJUnitTest.java @@ -359,7 +359,7 @@ public class SerialGatewaySenderEventProcessorJUnitTest { EntryEventImpl eei = EntryEventImpl.create(lr, operation, key, value, null, false, null); eei.setEventId(new EventID(new byte[16], threadId, sequenceId)); GatewaySenderEventImpl gsei = - new GatewaySenderEventImpl(getEnumListenerEvent(operation), eei, null, true, false); + new GatewaySenderEventImpl(getEnumListenerEvent(operation), eei, null); return gsei; }