This is an automated email from the ASF dual-hosted git repository.

klund pushed a commit to branch GEODE-7513-PersistentColocatedPartitionedRegionDistributedTest-debugging
in repository https://gitbox.apache.org/repos/asf/geode.git

commit a3e084d8ae9895accfe3c55adf33a006cb01daf2
Author: Jens Deppe <jde...@pivotal.io>
AuthorDate: Tue Feb 25 13:01:54 2020 -0800

    GEODE-7798: Fix flakiness in PubSubTest (#4717)
    
    - Remove flusher execution in favor of just calling writeAndFlush
    - Bump Netty to 4.1.45
    - Correct message depending on type of subscription processing a PUBLISH
---
 .../src/test/resources/expected-pom.xml            |  2 +-
 .../gradle/plugins/DependencyConstraints.groovy    |  2 +-
 .../integrationTest/resources/assembly_content.txt |  2 +-
 .../resources/dependency_classpath.txt             |  2 +-
 geode-redis/build.gradle                           | 19 +++++++-----
 .../java/org/apache/geode/redis/PubSubTest.java    | 34 ++++++++++------------
 .../apache/geode/redis/mocks/MockSubscriber.java   |  7 ++++-
 .../geode/redis/internal/AbstractSubscription.java |  4 +--
 .../geode/redis/internal/ChannelSubscription.java  |  8 +++++
 .../redis/internal/ExecutionHandlerContext.java    | 20 +------------
 .../geode/redis/internal/PatternSubscription.java  |  8 +++++
 .../apache/geode/redis/internal/Subscription.java  |  8 +++++
 .../internal/org/apache/hadoop/fs/GlobPattern.java | 17 ++++++++---
 13 files changed, 78 insertions(+), 55 deletions(-)

diff --git a/boms/geode-all-bom/src/test/resources/expected-pom.xml b/boms/geode-all-bom/src/test/resources/expected-pom.xml
index 434044f..7df23cd 100644
--- a/boms/geode-all-bom/src/test/resources/expected-pom.xml
+++ b/boms/geode-all-bom/src/test/resources/expected-pom.xml
@@ -214,7 +214,7 @@
       <dependency>
         <groupId>io.netty</groupId>
         <artifactId>netty-all</artifactId>
-        <version>4.1.42.Final</version>
+        <version>4.1.45.Final</version>
         <scope>compile</scope>
       </dependency>
       <dependency>
diff --git a/buildSrc/src/main/groovy/org/apache/geode/gradle/plugins/DependencyConstraints.groovy b/buildSrc/src/main/groovy/org/apache/geode/gradle/plugins/DependencyConstraints.groovy
index e36a2f2..2c72c75 100644
--- a/buildSrc/src/main/groovy/org/apache/geode/gradle/plugins/DependencyConstraints.groovy
+++ b/buildSrc/src/main/groovy/org/apache/geode/gradle/plugins/DependencyConstraints.groovy
@@ -113,7 +113,7 @@ class DependencyConstraints implements Plugin<Project> {
         api(group: 'commons-validator', name: 'commons-validator', version: get('commons-validator.version'))
         api(group: 'io.github.classgraph', name: 'classgraph', version: '4.8.52')
         api(group: 'io.micrometer', name: 'micrometer-core', version: get('micrometer.version'))
-        api(group: 'io.netty', name: 'netty-all', version: '4.1.42.Final')
+        api(group: 'io.netty', name: 'netty-all', version: '4.1.45.Final')
         api(group: 'it.unimi.dsi', name: 'fastutil', version: get('fastutil.version'))
         api(group: 'javax.annotation', name: 'javax.annotation-api', version: '1.3.2')
         api(group: 'javax.annotation', name: 'jsr250-api', version: '1.0')
diff --git a/geode-assembly/src/integrationTest/resources/assembly_content.txt b/geode-assembly/src/integrationTest/resources/assembly_content.txt
index 8b3b268..a34c1c3 100644
--- a/geode-assembly/src/integrationTest/resources/assembly_content.txt
+++ b/geode-assembly/src/integrationTest/resources/assembly_content.txt
@@ -1040,7 +1040,7 @@ lib/micrometer-core-1.2.1.jar
 lib/mx4j-3.0.2.jar
 lib/mx4j-remote-3.0.2.jar
 lib/mx4j-tools-3.0.1.jar
-lib/netty-all-4.1.42.Final.jar
+lib/netty-all-4.1.45.Final.jar
 lib/protobuf-java-3.10.0.jar
 lib/ra.jar
 lib/rmiio-2.1.2.jar
diff --git a/geode-assembly/src/integrationTest/resources/dependency_classpath.txt b/geode-assembly/src/integrationTest/resources/dependency_classpath.txt
index 95af982..b4785cc 100644
--- a/geode-assembly/src/integrationTest/resources/dependency_classpath.txt
+++ b/geode-assembly/src/integrationTest/resources/dependency_classpath.txt
@@ -87,6 +87,6 @@ lucene-core-6.6.6.jar
 lucene-queries-6.6.6.jar
 protobuf-java-3.10.0.jar
 geo-0.7.1.jar
-netty-all-4.1.42.Final.jar
+netty-all-4.1.45.Final.jar
 grumpy-core-0.2.2.jar
 commons-math3-3.2.jar
diff --git a/geode-redis/build.gradle b/geode-redis/build.gradle
index a51a3b4..b107fe9 100644
--- a/geode-redis/build.gradle
+++ b/geode-redis/build.gradle
@@ -21,17 +21,22 @@ apply from: "${project.projectDir}/../gradle/publish-java.gradle"
 
 dependencies {
   compile(platform(project(':boms:geode-all-bom')))
-  implementation(project(':geode-serialization'))
-  implementation(project(':geode-logging'))
   compile(project(':geode-core'))
+  compile(project(':geode-gfsh'))
   compile('com.github.davidmoten:geo')
   compile('io.netty:netty-all')
   compile('org.apache.logging.log4j:log4j-api')
-  compile(project(':geode-gfsh'))
-  distributedTestCompile(project(':geode-dunit'))
-  integrationTestCompile('redis.clients:jedis')
-  distributedTestCompile('redis.clients:jedis')
-  testCompile('org.mockito:mockito-core')
+  implementation(project(':geode-logging'))
+  implementation(project(':geode-serialization'))
+
   testCompile(project(':geode-junit'))
+  testCompile('org.mockito:mockito-core')
+
   integrationTestCompile(project(':geode-junit'))
+  integrationTestCompile('redis.clients:jedis')
+
+  integrationTestRuntime(project(':geode-log4j'))
+
+  distributedTestCompile(project(':geode-dunit'))
+  distributedTestCompile('redis.clients:jedis')
 }
diff --git a/geode-redis/src/integrationTest/java/org/apache/geode/redis/PubSubTest.java b/geode-redis/src/integrationTest/java/org/apache/geode/redis/PubSubTest.java
index 68d43f0..924b2be 100644
--- a/geode-redis/src/integrationTest/java/org/apache/geode/redis/PubSubTest.java
+++ b/geode-redis/src/integrationTest/java/org/apache/geode/redis/PubSubTest.java
@@ -18,19 +18,15 @@ package org.apache.geode.redis;
 import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
 import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL;
 import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
-import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.Random;
 import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
 
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import redis.clients.jedis.Jedis;
@@ -39,32 +35,32 @@ import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.cache.GemFireCache;
 import org.apache.geode.internal.AvailablePortHelper;
 import org.apache.geode.redis.mocks.MockSubscriber;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
 import org.apache.geode.test.junit.categories.RedisTest;
 
 @Category({RedisTest.class})
-@Ignore("GEODE-7798")
 public class PubSubTest {
+  private static final int REDIS_CLIENT_TIMEOUT = 100000;
   private static GeodeRedisServer server;
   private static GemFireCache cache;
-  private static Random rand;
   private static Jedis publisher;
   private static Jedis subscriber;
   private static int port = 6379;
 
   @BeforeClass
   public static void setUp() {
-    rand = new Random();
     CacheFactory cf = new CacheFactory();
-    cf.set(LOG_LEVEL, "error");
+    cf.set(LOG_LEVEL, "warn");
     cf.set(MCAST_PORT, "0");
     cf.set(LOCATORS, "");
     cache = cf.create();
+
     port = AvailablePortHelper.getRandomAvailableTCPPort();
     server = new GeodeRedisServer("localhost", port);
-    subscriber = new Jedis("localhost", port, 100000);
-    publisher = new Jedis("localhost", port, 100000);
-
     server.start();
+
+    subscriber = new Jedis("localhost", port, REDIS_CLIENT_TIMEOUT);
+    publisher = new Jedis("localhost", port, REDIS_CLIENT_TIMEOUT);
   }
 
   @AfterClass
@@ -127,7 +123,7 @@ public class PubSubTest {
 
   @Test
   public void testTwoSubscribersOneChannel() {
-    Jedis subscriber2 = new Jedis("localhost", port);
+    Jedis subscriber2 = new Jedis("localhost", port, REDIS_CLIENT_TIMEOUT);
     MockSubscriber mockSubscriber1 = new MockSubscriber();
     MockSubscriber mockSubscriber2 = new MockSubscriber();
 
@@ -189,7 +185,7 @@ public class PubSubTest {
 
   @Test
   public void testDeadSubscriber() {
-    Jedis deadSubscriber = new Jedis("localhost", port);
+    Jedis deadSubscriber = new Jedis("localhost", port, REDIS_CLIENT_TIMEOUT);
 
     MockSubscriber mockSubscriber = new MockSubscriber();
 
@@ -226,8 +222,8 @@ public class PubSubTest {
     Long result = publisher.publish("salutations", "hello");
     assertThat(result).isEqualTo(1);
 
-    assertThat(mockSubscriber.getReceivedMessages()).hasSize(1);
-    assertThat(mockSubscriber.getReceivedMessages()).contains("hello");
+    assertThat(mockSubscriber.getReceivedMessages()).isEmpty();
+    assertThat(mockSubscriber.getReceivedPMessages()).containsExactly("hello");
 
     mockSubscriber.punsubscribe("sal*s");
     waitFor(() -> mockSubscriber.getSubscribedChannels() == 0);
@@ -260,7 +256,8 @@ public class PubSubTest {
 
     waitFor(() -> !subscriberThread.isAlive());
 
-    assertThat(mockSubscriber.getReceivedMessages()).containsExactly("hello", "hello");
+    assertThat(mockSubscriber.getReceivedMessages()).containsExactly("hello");
+    assertThat(mockSubscriber.getReceivedPMessages()).containsExactly("hello");
   }
 
   @Test
@@ -289,11 +286,12 @@ public class PubSubTest {
 
     waitFor(() -> !subscriberThread.isAlive());
 
-    assertThat(mockSubscriber.getReceivedMessages()).containsExactly("hello", "hello");
+    assertThat(mockSubscriber.getReceivedMessages()).containsExactly("hello");
+    assertThat(mockSubscriber.getReceivedPMessages()).containsExactly("hello");
   }
 
   private void waitFor(Callable<Boolean> booleanCallable) {
-    await().atMost(1, TimeUnit.SECONDS)
+    GeodeAwaitility.await()
         .ignoreExceptions() // ignoring socket closed exceptions
         .until(booleanCallable);
   }
diff --git a/geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/MockSubscriber.java b/geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/MockSubscriber.java
index a1e9d41..b0d5326 100644
--- a/geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/MockSubscriber.java
+++ b/geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/MockSubscriber.java
@@ -23,11 +23,16 @@ import redis.clients.jedis.JedisPubSub;
 
 public class MockSubscriber extends JedisPubSub {
   private List<String> receivedMessages = new ArrayList<>();
+  private List<String> receivedPMessages = new ArrayList<>();
 
   public List<String> getReceivedMessages() {
     return receivedMessages;
   }
 
+  public List<String> getReceivedPMessages() {
+    return receivedPMessages;
+  }
+
   @Override
   public void onMessage(String channel, String message) {
     receivedMessages.add(message);
@@ -35,6 +40,6 @@ public class MockSubscriber extends JedisPubSub {
 
   @Override
   public void onPMessage(String pattern, String channel, String message) {
-    receivedMessages.add(message);
+    receivedPMessages.add(message);
   }
 }
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/AbstractSubscription.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/AbstractSubscription.java
index c8514d5..2164eb9 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/AbstractSubscription.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/AbstractSubscription.java
@@ -16,7 +16,6 @@
 
 package org.apache.geode.redis.internal;
 
-import java.util.Arrays;
 import java.util.concurrent.ExecutionException;
 
 import io.netty.buffer.ByteBuf;
@@ -64,7 +63,7 @@ public abstract class AbstractSubscription implements Subscription {
     ByteBuf messageByteBuffer;
     try {
       messageByteBuffer = Coder.getArrayResponse(context.getByteBufAllocator(),
-          Arrays.asList("message", channel, message));
+          createResponse(channel, message));
     } catch (CoderException e) {
       logger.warn("Unable to encode publish message", e);
       return null;
@@ -83,6 +82,7 @@ public abstract class AbstractSubscription implements Subscription {
     try {
       channelFuture.get();
     } catch (InterruptedException | ExecutionException e) {
+      logger.warn("Unable to write to channel", e);
       return false;
     }
 
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/ChannelSubscription.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/ChannelSubscription.java
index b64f132..44357e8 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/ChannelSubscription.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/ChannelSubscription.java
@@ -16,6 +16,9 @@
 
 package org.apache.geode.redis.internal;
 
+import java.util.Arrays;
+import java.util.List;
+
 /**
  * This class represents a single channel subscription as created by the SUBSCRIBE command
  */
@@ -32,6 +35,11 @@ class ChannelSubscription extends AbstractSubscription {
   }
 
   @Override
+  public List<String> createResponse(String channel, String message) {
+    return Arrays.asList("message", channel, message);
+  }
+
+  @Override
   public boolean isEqualTo(Object channelOrPattern, Client client) {
     return this.channel != null && this.channel.equals(channelOrPattern)
         && this.getClient().equals(client);
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/ExecutionHandlerContext.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/ExecutionHandlerContext.java
index 99ec254..639ae4e 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/ExecutionHandlerContext.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/ExecutionHandlerContext.java
@@ -26,7 +26,6 @@ import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.handler.codec.DecoderException;
-import io.netty.util.concurrent.EventExecutor;
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.cache.Cache;
@@ -66,8 +65,6 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter {
   private final GeodeRedisServer server;
   private final Channel channel;
   private final AtomicBoolean needChannelFlush;
-  private final Runnable flusher;
-  private final EventExecutor lastExecutor;
   private final ByteBufAllocator byteBufAllocator;
   /**
    * TransactionId for any transactions started by this client
@@ -114,15 +111,6 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter {
     this.server = server;
     this.channel = ch;
     this.needChannelFlush = new AtomicBoolean(false);
-    this.flusher = new Runnable() {
-
-      @Override
-      public void run() {
-        flushChannel();
-      }
-
-    };
-    this.lastExecutor = channel.pipeline().lastContext().executor();
     this.byteBufAllocator = channel.alloc();
     this.transactionID = null;
     this.transactionQueue = null; // Lazy
@@ -138,13 +126,7 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter {
   }
 
   public ChannelFuture writeToChannel(ByteBuf message) {
-    ChannelFuture channelFuture = channel.write(message, channel.newPromise());
-
-    if (!needChannelFlush.getAndSet(true)) {
-      this.lastExecutor.execute(flusher);
-    }
-
-    return channelFuture;
+    return channel.writeAndFlush(message, channel.newPromise());
   }
 
   /**
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/PatternSubscription.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/PatternSubscription.java
index 212eeaa..6284449 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/PatternSubscription.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/PatternSubscription.java
@@ -16,6 +16,9 @@
 
 package org.apache.geode.redis.internal;
 
+import java.util.Arrays;
+import java.util.List;
+
 import org.apache.geode.redis.internal.org.apache.hadoop.fs.GlobPattern;
 
 /**
@@ -34,6 +37,11 @@ class PatternSubscription extends AbstractSubscription {
   }
 
   @Override
+  public List<String> createResponse(String channel, String message) {
+    return Arrays.asList("pmessage", pattern.globPattern(), channel, message);
+  }
+
+  @Override
   public boolean isEqualTo(Object channelOrPattern, Client client) {
     return this.pattern != null && this.pattern.equals(channelOrPattern)
         && this.getClient().equals(client);
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/Subscription.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/Subscription.java
index 46be25a..b1b64e9 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/Subscription.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/Subscription.java
@@ -16,6 +16,9 @@
 
 package org.apache.geode.redis.internal;
 
+import java.util.List;
+
+
 /**
  * Interface that represents the relationship between a channel or pattern and client.
  */
@@ -40,4 +43,9 @@ public interface Subscription {
    * Verifies that the subscription channel or pattern matches the designated channel.
    */
   boolean matches(String channel);
+
+  /**
+   * The response dependent on the type of the subscription
+   */
+  List<String> createResponse(String channel, String message);
 }
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/org/apache/hadoop/fs/GlobPattern.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/org/apache/hadoop/fs/GlobPattern.java
index 7cedb5c..349775e 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/org/apache/hadoop/fs/GlobPattern.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/org/apache/hadoop/fs/GlobPattern.java
@@ -24,7 +24,8 @@ import java.util.regex.PatternSyntaxException;
  */
 public class GlobPattern {
   private static final char BACKSLASH = '\\';
-  private Pattern compiled;
+  private final Pattern compiled;
+  private final String globPattern;
   private boolean hasWildcard = false;
 
   /**
@@ -33,7 +34,8 @@ public class GlobPattern {
    * @param globPattern the glob pattern string
    */
   public GlobPattern(String globPattern) {
-    set(globPattern);
+    this.compiled = createPattern(globPattern);
+    this.globPattern = globPattern;
   }
 
   /**
@@ -44,6 +46,13 @@ public class GlobPattern {
   }
 
   /**
+   * @return the original glob pattern
+   */
+  public String globPattern() {
+    return globPattern;
+  }
+
+  /**
    * Compile glob pattern string
    *
    * @param globPattern the glob pattern
@@ -68,7 +77,7 @@ public class GlobPattern {
    *
    * @param glob the glob pattern string
    */
-  public void set(String glob) {
+  private Pattern createPattern(String glob) {
     StringBuilder regex = new StringBuilder();
     int setOpen = 0;
     int curlyOpen = 0;
@@ -150,7 +159,7 @@ public class GlobPattern {
     if (curlyOpen > 0) {
       error("Unclosed group", glob, len);
     }
-    compiled = Pattern.compile(regex.toString());
+    return Pattern.compile(regex.toString());
   }
 
   /**

Reply via email to