This is an automated email from the ASF dual-hosted git repository. burcham pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new fb142e1 GEODE-9825: processInputBuffer resize retains data (#7131) fb142e1 is described below commit fb142e1bbd42d6af2463fd9b9b49ef3e5519cfcb Author: Bill Burcham <bill.burc...@gmail.com> AuthorDate: Tue Nov 23 08:29:11 2021 -0800 GEODE-9825: processInputBuffer resize retains data (#7131) --- .../internal/P2PMessagingConcurrencyDUnitTest.java | 136 +++++++++++++++------ .../org/apache/geode/internal/tcp/Connection.java | 12 +- 2 files changed, 111 insertions(+), 37 deletions(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2PMessagingConcurrencyDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2PMessagingConcurrencyDUnitTest.java index e54191a..c204777 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2PMessagingConcurrencyDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/P2PMessagingConcurrencyDUnitTest.java @@ -15,6 +15,7 @@ package org.apache.geode.distributed.internal; +import static org.apache.geode.test.awaitility.GeodeAwaitility.await; import static org.assertj.core.api.Assertions.assertThat; import java.io.DataInput; @@ -29,12 +30,13 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.LongAdder; +import junitparams.Parameters; import org.jetbrains.annotations.NotNull; -import org.junit.Before; import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; import org.apache.geode.cache.CacheFactory; import org.apache.geode.cache.ssl.CertStores; @@ -48,15 +50,24 @@ import org.apache.geode.test.dunit.rules.ClusterStartupRule; import org.apache.geode.test.dunit.rules.DistributedExecutorServiceRule; import org.apache.geode.test.dunit.rules.MemberVM; import org.apache.geode.test.junit.categories.MembershipTest; +import org.apache.geode.test.junit.runners.GeodeParamsRunner; import org.apache.geode.test.version.VersionManager; /** - * Tests one-way P2P messaging between two peers. A shared, - * ordered connection is used and many concurrent tasks - * compete on the sending side. Tests with TLS enabled - * to exercise ByteBufferSharing and friends. + * Tests one-way P2P messaging between two peers. + * Many concurrent tasks compete on the sending side. + * The main purpose of the test is to exercise + * ByteBufferSharing and friends. + * + * Tests combinations of: conserve-sockets true/false, + * TLS on/off, and socket-buffer-size for sender + * and receiver both set to the default (and equal) + * and set to the sender's buffer twice as big as the + * receiver's buffer. + * */ @Category({MembershipTest.class}) +@RunWith(GeodeParamsRunner.class) public class P2PMessagingConcurrencyDUnitTest { // how many messages will each sender generate? @@ -71,6 +82,8 @@ public class P2PMessagingConcurrencyDUnitTest { // random seed private static final int RANDOM_SEED = 1234; + private static Properties securityProperties; + @Rule public final ClusterStartupRule clusterStartupRule = new ClusterStartupRule(3); @@ -87,21 +100,56 @@ public class P2PMessagingConcurrencyDUnitTest { */ private static LongAdder bytesTransferredAdder; - @Before - public void before() throws GeneralSecurityException, IOException { - final Properties configuration = gemFireConfiguration(); + private void configure( + final boolean conserveSockets, + final boolean useTLS, + final int sendSocketBufferSize, + final int receiveSocketBufferSize) throws GeneralSecurityException, IOException { + + final Properties senderConfiguration = + gemFireConfiguration(conserveSockets, useTLS, sendSocketBufferSize); + final Properties receiverConfiguration = + gemFireConfiguration(conserveSockets, useTLS, receiveSocketBufferSize); final MemberVM locator = clusterStartupRule.startLocatorVM(0, 0, VersionManager.CURRENT_VERSION, - x -> x.withProperties(configuration).withConnectionToLocator() + x -> x.withProperties(senderConfiguration).withConnectionToLocator() .withoutClusterConfigurationService().withoutManagementRestService()); - sender = clusterStartupRule.startServerVM(1, configuration, locator.getPort()); - receiver = clusterStartupRule.startServerVM(2, configuration, locator.getPort()); + sender = clusterStartupRule.startServerVM(1, senderConfiguration, locator.getPort()); + receiver = clusterStartupRule.startServerVM(2, receiverConfiguration, locator.getPort()); } @Test - public void testP2PMessagingWithTLS() { + @Parameters({ + /* + * all combinations of flags with buffer sizes: + * (equal), larger/smaller, smaller/larger, minimal + */ + "true, true, 32768, 32768", + "true, true, 65536, 32768", + "true, true, 32768, 65536", + "true, true, 1024, 1024", + "true, false, 32768, 32768", + "true, false, 65536, 32768", + "true, false, 32768, 65536", + "true, false, 1024, 1024", + "false, true, 32768, 32768", + "false, true, 65536, 32768", + "false, true, 32768, 65536", + "false, true, 1024, 1024", + "false, false, 32768, 32768", + "false, false, 65536, 32768", + "false, false, 32768, 65536", + "false, false, 1024, 1024", + }) + public void testP2PMessaging( + final boolean conserveSockets, + final boolean useTLS, + final int sendSocketBufferSize, + final int receiveSocketBufferSize) throws GeneralSecurityException, IOException { + + configure(conserveSockets, useTLS, sendSocketBufferSize, receiveSocketBufferSize); final InternalDistributedMember receiverMember = receiver.invoke(() -> { @@ -172,10 +220,16 @@ public class P2PMessagingConcurrencyDUnitTest { }); - final long bytesSent = sender.invoke(() -> bytesTransferredAdder.sum()); - final long bytesReceived = receiver.invoke(() -> bytesTransferredAdder.sum()); + final long bytesSent = getByteCount(sender); - assertThat(bytesReceived).as("bytes received != bytes sent").isEqualTo(bytesSent); + await().untilAsserted( + () -> assertThat(getByteCount(receiver)) + .as("bytes received != bytes sent") + .isEqualTo(bytesSent)); + } + + private long getByteCount(final MemberVM member) { + return member.invoke(() -> bytesTransferredAdder.sum()); } private static ClusterDistributionManager getCDM() { @@ -245,7 +299,7 @@ public class P2PMessagingConcurrencyDUnitTest { throws IOException, ClassNotFoundException { super.fromData(in, context); - final int messageId = in.readInt(); + messageId = in.readInt(); final int length = in.readInt(); @@ -263,10 +317,19 @@ public class P2PMessagingConcurrencyDUnitTest { } @NotNull - private static Properties gemFireConfiguration() + private static Properties gemFireConfiguration( + final boolean conserveSockets, final boolean useTLS, + final int socketBufferSize) throws GeneralSecurityException, IOException { - final Properties props = securityProperties(); + final Properties props; + if (useTLS) { + props = securityProperties(); + } else { + props = new Properties(); + } + + props.setProperty("socket-buffer-size", String.valueOf(socketBufferSize)); /* * This is something we intend to test! @@ -276,29 +339,32 @@ public class P2PMessagingConcurrencyDUnitTest { * * careful: if you set a boolean it doesn't take hold! setting a String */ - props.setProperty("conserve-sockets", "true"); + props.setProperty("conserve-sockets", String.valueOf(conserveSockets)); return props; } @NotNull private static Properties securityProperties() throws GeneralSecurityException, IOException { - final CertificateMaterial ca = new CertificateBuilder() - .commonName("Test CA") - .isCA() - .generate(); - - final CertificateMaterial serverCertificate = new CertificateBuilder() - .commonName("member") - .issuedBy(ca) - .generate(); - - final CertStores memberStore = new CertStores("member"); - memberStore.withCertificate("member", serverCertificate); - memberStore.trust("ca", ca); - // we want to exercise the ByteBufferSharing code paths; we don't care about client auth etc - final Properties props = memberStore.propertiesWith("all", false, false); - return props; + // subsequent calls must return the same value so members agree on credentials + if (securityProperties == null) { + final CertificateMaterial ca = new CertificateBuilder() + .commonName("Test CA") + .isCA() + .generate(); + + final CertificateMaterial serverCertificate = new CertificateBuilder() + .commonName("member") + .issuedBy(ca) + .generate(); + + final CertStores memberStore = new CertStores("member"); + memberStore.withCertificate("member", serverCertificate); + memberStore.trust("ca", ca); + // we want to exercise the ByteBufferSharing code paths; we don't care about client auth etc + securityProperties = memberStore.propertiesWith("all", false, false); + } + return securityProperties; } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java index 0cd352f..0a3a14e 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java +++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java @@ -2770,6 +2770,9 @@ public class Connection implements Runnable { /** * processes the current NIO buffer. If there are complete messages in the buffer, they are * deserialized and passed to TCPConduit for further processing + * + * pre-condition: inputBuffer (from inputSharing.getBuffer()) is in WRITABLE mode + * post-condition: inputBuffer is in WRITABLE mode */ private void processInputBuffer(AbstractExecutor threadMonitorExecutor) throws ConnectionException, IOException { @@ -2846,12 +2849,12 @@ public class Connection implements Runnable { "Allocating larger network read buffer, new size is {} old size was {}.", allocSize, oldBufferSize); inputBuffer = inputSharing.expandReadBufferIfNeeded(allocSize); + makeReadableBufferWriteable(inputBuffer); } else { if (inputBuffer.position() != 0) { inputBuffer.compact(); } else { - inputBuffer.position(inputBuffer.limit()); - inputBuffer.limit(inputBuffer.capacity()); + makeReadableBufferWriteable(inputBuffer); } } } @@ -2865,6 +2868,11 @@ public class Connection implements Runnable { } } + private void makeReadableBufferWriteable(final ByteBuffer inputBuffer) { + inputBuffer.position(inputBuffer.limit()); + inputBuffer.limit(inputBuffer.capacity()); + } + private boolean readHandshakeForReceiver(final DataInput dis) { try { checkHandshakeInitialByte(dis);