This is an automated email from the ASF dual-hosted git repository. klund pushed a commit to branch GEODE-7513-PersistentColocatedPartitionedRegionDistributedTest-debugging in repository https://gitbox.apache.org/repos/asf/geode.git
commit a3e084d8ae9895accfe3c55adf33a006cb01daf2 Author: Jens Deppe <jde...@pivotal.io> AuthorDate: Tue Feb 25 13:01:54 2020 -0800 GEODE-7798: Fix flakiness in PubSubTest (#4717) - Remove flusher execution in favor of just calling writeAndFlush - Bump Netty to 4.1.45 - Correct message depending on type of subscription processing a PUBLISH --- .../src/test/resources/expected-pom.xml | 2 +- .../gradle/plugins/DependencyConstraints.groovy | 2 +- .../integrationTest/resources/assembly_content.txt | 2 +- .../resources/dependency_classpath.txt | 2 +- geode-redis/build.gradle | 19 +++++++----- .../java/org/apache/geode/redis/PubSubTest.java | 34 ++++++++++------------ .../apache/geode/redis/mocks/MockSubscriber.java | 7 ++++- .../geode/redis/internal/AbstractSubscription.java | 4 +-- .../geode/redis/internal/ChannelSubscription.java | 8 +++++ .../redis/internal/ExecutionHandlerContext.java | 20 +------------ .../geode/redis/internal/PatternSubscription.java | 8 +++++ .../apache/geode/redis/internal/Subscription.java | 8 +++++ .../internal/org/apache/hadoop/fs/GlobPattern.java | 17 ++++++++--- 13 files changed, 78 insertions(+), 55 deletions(-) diff --git a/boms/geode-all-bom/src/test/resources/expected-pom.xml b/boms/geode-all-bom/src/test/resources/expected-pom.xml index 434044f..7df23cd 100644 --- a/boms/geode-all-bom/src/test/resources/expected-pom.xml +++ b/boms/geode-all-bom/src/test/resources/expected-pom.xml @@ -214,7 +214,7 @@ <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> - <version>4.1.42.Final</version> + <version>4.1.45.Final</version> <scope>compile</scope> </dependency> <dependency> diff --git a/buildSrc/src/main/groovy/org/apache/geode/gradle/plugins/DependencyConstraints.groovy b/buildSrc/src/main/groovy/org/apache/geode/gradle/plugins/DependencyConstraints.groovy index e36a2f2..2c72c75 100644 --- a/buildSrc/src/main/groovy/org/apache/geode/gradle/plugins/DependencyConstraints.groovy +++ b/buildSrc/src/main/groovy/org/apache/geode/gradle/plugins/DependencyConstraints.groovy @@ -113,7 +113,7 @@ class DependencyConstraints implements Plugin<Project> { api(group: 'commons-validator', name: 'commons-validator', version: get('commons-validator.version')) api(group: 'io.github.classgraph', name: 'classgraph', version: '4.8.52') api(group: 'io.micrometer', name: 'micrometer-core', version: get('micrometer.version')) - api(group: 'io.netty', name: 'netty-all', version: '4.1.42.Final') + api(group: 'io.netty', name: 'netty-all', version: '4.1.45.Final') api(group: 'it.unimi.dsi', name: 'fastutil', version: get('fastutil.version')) api(group: 'javax.annotation', name: 'javax.annotation-api', version: '1.3.2') api(group: 'javax.annotation', name: 'jsr250-api', version: '1.0') diff --git a/geode-assembly/src/integrationTest/resources/assembly_content.txt b/geode-assembly/src/integrationTest/resources/assembly_content.txt index 8b3b268..a34c1c3 100644 --- a/geode-assembly/src/integrationTest/resources/assembly_content.txt +++ b/geode-assembly/src/integrationTest/resources/assembly_content.txt @@ -1040,7 +1040,7 @@ lib/micrometer-core-1.2.1.jar lib/mx4j-3.0.2.jar lib/mx4j-remote-3.0.2.jar lib/mx4j-tools-3.0.1.jar -lib/netty-all-4.1.42.Final.jar +lib/netty-all-4.1.45.Final.jar lib/protobuf-java-3.10.0.jar lib/ra.jar lib/rmiio-2.1.2.jar diff --git a/geode-assembly/src/integrationTest/resources/dependency_classpath.txt b/geode-assembly/src/integrationTest/resources/dependency_classpath.txt index 95af982..b4785cc 100644 --- a/geode-assembly/src/integrationTest/resources/dependency_classpath.txt +++ b/geode-assembly/src/integrationTest/resources/dependency_classpath.txt @@ -87,6 +87,6 @@ lucene-core-6.6.6.jar lucene-queries-6.6.6.jar protobuf-java-3.10.0.jar geo-0.7.1.jar -netty-all-4.1.42.Final.jar +netty-all-4.1.45.Final.jar grumpy-core-0.2.2.jar commons-math3-3.2.jar diff --git a/geode-redis/build.gradle b/geode-redis/build.gradle index a51a3b4..b107fe9 100644 --- a/geode-redis/build.gradle +++ b/geode-redis/build.gradle @@ -21,17 +21,22 @@ apply from: "${project.projectDir}/../gradle/publish-java.gradle" dependencies { compile(platform(project(':boms:geode-all-bom'))) - implementation(project(':geode-serialization')) - implementation(project(':geode-logging')) compile(project(':geode-core')) + compile(project(':geode-gfsh')) compile('com.github.davidmoten:geo') compile('io.netty:netty-all') compile('org.apache.logging.log4j:log4j-api') - compile(project(':geode-gfsh')) - distributedTestCompile(project(':geode-dunit')) - integrationTestCompile('redis.clients:jedis') - distributedTestCompile('redis.clients:jedis') - testCompile('org.mockito:mockito-core') + implementation(project(':geode-logging')) + implementation(project(':geode-serialization')) + testCompile(project(':geode-junit')) + testCompile('org.mockito:mockito-core') + integrationTestCompile(project(':geode-junit')) + integrationTestCompile('redis.clients:jedis') + + integrationTestRuntime(project(':geode-log4j')) + + distributedTestCompile(project(':geode-dunit')) + distributedTestCompile('redis.clients:jedis') } diff --git a/geode-redis/src/integrationTest/java/org/apache/geode/redis/PubSubTest.java b/geode-redis/src/integrationTest/java/org/apache/geode/redis/PubSubTest.java index 68d43f0..924b2be 100644 --- a/geode-redis/src/integrationTest/java/org/apache/geode/redis/PubSubTest.java +++ b/geode-redis/src/integrationTest/java/org/apache/geode/redis/PubSubTest.java @@ -18,19 +18,15 @@ package org.apache.geode.redis; import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS; import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL; import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT; -import static org.apache.geode.test.awaitility.GeodeAwaitility.await; import static org.assertj.core.api.Assertions.assertThat; import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.Random; import java.util.concurrent.Callable; -import java.util.concurrent.TimeUnit; import org.junit.AfterClass; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; import redis.clients.jedis.Jedis; @@ -39,32 +35,32 @@ import org.apache.geode.cache.CacheFactory; import org.apache.geode.cache.GemFireCache; import org.apache.geode.internal.AvailablePortHelper; import org.apache.geode.redis.mocks.MockSubscriber; +import org.apache.geode.test.awaitility.GeodeAwaitility; import org.apache.geode.test.junit.categories.RedisTest; @Category({RedisTest.class}) -@Ignore("GEODE-7798") public class PubSubTest { + private static final int REDIS_CLIENT_TIMEOUT = 100000; private static GeodeRedisServer server; private static GemFireCache cache; - private static Random rand; private static Jedis publisher; private static Jedis subscriber; private static int port = 6379; @BeforeClass public static void setUp() { - rand = new Random(); CacheFactory cf = new CacheFactory(); - cf.set(LOG_LEVEL, "error"); + cf.set(LOG_LEVEL, "warn"); cf.set(MCAST_PORT, "0"); cf.set(LOCATORS, ""); cache = cf.create(); + port = AvailablePortHelper.getRandomAvailableTCPPort(); server = new GeodeRedisServer("localhost", port); - subscriber = new Jedis("localhost", port, 100000); - publisher = new Jedis("localhost", port, 100000); - server.start(); + + subscriber = new Jedis("localhost", port, REDIS_CLIENT_TIMEOUT); + publisher = new Jedis("localhost", port, REDIS_CLIENT_TIMEOUT); } @AfterClass @@ -127,7 +123,7 @@ public class PubSubTest { @Test public void testTwoSubscribersOneChannel() { - Jedis subscriber2 = new Jedis("localhost", port); + Jedis subscriber2 = new Jedis("localhost", port, REDIS_CLIENT_TIMEOUT); MockSubscriber mockSubscriber1 = new MockSubscriber(); MockSubscriber mockSubscriber2 = new MockSubscriber(); @@ -189,7 +185,7 @@ public class PubSubTest { @Test public void testDeadSubscriber() { - Jedis deadSubscriber = new Jedis("localhost", port); + Jedis deadSubscriber = new Jedis("localhost", port, REDIS_CLIENT_TIMEOUT); MockSubscriber mockSubscriber = new MockSubscriber(); @@ -226,8 +222,8 @@ public class PubSubTest { Long result = publisher.publish("salutations", "hello"); assertThat(result).isEqualTo(1); - assertThat(mockSubscriber.getReceivedMessages()).hasSize(1); - assertThat(mockSubscriber.getReceivedMessages()).contains("hello"); + assertThat(mockSubscriber.getReceivedMessages()).isEmpty(); + assertThat(mockSubscriber.getReceivedPMessages()).containsExactly("hello"); mockSubscriber.punsubscribe("sal*s"); waitFor(() -> mockSubscriber.getSubscribedChannels() == 0); @@ -260,7 +256,8 @@ public class PubSubTest { waitFor(() -> !subscriberThread.isAlive()); - assertThat(mockSubscriber.getReceivedMessages()).containsExactly("hello", "hello"); + assertThat(mockSubscriber.getReceivedMessages()).containsExactly("hello"); + assertThat(mockSubscriber.getReceivedPMessages()).containsExactly("hello"); } @Test @@ -289,11 +286,12 @@ public class PubSubTest { waitFor(() -> !subscriberThread.isAlive()); - assertThat(mockSubscriber.getReceivedMessages()).containsExactly("hello", "hello"); + assertThat(mockSubscriber.getReceivedMessages()).containsExactly("hello"); + assertThat(mockSubscriber.getReceivedPMessages()).containsExactly("hello"); } private void waitFor(Callable<Boolean> booleanCallable) { - await().atMost(1, TimeUnit.SECONDS) + GeodeAwaitility.await() .ignoreExceptions() // ignoring socket closed exceptions .until(booleanCallable); } diff --git a/geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/MockSubscriber.java b/geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/MockSubscriber.java index a1e9d41..b0d5326 100644 --- a/geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/MockSubscriber.java +++ b/geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/MockSubscriber.java @@ -23,11 +23,16 @@ import redis.clients.jedis.JedisPubSub; public class MockSubscriber extends JedisPubSub { private List<String> receivedMessages = new ArrayList<>(); + private List<String> receivedPMessages = new ArrayList<>(); public List<String> getReceivedMessages() { return receivedMessages; } + public List<String> getReceivedPMessages() { + return receivedPMessages; + } + @Override public void onMessage(String channel, String message) { receivedMessages.add(message); @@ -35,6 +40,6 @@ public class MockSubscriber extends JedisPubSub { @Override public void onPMessage(String pattern, String channel, String message) { - receivedMessages.add(message); + receivedPMessages.add(message); } } diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/AbstractSubscription.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/AbstractSubscription.java index c8514d5..2164eb9 100644 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/AbstractSubscription.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/AbstractSubscription.java @@ -16,7 +16,6 @@ package org.apache.geode.redis.internal; -import java.util.Arrays; import java.util.concurrent.ExecutionException; import io.netty.buffer.ByteBuf; @@ -64,7 +63,7 @@ public abstract class AbstractSubscription implements Subscription { ByteBuf messageByteBuffer; try { messageByteBuffer = Coder.getArrayResponse(context.getByteBufAllocator(), - Arrays.asList("message", channel, message)); + createResponse(channel, message)); } catch (CoderException e) { logger.warn("Unable to encode publish message", e); return null; @@ -83,6 +82,7 @@ public abstract class AbstractSubscription implements Subscription { try { channelFuture.get(); } catch (InterruptedException | ExecutionException e) { + logger.warn("Unable to write to channel", e); return false; } diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/ChannelSubscription.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/ChannelSubscription.java index b64f132..44357e8 100644 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/ChannelSubscription.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/ChannelSubscription.java @@ -16,6 +16,9 @@ package org.apache.geode.redis.internal; +import java.util.Arrays; +import java.util.List; + /** * This class represents a single channel subscription as created by the SUBSCRIBE command */ @@ -32,6 +35,11 @@ class ChannelSubscription extends AbstractSubscription { } @Override + public List<String> createResponse(String channel, String message) { + return Arrays.asList("message", channel, message); + } + + @Override public boolean isEqualTo(Object channelOrPattern, Client client) { return this.channel != null && this.channel.equals(channelOrPattern) && this.getClient().equals(client); diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/ExecutionHandlerContext.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/ExecutionHandlerContext.java index 99ec254..639ae4e 100644 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/ExecutionHandlerContext.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/ExecutionHandlerContext.java @@ -26,7 +26,6 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.codec.DecoderException; -import io.netty.util.concurrent.EventExecutor; import org.apache.logging.log4j.Logger; import org.apache.geode.cache.Cache; @@ -66,8 +65,6 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter { private final GeodeRedisServer server; private final Channel channel; private final AtomicBoolean needChannelFlush; - private final Runnable flusher; - private final EventExecutor lastExecutor; private final ByteBufAllocator byteBufAllocator; /** * TransactionId for any transactions started by this client @@ -114,15 +111,6 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter { this.server = server; this.channel = ch; this.needChannelFlush = new AtomicBoolean(false); - this.flusher = new Runnable() { - - @Override - public void run() { - flushChannel(); - } - - }; - this.lastExecutor = channel.pipeline().lastContext().executor(); this.byteBufAllocator = channel.alloc(); this.transactionID = null; this.transactionQueue = null; // Lazy @@ -138,13 +126,7 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter { } public ChannelFuture writeToChannel(ByteBuf message) { - ChannelFuture channelFuture = channel.write(message, channel.newPromise()); - - if (!needChannelFlush.getAndSet(true)) { - this.lastExecutor.execute(flusher); - } - - return channelFuture; + return channel.writeAndFlush(message, channel.newPromise()); } /** diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/PatternSubscription.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/PatternSubscription.java index 212eeaa..6284449 100644 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/PatternSubscription.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/PatternSubscription.java @@ -16,6 +16,9 @@ package org.apache.geode.redis.internal; +import java.util.Arrays; +import java.util.List; + import org.apache.geode.redis.internal.org.apache.hadoop.fs.GlobPattern; /** @@ -34,6 +37,11 @@ class PatternSubscription extends AbstractSubscription { } @Override + public List<String> createResponse(String channel, String message) { + return Arrays.asList("pmessage", pattern.globPattern(), channel, message); + } + + @Override public boolean isEqualTo(Object channelOrPattern, Client client) { return this.pattern != null && this.pattern.equals(channelOrPattern) && this.getClient().equals(client); diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/Subscription.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/Subscription.java index 46be25a..b1b64e9 100644 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/Subscription.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/Subscription.java @@ -16,6 +16,9 @@ package org.apache.geode.redis.internal; +import java.util.List; + + /** * Interface that represents the relationship between a channel or pattern and client. */ @@ -40,4 +43,9 @@ public interface Subscription { * Verifies that the subscription channel or pattern matches the designated channel. */ boolean matches(String channel); + + /** + * The response dependent on the type of the subscription + */ + List<String> createResponse(String channel, String message); } diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/org/apache/hadoop/fs/GlobPattern.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/org/apache/hadoop/fs/GlobPattern.java index 7cedb5c..349775e 100644 --- a/geode-redis/src/main/java/org/apache/geode/redis/internal/org/apache/hadoop/fs/GlobPattern.java +++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/org/apache/hadoop/fs/GlobPattern.java @@ -24,7 +24,8 @@ import java.util.regex.PatternSyntaxException; */ public class GlobPattern { private static final char BACKSLASH = '\\'; - private Pattern compiled; + private final Pattern compiled; + private final String globPattern; private boolean hasWildcard = false; /** @@ -33,7 +34,8 @@ public class GlobPattern { * @param globPattern the glob pattern string */ public GlobPattern(String globPattern) { - set(globPattern); + this.compiled = createPattern(globPattern); + this.globPattern = globPattern; } /** @@ -44,6 +46,13 @@ public class GlobPattern { } /** + * @return the original glob pattern + */ + public String globPattern() { + return globPattern; + } + + /** * Compile glob pattern string * * @param globPattern the glob pattern @@ -68,7 +77,7 @@ public class GlobPattern { * * @param glob the glob pattern string */ - public void set(String glob) { + private Pattern createPattern(String glob) { StringBuilder regex = new StringBuilder(); int setOpen = 0; int curlyOpen = 0; @@ -150,7 +159,7 @@ public class GlobPattern { if (curlyOpen > 0) { error("Unclosed group", glob, len); } - compiled = Pattern.compile(regex.toString()); + return Pattern.compile(regex.toString()); } /**