This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e1f02622f36 fixed redis tests with testcontainers (#38157)
e1f02622f36 is described below

commit e1f02622f364005de4236f8024355fc4236c6e97
Author: Abdelrahman Ibrahim <[email protected]>
AuthorDate: Mon Apr 13 16:49:02 2026 +0200

    fixed redis tests with testcontainers (#38157)
---
 sdks/java/io/redis/build.gradle                    |  2 +-
 .../org/apache/beam/sdk/io/redis/RedisIOTest.java  | 60 ++++++++++++----------
 2 files changed, 34 insertions(+), 28 deletions(-)

diff --git a/sdks/java/io/redis/build.gradle b/sdks/java/io/redis/build.gradle
index 28c03d534cf..907a879a28b 100644
--- a/sdks/java/io/redis/build.gradle
+++ b/sdks/java/io/redis/build.gradle
@@ -28,7 +28,7 @@ dependencies {
   implementation "redis.clients:jedis:4.0.1"
   testImplementation project(path: ":sdks:java:io:common")
   testImplementation library.java.junit
-  testImplementation "org.signal:embedded-redis:0.8.2"
+  testImplementation library.java.testcontainers_base
   testRuntimeOnly library.java.slf4j_jdk14
   testRuntimeOnly project(path: ":runners:direct-java", configuration: 
"shadow")
 }
diff --git 
a/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
 
b/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
index 5754de89ec3..7b305d846c7 100644
--- 
a/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
+++ 
b/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
@@ -36,7 +36,6 @@ import java.util.stream.Stream;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.MapCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.common.NetworkTestHelper;
 import org.apache.beam.sdk.io.range.ByteKey;
 import org.apache.beam.sdk.io.redis.RedisIO.Write.Method;
 import org.apache.beam.sdk.testing.PAssert;
@@ -48,41 +47,49 @@ import org.apache.beam.sdk.values.PCollection;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.utility.DockerImageName;
 import redis.clients.jedis.Jedis;
 import redis.clients.jedis.StreamEntryID;
 import redis.clients.jedis.resps.StreamEntry;
-import redis.embedded.RedisServer;
 
 /** Test on the Redis IO. */
 @RunWith(JUnit4.class)
 public class RedisIOTest {
 
-  private static final String REDIS_HOST = "localhost";
+  private static final DockerImageName REDIS_IMAGE_NAME = 
DockerImageName.parse("redis:7-alpine");
   private static final Long NO_EXPIRATION = -1L;
+  private static final int REDIS_PORT = 6379;
 
   @Rule public TestPipeline p = TestPipeline.create();
 
-  private static RedisServer server;
+  @ClassRule
+  public static final GenericContainer<?> REDIS_CONTAINER =
+      new GenericContainer<>(REDIS_IMAGE_NAME).withExposedPorts(REDIS_PORT);
+
+  private static String redisHost;
   private static int port;
 
   private static Jedis client;
 
   @BeforeClass
   public static void beforeClass() throws Exception {
-    port = NetworkTestHelper.getAvailableLocalPort();
-    server = new RedisServer(port);
-    server.start();
-    client = RedisConnectionConfiguration.create(REDIS_HOST, port).connect();
+    redisHost = REDIS_CONTAINER.getHost();
+    port = REDIS_CONTAINER.getMappedPort(REDIS_PORT);
+    client = RedisConnectionConfiguration.create(redisHost, port).connect();
+    client.ping();
   }
 
   @AfterClass
   public static void afterClass() {
-    client.close();
-    server.stop();
+    if (client != null) {
+      client.close();
+    }
   }
 
   @Test
@@ -94,7 +101,7 @@ public class RedisIOTest {
         p.apply(
             "Read",
             RedisIO.read()
-                .withEndpoint(REDIS_HOST, port)
+                .withEndpoint(redisHost, port)
                 .withKeyPattern("bulkread*")
                 .withBatchSize(10));
     PAssert.that(read).containsInAnyOrder(data);
@@ -110,7 +117,7 @@ public class RedisIOTest {
         p.apply(
             "Read",
             RedisIO.read()
-                .withEndpoint(REDIS_HOST, port)
+                .withEndpoint(redisHost, port)
                 .withKeyPattern("bigset*")
                 .withBatchSize(8));
     PAssert.that(read).containsInAnyOrder(data);
@@ -126,7 +133,7 @@ public class RedisIOTest {
         p.apply(
             "Read",
             RedisIO.read()
-                .withEndpoint(REDIS_HOST, port)
+                .withEndpoint(redisHost, port)
                 .withKeyPattern("smallset*")
                 .withBatchSize(20));
     PAssert.that(read).containsInAnyOrder(data);
@@ -139,13 +146,12 @@ public class RedisIOTest {
     data.forEach(kv -> client.set(kv.getKey(), kv.getValue()));
 
     PCollection<KV<String, String>> read =
-        p.apply("Read", RedisIO.read().withEndpoint(REDIS_HOST, 
port).withKeyPattern("pattern*"));
+        p.apply("Read", RedisIO.read().withEndpoint(redisHost, 
port).withKeyPattern("pattern*"));
     PAssert.that(read).containsInAnyOrder(data);
 
     PCollection<KV<String, String>> readNotMatch =
         p.apply(
-            "ReadNotMatch",
-            RedisIO.read().withEndpoint(REDIS_HOST, 
port).withKeyPattern("foobar*"));
+            "ReadNotMatch", RedisIO.read().withEndpoint(redisHost, 
port).withKeyPattern("foobar*"));
     PAssert.thatSingleton(readNotMatch.apply(Count.globally())).isEqualTo(0L);
 
     p.run();
@@ -158,7 +164,7 @@ public class RedisIOTest {
 
     String newValue = "newValue";
     PCollection<KV<String, String>> write = p.apply(Create.of(KV.of(key, 
newValue)));
-    write.apply(RedisIO.write().withEndpoint(REDIS_HOST, 
port).withMethod(Method.SET));
+    write.apply(RedisIO.write().withEndpoint(redisHost, 
port).withMethod(Method.SET));
     p.run();
 
     assertEquals(newValue, client.get(key));
@@ -174,7 +180,7 @@ public class RedisIOTest {
     PCollection<KV<String, String>> write = p.apply(Create.of(KV.of(key, 
newValue)));
     write.apply(
         RedisIO.write()
-            .withEndpoint(REDIS_HOST, port)
+            .withEndpoint(redisHost, port)
             .withMethod(Method.SET)
             .withExpireTime(10_000L));
     p.run();
@@ -193,7 +199,7 @@ public class RedisIOTest {
 
     String newValue = "newValue";
     PCollection<KV<String, String>> write = p.apply(Create.of(KV.of(key, 
newValue)));
-    write.apply(RedisIO.write().withEndpoint(REDIS_HOST, 
port).withMethod(Method.LPUSH));
+    write.apply(RedisIO.write().withEndpoint(redisHost, 
port).withMethod(Method.LPUSH));
     p.run();
 
     List<String> values = client.lrange(key, 0, -1);
@@ -208,7 +214,7 @@ public class RedisIOTest {
 
     String newValue = "newValue";
     PCollection<KV<String, String>> write = p.apply(Create.of(KV.of(key, 
newValue)));
-    write.apply(RedisIO.write().withEndpoint(REDIS_HOST, 
port).withMethod(Method.RPUSH));
+    write.apply(RedisIO.write().withEndpoint(redisHost, 
port).withMethod(Method.RPUSH));
     p.run();
 
     List<String> values = client.lrange(key, 0, -1);
@@ -222,7 +228,7 @@ public class RedisIOTest {
     List<KV<String, String>> data = buildConstantKeyList(key, values);
 
     PCollection<KV<String, String>> write = p.apply(Create.of(data));
-    write.apply(RedisIO.write().withEndpoint(REDIS_HOST, 
port).withMethod(Method.SADD));
+    write.apply(RedisIO.write().withEndpoint(redisHost, 
port).withMethod(Method.SADD));
     p.run();
 
     Set<String> expected = new HashSet<>(values);
@@ -237,7 +243,7 @@ public class RedisIOTest {
     List<KV<String, String>> data = buildConstantKeyList(key, values);
 
     PCollection<KV<String, String>> write = p.apply(Create.of(data));
-    write.apply(RedisIO.write().withEndpoint(REDIS_HOST, 
port).withMethod(Method.PFADD));
+    write.apply(RedisIO.write().withEndpoint(redisHost, 
port).withMethod(Method.PFADD));
     p.run();
 
     long count = client.pfcount(key);
@@ -254,7 +260,7 @@ public class RedisIOTest {
     PCollection<KV<String, String>> write = p.apply(Create.of(data));
     write.apply(
         RedisIO.write()
-            .withEndpoint(REDIS_HOST, port)
+            .withEndpoint(redisHost, port)
             .withMethod(Method.PFADD)
             .withExpireTime(10_000L));
     p.run();
@@ -273,7 +279,7 @@ public class RedisIOTest {
     List<KV<String, String>> data = buildConstantKeyList(key, values);
 
     PCollection<KV<String, String>> write = p.apply(Create.of(data));
-    write.apply(RedisIO.write().withEndpoint(REDIS_HOST, 
port).withMethod(Method.INCRBY));
+    write.apply(RedisIO.write().withEndpoint(redisHost, 
port).withMethod(Method.INCRBY));
 
     p.run();
 
@@ -289,7 +295,7 @@ public class RedisIOTest {
     List<KV<String, String>> data = buildConstantKeyList(key, values);
 
     PCollection<KV<String, String>> write = p.apply(Create.of(data));
-    write.apply(RedisIO.write().withEndpoint(REDIS_HOST, 
port).withMethod(Method.DECRBY));
+    write.apply(RedisIO.write().withEndpoint(redisHost, 
port).withMethod(Method.DECRBY));
 
     p.run();
 
@@ -319,7 +325,7 @@ public class RedisIOTest {
                     KvCoder.of(
                         StringUtf8Coder.of(),
                         MapCoder.of(StringUtf8Coder.of(), 
StringUtf8Coder.of()))));
-    write.apply(RedisIO.writeStreams().withEndpoint(REDIS_HOST, port));
+    write.apply(RedisIO.writeStreams().withEndpoint(redisHost, port));
     p.run();
 
     for (String key : redisKeys) {
@@ -353,7 +359,7 @@ public class RedisIOTest {
                         MapCoder.of(StringUtf8Coder.of(), 
StringUtf8Coder.of()))));
     write.apply(
         RedisIO.writeStreams()
-            .withEndpoint(REDIS_HOST, port)
+            .withEndpoint(redisHost, port)
             .withMaxLen(1)
             .withApproximateTrim(false));
     p.run();

Reply via email to