This is an automated email from the ASF dual-hosted git repository. onichols pushed a commit to branch support/1.14 in repository https://gitbox.apache.org/repos/asf/geode.git
commit d1b8319d7bc46de6b10480ebb0c9df41aeae894e Author: Ray Ingles <ring...@pivotal.io> AuthorDate: Tue Feb 23 12:20:45 2021 -0500 GEODE-8894 allow individual deltas to trigger bucket size recalculation (#5978) * add interface for forcing size recalculation on buckets * Allow individual deltas to trigger bucket size recalculation * remove all deprecated methods and redundant tests * update Javadoc and remove Event instance variable * add EntryEventImpl unit tests for size recalc * remove unused classes/code * remove unused TestKey * reorganize and clarify region creation Co-authored-by: Ray Ingles <ring...@vmware.com> (cherry-picked from 3a21c2852746f19755ac302f584ca5b8908eae2e) --- .../internal/cache/DeltaFaultInDUnitTest.java | 6 +- .../cache/DeltaForceSizingFlagDUnitTest.java | 315 +++++++++++++++++++++ .../org/apache/geode/internal/cache/TestDelta.java | 28 +- .../src/main/java/org/apache/geode/Delta.java | 35 ++- .../apache/geode/internal/cache/BucketRegion.java | 1 + .../geode/internal/cache/EntryEventImpl.java | 19 +- .../apache/geode/internal/cache/LocalRegion.java | 3 - .../geode/internal/cache/EntryEventImplTest.java | 56 ++++ 8 files changed, 437 insertions(+), 26 deletions(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/DeltaFaultInDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/DeltaFaultInDUnitTest.java index 33af502..3f233d6 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/DeltaFaultInDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/DeltaFaultInDUnitTest.java @@ -32,10 +32,6 @@ import org.apache.geode.test.dunit.SerializableRunnable; import org.apache.geode.test.dunit.VM; import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase; -/** - * Test that the bucket size does not go negative when we fault out and in a delta object. - * - */ public class DeltaFaultInDUnitTest extends JUnit4CacheTestCase { @@ -45,7 +41,7 @@ public class DeltaFaultInDUnitTest extends JUnit4CacheTestCase { } @Test - public void test() throws Exception { + public void bucketSizeShould_notGoNegative_onFaultInDeltaObject() throws Exception { final Host host = Host.getHost(0); VM vm0 = host.getVM(0); VM vm1 = host.getVM(1); diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/DeltaForceSizingFlagDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/DeltaForceSizingFlagDUnitTest.java new file mode 100644 index 0000000..df95e6e --- /dev/null +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/DeltaForceSizingFlagDUnitTest.java @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.File; +import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.logging.log4j.Logger; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import org.apache.geode.cache.Cache; +import org.apache.geode.cache.DataPolicy; +import org.apache.geode.cache.DiskStoreFactory; +import org.apache.geode.cache.EvictionAction; +import org.apache.geode.cache.EvictionAttributes; +import org.apache.geode.cache.InterestPolicy; +import org.apache.geode.cache.PartitionAttributes; +import org.apache.geode.cache.PartitionAttributesFactory; +import org.apache.geode.cache.RegionFactory; +import org.apache.geode.cache.Scope; +import org.apache.geode.cache.SubscriptionAttributes; +import org.apache.geode.cache.util.ObjectSizer; +import org.apache.geode.logging.internal.log4j.api.LogService; +import org.apache.geode.test.dunit.VM; +import org.apache.geode.test.dunit.rules.ClusterStartupRule; +import org.apache.geode.test.dunit.rules.MemberVM; + +/** + * Tests the use of the per-delta "forceRecalculateSize" flag. + */ + +public class DeltaForceSizingFlagDUnitTest { + private static final String TEST_REGION_NAME = "forceResizeTestRegionName"; + public static final String SMALLER_DELTA_DATA = "12345"; + public static final String LARGER_DELTA_DATA = "1234567890"; + public static final String DELTA_KEY = "a_key"; + public static final String RR_DISK_STORE_NAME = "_forceRecalculateSize_replicate_store"; + private static final Logger logger = LogService.getLogger(); + + @Rule + public ClusterStartupRule cluster = new ClusterStartupRule(); + + protected MemberVM locator; + protected MemberVM server1; + protected MemberVM server2; + + @Before + public void setup() { + int locatorPort; + locator = cluster.startLocatorVM(0); + locatorPort = locator.getPort(); + + server1 = cluster.startServerVM(1, locatorPort); + server2 = cluster.startServerVM(2, locatorPort); + } + + @Test + public void testRRMemLRUDelta() { + doRRMemLRUDeltaTest(false); + } + + @Test + public void testRRMemLRUDeltaAndFlag() { + doRRMemLRUDeltaTest(true); + } + + @Test + public void testPRNoLRUDelta() { + doPRNoLRUDeltaTest(false); + } + + @Test + public void testPRNoLRUAndFlagDelta() { + doPRNoLRUDeltaTest(true); + } + + private void doRRMemLRUDeltaTest(boolean shouldSizeChange) { + VM vm1 = server1.getVM(); + VM vm2 = server2.getVM(); + + createRR(server1); + createRR(server2); + TestDelta delta1 = new TestDelta(false, SMALLER_DELTA_DATA, shouldSizeChange); + put(vm1, delta1); + + assertValueType(vm1, ValueType.RAW_VALUE); + assertValueType(vm2, ValueType.CD_SERIALIZED); + assertThat(getObjectSizerInvocations(vm1)).isEqualTo(1); + assertThat(getObjectSizerInvocations(vm2)).isEqualTo(0); + + long origEvictionSize0 = getSizeFromEvictionStats(vm1); + long origEvictionSize1 = getSizeFromEvictionStats(vm2); + delta1.info = LARGER_DELTA_DATA; + delta1.hasDelta = true; + // Update the delta + put(vm1, delta1); + + assertValueType(vm1, ValueType.RAW_VALUE); + assertValueType(vm2, ValueType.CD_DESERIALIZED); + + assertThat(getObjectSizerInvocations(vm1)).isEqualTo(2); + + long finalEvictionSize0 = getSizeFromEvictionStats(vm1); + long finalEvictionSize1 = getSizeFromEvictionStats(vm2); + assertThat(finalEvictionSize0 - origEvictionSize0).isEqualTo(5); + if (shouldSizeChange) { + assertThat(getObjectSizerInvocations(vm2)).isEqualTo(1); + // I'm not sure what the change in size should be, because we went + // from serialized to deserialized + assertThat(finalEvictionSize1 - origEvictionSize1).isNotEqualTo(0); + } else { + // we invoke the sizer once when we deserialize the original to apply the delta to it + assertThat(getObjectSizerInvocations(vm2)).isEqualTo(0); + assertThat(finalEvictionSize1 - origEvictionSize1).isEqualTo(0); + } + } + + private void doPRNoLRUDeltaTest(boolean shouldSizeChange) { + VM vm1 = server1.getVM(); + VM vm2 = server2.getVM(); + + createPR(server1); + createPR(server2); + + TestDelta delta1 = new TestDelta(false, SMALLER_DELTA_DATA, shouldSizeChange); + put(vm1, delta1); + long origPRSize0 = getSizeFromPRStats(vm1); + long origPRSize1 = getSizeFromPRStats(vm2); + + // Update the delta + delta1.info = LARGER_DELTA_DATA; + delta1.hasDelta = true; + put(vm1, delta1); + long finalPRSize0 = getSizeFromPRStats(vm1); + long finalPRSize1 = getSizeFromPRStats(vm2); + + if (shouldSizeChange) { + // I'm not sure what the change in size should be, because we went + // from serialized to deserialized + assertThat(finalPRSize0 - origPRSize0).isNotEqualTo(0); + assertThat(finalPRSize1 - origPRSize1).isNotEqualTo(0); + } else { + assertThat(finalPRSize0 - origPRSize0).isEqualTo(0); + assertThat(finalPRSize1 - origPRSize1).isEqualTo(0); + } + } + + private long getSizeFromPRStats(VM vm0) { + return vm0.invoke("getSizeFromPRStats", () -> { + Cache cache = ClusterStartupRule.getCache(); + assertThat(cache).isNotNull(); + InternalRegion region = (InternalRegion) cache.getRegion(TEST_REGION_NAME); + if (region instanceof PartitionedRegion) { + long total = 0; + PartitionedRegion pr = (PartitionedRegion) region; + int totalNumBuckets = pr.getPartitionAttributes().getTotalNumBuckets(); + for (int i = 0; i < totalNumBuckets; i++) { + total += pr.getDataStore().getBucketSize(i); + } + return total; + } else { + return 0L; + } + }); + } + + private long getSizeFromEvictionStats(VM vm0) { + return vm0.invoke("getSizeFromEvictionStats", () -> { + + Cache cache = ClusterStartupRule.getCache(); + assertThat(cache).isNotNull(); + InternalRegion region = (InternalRegion) cache.getRegion(TEST_REGION_NAME); + return region.getEvictionCounter(); + }); + } + + private int getObjectSizerInvocations(VM vm0) { + return vm0.invoke("getObjectSizerInvocations", () -> { + Cache cache = ClusterStartupRule.getCache(); + assertThat(cache).isNotNull(); + InternalRegion region = (InternalRegion) cache.getRegion(TEST_REGION_NAME); + return getObjectSizerInvocations(region); + }); + } + + private void put(VM vm0, final Object value) { + vm0.invoke("Put data", () -> { + Cache cache = ClusterStartupRule.getCache(); + assertThat(cache).isNotNull(); + InternalRegion region = (InternalRegion) cache.getRegion(TEST_REGION_NAME); + region.put(DeltaForceSizingFlagDUnitTest.DELTA_KEY, value); + }); + } + + protected static int getObjectSizerInvocations(InternalRegion region) { + TestObjectSizer sizer = (TestObjectSizer) region.getEvictionAttributes().getObjectSizer(); + int result = sizer.invocations.get(); + logger.info("objectSizerInvocations=" + result); + return result; + } + + private void createRR(MemberVM memberVM) { + memberVM.invoke("Create replicateRegion", () -> { + Cache cache = ClusterStartupRule.getCache(); + assertThat(cache).isNotNull(); + + DiskStoreFactory diskStoreFactory = cache.createDiskStoreFactory(); + diskStoreFactory.setDiskDirs(getMyDiskDirs()); + diskStoreFactory.create(RR_DISK_STORE_NAME); + + RegionFactory<Integer, TestDelta> regionFactory = cache.createRegionFactory(); + regionFactory.setDataPolicy(DataPolicy.REPLICATE); + regionFactory.setDiskStoreName(RR_DISK_STORE_NAME); + regionFactory.setDiskSynchronous(true); + regionFactory.setEvictionAttributes(EvictionAttributes.createLRUMemoryAttributes(1, + new TestObjectSizer(), EvictionAction.OVERFLOW_TO_DISK)); + regionFactory.setScope(Scope.DISTRIBUTED_ACK); + + regionFactory.create(TEST_REGION_NAME); + }); + } + + private void assertValueType(VM vm, final ValueType expectedType) { + vm.invoke("assertValueType", () -> { + Cache cache = ClusterStartupRule.getCache(); + assertThat(cache).isNotNull(); + InternalRegion region = (InternalRegion) cache.getRegion(TEST_REGION_NAME); + Object value = region.getValueInVM(DeltaForceSizingFlagDUnitTest.DELTA_KEY); + switch (expectedType) { + case RAW_VALUE: + assertThat(value).isNotInstanceOf(CachedDeserializable.class); + break; + case CD_SERIALIZED: + assertThat(value).isInstanceOf(CachedDeserializable.class); + + Object serializedValue = ((CachedDeserializable) value).getValue(); + assertThat(serializedValue).isInstanceOf(byte[].class); + break; + case CD_DESERIALIZED: + assertThat(value).isInstanceOf(CachedDeserializable.class); + + Object deserializedValue = ((CachedDeserializable) value).getValue(); + assertThat(deserializedValue).isNotInstanceOf(byte[].class); + break; + case EVICTED: + assertThat(value).isNull(); + break; + } + }); + } + + private static File[] getMyDiskDirs() { + long random = new Random().nextLong(); + File file = new File(Long.toString(random)); + assertThat(file.mkdirs()).isTrue(); + return new File[] {file}; + } + + private void createPR(MemberVM memberVM) { + memberVM.invoke("Create partitioned region", () -> { + Cache cache = ClusterStartupRule.getCache(); + assertThat(cache).isNotNull(); + + PartitionAttributesFactory<Integer, TestDelta> paf = + new PartitionAttributesFactory<>(); + paf.setRedundantCopies(1); + PartitionAttributes<Integer, TestDelta> prAttr = paf.create(); + + RegionFactory<Integer, TestDelta> regionFactory = cache.createRegionFactory(); + regionFactory.setDataPolicy(DataPolicy.PARTITION); + regionFactory.setDiskSynchronous(true); + regionFactory.setPartitionAttributes(prAttr); + regionFactory.setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.ALL)); + regionFactory.create(TEST_REGION_NAME); + }); + } + + private static class TestObjectSizer implements ObjectSizer { + private final AtomicInteger invocations = new AtomicInteger(); + + @Override + public int sizeof(Object o) { + logger.info("TestObjectSizer invoked"); + if (o instanceof TestDelta) { + invocations.incrementAndGet(); + return ((TestDelta) o).info.length(); + } + if (o instanceof Integer) { + return 0; + } + throw new RuntimeException("Unexpected type to be sized " + o.getClass() + ", object=" + o); + } + } + + enum ValueType { + RAW_VALUE, CD_SERIALIZED, CD_DESERIALIZED, EVICTED + } +} diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/TestDelta.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/TestDelta.java index cc6ab47..792330e 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/TestDelta.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/TestDelta.java @@ -33,12 +33,28 @@ public class TestDelta implements Delta, DataSerializable, Cloneable { public int deserializations; public int deltas; public int clones; + public boolean forceRecalculateSize; public TestDelta() {} public TestDelta(boolean hasDelta, String info) { this.hasDelta = hasDelta; this.info = info; + this.forceRecalculateSize = false; + } + + public TestDelta(boolean hasDelta, String info, boolean forceRecalculateSize) { + this.hasDelta = hasDelta; + this.info = info; + this.forceRecalculateSize = forceRecalculateSize; + } + + @Override + public String toString() { + return "TestDelta{" + + "info='" + info + "'" + + "forceRecalculateSize='" + forceRecalculateSize + "'" + + '}'; } public synchronized void checkFields(final int serializations, final int deserializations, @@ -51,9 +67,9 @@ public class TestDelta implements Delta, DataSerializable, Cloneable { @Override public synchronized void fromDelta(DataInput in) throws IOException, InvalidDeltaException { - // new Exception("DAN - From Delta Called").printStackTrace(); this.hasDelta = true; info = DataSerializer.readString(in); + forceRecalculateSize = DataSerializer.readBoolean(in); deltas++; } @@ -63,14 +79,18 @@ public class TestDelta implements Delta, DataSerializable, Cloneable { } @Override + public boolean getForceRecalculateSize() { + return forceRecalculateSize; + } + + @Override public synchronized void toDelta(DataOutput out) throws IOException { - // new Exception("DAN - To Delta Called").printStackTrace(); DataSerializer.writeString(info, out); + DataSerializer.writeBoolean(forceRecalculateSize, out); } @Override public synchronized void fromData(DataInput in) throws IOException, ClassNotFoundException { - // new Exception("DAN - From Data Called").printStackTrace(); info = DataSerializer.readString(in); serializations = in.readInt(); deserializations = in.readInt(); @@ -81,7 +101,6 @@ public class TestDelta implements Delta, DataSerializable, Cloneable { @Override public synchronized void toData(DataOutput out) throws IOException { - // new Exception("DAN - To Data Called").printStackTrace(); serializations++; DataSerializer.writeString(info, out); out.writeInt(serializations); @@ -92,7 +111,6 @@ public class TestDelta implements Delta, DataSerializable, Cloneable { @Override public synchronized Object clone() throws CloneNotSupportedException { - // new Exception("DAN - Clone Called").printStackTrace(); clones++; return super.clone(); } diff --git a/geode-core/src/main/java/org/apache/geode/Delta.java b/geode-core/src/main/java/org/apache/geode/Delta.java index 6104a48..8249f74 100755 --- a/geode-core/src/main/java/org/apache/geode/Delta.java +++ b/geode-core/src/main/java/org/apache/geode/Delta.java @@ -23,17 +23,16 @@ import java.io.IOException; * This interface defines a contract between the application and GemFire that allows GemFire to * determine whether an application object contains a delta, allows GemFire to extract the delta * from an application object, and generate a new application object by applying a delta to an - * existing application object. The difference in object state is contained in the - * {@link DataOutput} and {@link DataInput} parameters. + * existing application object. The difference in object state is contained in the {@link + * DataOutput} and {@link DataInput} parameters. * * @since GemFire 6.1 - * */ public interface Delta { /** - * Returns true if this object has pending changes it can write out as a delta. - * Returns false if this object must be transmitted in its entirety. + * Returns true if this object has pending changes it can write out as a delta. Returns false if + * this object must be transmitted in its entirety. */ boolean hasDelta(); @@ -42,8 +41,8 @@ public interface Delta { * presence of a delta by calling {@link Delta#hasDelta()} on the object. The delta is written to * the {@link DataOutput} object provided by GemFire. * + * <p> * Any delta state should be reset in this method. - * */ void toDelta(DataOutput out) throws IOException; @@ -53,7 +52,29 @@ public interface Delta { * This method throws an {@link InvalidDeltaException} when the delta in the {@link DataInput} * cannot be applied to the object. GemFire automatically handles an {@link InvalidDeltaException} * by reattempting the update by sending the full application object. - * */ void fromDelta(DataInput in) throws IOException, InvalidDeltaException; + + /** + * By default, entry sizes are not recalculated when deltas are applied. This optimizes for the + * case where the size of an entry does not change. However, if an entry size does increase or + * decrease, this default behavior can result in the memory usage statistics becoming inaccurate. + * These are used to monitor the health of Geode instances, and for balancing memory usage across + * partitioned regions. + * + * <p> + * There is a system property, gemfire.DELTAS_RECALCULATE_SIZE, which can be used to cause all + * deltas to trigger entry size recalculation when deltas are applied. By default, this is set + * to 'false' because of potential performance impacts when every delta triggers a recalculation. + * + * <p> + * To allow entry size recalculation on a per-delta basis, classes that extend the Delta interface + * should override this method to return 'true'. This may impact performance of specific delta + * types, but will not globally affect the performance of other Geode delta operations. + * + * @since 1.14 + */ + default boolean getForceRecalculateSize() { + return false; + } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java index d565407..af5ebd0 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java @@ -1864,6 +1864,7 @@ public class BucketRegion extends DistributedRegion implements Bucket { return; } Object instance = cd.getValue(); + if (instance instanceof org.apache.geode.Delta && ((org.apache.geode.Delta) instance).hasDelta()) { try (HeapDataOutputStream hdos = new HeapDataOutputStream(KnownVersion.CURRENT)) { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java index 5e2b9c9..ac0a5e2 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java @@ -26,11 +26,13 @@ import org.apache.logging.log4j.Logger; import org.apache.geode.CopyHelper; import org.apache.geode.DataSerializer; +import org.apache.geode.Delta; import org.apache.geode.DeltaSerializationException; import org.apache.geode.GemFireIOException; import org.apache.geode.InvalidDeltaException; import org.apache.geode.SerializationException; import org.apache.geode.SystemFailure; +import org.apache.geode.annotations.VisibleForTesting; import org.apache.geode.cache.EntryNotFoundException; import org.apache.geode.cache.EntryOperation; import org.apache.geode.cache.Operation; @@ -262,7 +264,6 @@ public class EntryEventImpl implements InternalEntryEvent, InternalCacheEvent, protected EntryEventImpl(final InternalRegion region, Operation op, Object key, @Retained(ENTRY_EVENT_NEW_VALUE) Object newVal, Object callbackArgument, boolean originRemote, DistributedMember distributedMember, boolean generateCallbacks, boolean initializeId) { - this.region = region; InternalDistributedSystem ds = (InternalDistributedSystem) region.getCache().getDistributedSystem(); @@ -1521,7 +1522,7 @@ public class EntryEventImpl implements InternalEntryEvent, InternalCacheEvent, if (obj instanceof byte[] || obj == null || obj instanceof CachedDeserializable || obj == Token.NOT_AVAILABLE || Token.isInvalidOrRemoved(obj) // don't serialize delta object already serialized - || obj instanceof org.apache.geode.Delta) { // internal delta + || obj instanceof Delta) { // internal delta return obj; } final CachedDeserializable cd; @@ -1713,10 +1714,10 @@ public class EntryEventImpl implements InternalEntryEvent, InternalCacheEvent, // This is a horrible hack, but we need to get the size of the object // When we store an entry. This code is only used when we do a put // in the primary. - if (v instanceof org.apache.geode.Delta && getRegion().isUsedForPartitionedRegionBucket()) { + if (v instanceof Delta && getRegion().isUsedForPartitionedRegionBucket()) { int vSize; Object ov = basicGetOldValue(); - if (ov instanceof CachedDeserializable && !GemFireCacheImpl.DELTAS_RECALCULATE_SIZE) { + if (ov instanceof CachedDeserializable && !(shouldRecalculateSize((Delta) v))) { vSize = ((CachedDeserializable) ov).getValueSizeInBytes(); } else { vSize = CachedDeserializableFactory.calcMemSize(v, getRegion().getObjectSizer(), false); @@ -1835,7 +1836,7 @@ public class EntryEventImpl implements InternalEntryEvent, InternalCacheEvent, boolean deltaBytesApplied = false; try (ByteArrayDataInput in = new ByteArrayDataInput(getDeltaBytes())) { long start = getRegion().getCachePerfStats().getTime(); - ((org.apache.geode.Delta) value).fromDelta(in); + ((Delta) value).fromDelta(in); getRegion().getCachePerfStats().endDeltaUpdate(start); deltaBytesApplied = true; } catch (RuntimeException rte) { @@ -1858,7 +1859,7 @@ public class EntryEventImpl implements InternalEntryEvent, InternalCacheEvent, if (wasCD) { CachedDeserializable old = (CachedDeserializable) oldValueInVM; int valueSize; - if (GemFireCacheImpl.DELTAS_RECALCULATE_SIZE) { + if (shouldRecalculateSize((Delta) value)) { valueSize = CachedDeserializableFactory.calcMemSize(value, getRegion().getObjectSizer(), false); } else { @@ -1878,6 +1879,12 @@ public class EntryEventImpl implements InternalEntryEvent, InternalCacheEvent, } } + @VisibleForTesting + protected static boolean shouldRecalculateSize(Delta value) { + return GemFireCacheImpl.DELTAS_RECALCULATE_SIZE + || value.getForceRecalculateSize(); + } + void setTXEntryOldValue(Object oldVal, boolean mustBeAvailable) { if (Token.isInvalidOrRemoved(oldVal)) { oldVal = null; diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java index 1833039..dbe3ced 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java @@ -151,7 +151,6 @@ import org.apache.geode.cache.wan.GatewaySender; import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.internal.DistributionAdvisor; import org.apache.geode.distributed.internal.DistributionManager; -import org.apache.geode.distributed.internal.DistributionStats; import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.distributed.internal.ResourceEvent; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; @@ -1728,7 +1727,6 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, } if (extractDelta && ((Delta) value).hasDelta()) { try (HeapDataOutputStream hdos = new HeapDataOutputStream(KnownVersion.CURRENT)) { - long start = DistributionStats.getStatTime(); try { ((Delta) value).toDelta(hdos); } catch (RuntimeException re) { @@ -1737,7 +1735,6 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, throw new DeltaSerializationException("Caught exception while sending delta", e); } event.setDeltaBytes(hdos.toByteArray()); - getCachePerfStats().endDeltaPrepared(start); } } } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/EntryEventImplTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/EntryEventImplTest.java index 44227ea..6768ca2 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/EntryEventImplTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/EntryEventImplTest.java @@ -29,9 +29,11 @@ import static org.mockito.Mockito.when; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.apache.geode.Delta; import org.apache.geode.cache.Operation; import org.apache.geode.cache.SerializedCacheValue; import org.apache.geode.distributed.internal.InternalDistributedSystem; @@ -913,6 +915,60 @@ public class EntryEventImplTest { assertThat(event.isTransactional()).isFalse(); } + @Test + public void shouldRecalculateSize_returnsTrue_ifGetForceRecalculateSizeIsTrue_andDELTAS_RECALCULATE_SIZEisTrue() { + GemFireCacheImpl.DELTAS_RECALCULATE_SIZE = true; + Delta deltaValue = mock(Delta.class); + when(deltaValue.getForceRecalculateSize()) + .thenReturn(true); + + boolean value = EntryEventImpl.shouldRecalculateSize(deltaValue); + + assertThat(value).isTrue(); + } + + @Test + public void shouldRecalculateSize_returnsTrue_ifDELTAS_RECALCULATE_SIZEisTrue_andGetForceRecalculateSizeIsFalse() { + GemFireCacheImpl.DELTAS_RECALCULATE_SIZE = true; + Delta deltaValue = mock(Delta.class); + when(deltaValue.getForceRecalculateSize()) + .thenReturn(false); + + boolean value = EntryEventImpl.shouldRecalculateSize(deltaValue); + + assertThat(value).isTrue(); + } + + @Test + public void shouldRecalculateSize_returnsTrue_ifGetForceRecalculateSizeIsTrue_andDELTAS_RECALCULATE_SIZEIsFalse() { + GemFireCacheImpl.DELTAS_RECALCULATE_SIZE = false; + Delta deltaValue = mock(Delta.class); + when(deltaValue.getForceRecalculateSize()) + .thenReturn(true); + + boolean value = EntryEventImpl.shouldRecalculateSize(deltaValue); + + assertThat(value).isTrue(); + } + + + @Test + public void shouldRecalculateSize_returnsFalse_ifBothDELTAS_RECALCULATE_SIZEIsFalse_andGetForceRecalculateSizeIsFalse() { + GemFireCacheImpl.DELTAS_RECALCULATE_SIZE = false; + Delta deltaValue = mock(Delta.class); + when(deltaValue.getForceRecalculateSize()) + .thenReturn(false); + + boolean value = EntryEventImpl.shouldRecalculateSize(deltaValue); + + assertThat(value).isFalse(); + } + + @After + public void tearDown() { + GemFireCacheImpl.DELTAS_RECALCULATE_SIZE = false; + } + private static class EntryEventImplWithOldValuesDisabled extends EntryEventImpl { @Override protected boolean areOldValuesEnabled() {