This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e1f02622f36 fixed redis tests with testcontainers (#38157)
e1f02622f36 is described below
commit e1f02622f364005de4236f8024355fc4236c6e97
Author: Abdelrahman Ibrahim <[email protected]>
AuthorDate: Mon Apr 13 16:49:02 2026 +0200
fixed redis tests with testcontainers (#38157)
---
sdks/java/io/redis/build.gradle | 2 +-
.../org/apache/beam/sdk/io/redis/RedisIOTest.java | 60 ++++++++++++----------
2 files changed, 34 insertions(+), 28 deletions(-)
diff --git a/sdks/java/io/redis/build.gradle b/sdks/java/io/redis/build.gradle
index 28c03d534cf..907a879a28b 100644
--- a/sdks/java/io/redis/build.gradle
+++ b/sdks/java/io/redis/build.gradle
@@ -28,7 +28,7 @@ dependencies {
implementation "redis.clients:jedis:4.0.1"
testImplementation project(path: ":sdks:java:io:common")
testImplementation library.java.junit
- testImplementation "org.signal:embedded-redis:0.8.2"
+ testImplementation library.java.testcontainers_base
testRuntimeOnly library.java.slf4j_jdk14
testRuntimeOnly project(path: ":runners:direct-java", configuration:
"shadow")
}
diff --git
a/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
b/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
index 5754de89ec3..7b305d846c7 100644
---
a/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
+++
b/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
@@ -36,7 +36,6 @@ import java.util.stream.Stream;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.MapCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.common.NetworkTestHelper;
import org.apache.beam.sdk.io.range.ByteKey;
import org.apache.beam.sdk.io.redis.RedisIO.Write.Method;
import org.apache.beam.sdk.testing.PAssert;
@@ -48,41 +47,49 @@ import org.apache.beam.sdk.values.PCollection;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.junit.AfterClass;
import org.junit.BeforeClass;
+import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.utility.DockerImageName;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.resps.StreamEntry;
-import redis.embedded.RedisServer;
/** Test on the Redis IO. */
@RunWith(JUnit4.class)
public class RedisIOTest {
- private static final String REDIS_HOST = "localhost";
+ private static final DockerImageName REDIS_IMAGE_NAME =
DockerImageName.parse("redis:7-alpine");
private static final Long NO_EXPIRATION = -1L;
+ private static final int REDIS_PORT = 6379;
@Rule public TestPipeline p = TestPipeline.create();
- private static RedisServer server;
+ @ClassRule
+ public static final GenericContainer<?> REDIS_CONTAINER =
+ new GenericContainer<>(REDIS_IMAGE_NAME).withExposedPorts(REDIS_PORT);
+
+ private static String redisHost;
private static int port;
private static Jedis client;
@BeforeClass
public static void beforeClass() throws Exception {
- port = NetworkTestHelper.getAvailableLocalPort();
- server = new RedisServer(port);
- server.start();
- client = RedisConnectionConfiguration.create(REDIS_HOST, port).connect();
+ redisHost = REDIS_CONTAINER.getHost();
+ port = REDIS_CONTAINER.getMappedPort(REDIS_PORT);
+ client = RedisConnectionConfiguration.create(redisHost, port).connect();
+ client.ping();
}
@AfterClass
public static void afterClass() {
- client.close();
- server.stop();
+ if (client != null) {
+ client.close();
+ }
}
@Test
@@ -94,7 +101,7 @@ public class RedisIOTest {
p.apply(
"Read",
RedisIO.read()
- .withEndpoint(REDIS_HOST, port)
+ .withEndpoint(redisHost, port)
.withKeyPattern("bulkread*")
.withBatchSize(10));
PAssert.that(read).containsInAnyOrder(data);
@@ -110,7 +117,7 @@ public class RedisIOTest {
p.apply(
"Read",
RedisIO.read()
- .withEndpoint(REDIS_HOST, port)
+ .withEndpoint(redisHost, port)
.withKeyPattern("bigset*")
.withBatchSize(8));
PAssert.that(read).containsInAnyOrder(data);
@@ -126,7 +133,7 @@ public class RedisIOTest {
p.apply(
"Read",
RedisIO.read()
- .withEndpoint(REDIS_HOST, port)
+ .withEndpoint(redisHost, port)
.withKeyPattern("smallset*")
.withBatchSize(20));
PAssert.that(read).containsInAnyOrder(data);
@@ -139,13 +146,12 @@ public class RedisIOTest {
data.forEach(kv -> client.set(kv.getKey(), kv.getValue()));
PCollection<KV<String, String>> read =
- p.apply("Read", RedisIO.read().withEndpoint(REDIS_HOST,
port).withKeyPattern("pattern*"));
+ p.apply("Read", RedisIO.read().withEndpoint(redisHost,
port).withKeyPattern("pattern*"));
PAssert.that(read).containsInAnyOrder(data);
PCollection<KV<String, String>> readNotMatch =
p.apply(
- "ReadNotMatch",
- RedisIO.read().withEndpoint(REDIS_HOST,
port).withKeyPattern("foobar*"));
+ "ReadNotMatch", RedisIO.read().withEndpoint(redisHost,
port).withKeyPattern("foobar*"));
PAssert.thatSingleton(readNotMatch.apply(Count.globally())).isEqualTo(0L);
p.run();
@@ -158,7 +164,7 @@ public class RedisIOTest {
String newValue = "newValue";
PCollection<KV<String, String>> write = p.apply(Create.of(KV.of(key,
newValue)));
- write.apply(RedisIO.write().withEndpoint(REDIS_HOST,
port).withMethod(Method.SET));
+ write.apply(RedisIO.write().withEndpoint(redisHost,
port).withMethod(Method.SET));
p.run();
assertEquals(newValue, client.get(key));
@@ -174,7 +180,7 @@ public class RedisIOTest {
PCollection<KV<String, String>> write = p.apply(Create.of(KV.of(key,
newValue)));
write.apply(
RedisIO.write()
- .withEndpoint(REDIS_HOST, port)
+ .withEndpoint(redisHost, port)
.withMethod(Method.SET)
.withExpireTime(10_000L));
p.run();
@@ -193,7 +199,7 @@ public class RedisIOTest {
String newValue = "newValue";
PCollection<KV<String, String>> write = p.apply(Create.of(KV.of(key,
newValue)));
- write.apply(RedisIO.write().withEndpoint(REDIS_HOST,
port).withMethod(Method.LPUSH));
+ write.apply(RedisIO.write().withEndpoint(redisHost,
port).withMethod(Method.LPUSH));
p.run();
List<String> values = client.lrange(key, 0, -1);
@@ -208,7 +214,7 @@ public class RedisIOTest {
String newValue = "newValue";
PCollection<KV<String, String>> write = p.apply(Create.of(KV.of(key,
newValue)));
- write.apply(RedisIO.write().withEndpoint(REDIS_HOST,
port).withMethod(Method.RPUSH));
+ write.apply(RedisIO.write().withEndpoint(redisHost,
port).withMethod(Method.RPUSH));
p.run();
List<String> values = client.lrange(key, 0, -1);
@@ -222,7 +228,7 @@ public class RedisIOTest {
List<KV<String, String>> data = buildConstantKeyList(key, values);
PCollection<KV<String, String>> write = p.apply(Create.of(data));
- write.apply(RedisIO.write().withEndpoint(REDIS_HOST,
port).withMethod(Method.SADD));
+ write.apply(RedisIO.write().withEndpoint(redisHost,
port).withMethod(Method.SADD));
p.run();
Set<String> expected = new HashSet<>(values);
@@ -237,7 +243,7 @@ public class RedisIOTest {
List<KV<String, String>> data = buildConstantKeyList(key, values);
PCollection<KV<String, String>> write = p.apply(Create.of(data));
- write.apply(RedisIO.write().withEndpoint(REDIS_HOST,
port).withMethod(Method.PFADD));
+ write.apply(RedisIO.write().withEndpoint(redisHost,
port).withMethod(Method.PFADD));
p.run();
long count = client.pfcount(key);
@@ -254,7 +260,7 @@ public class RedisIOTest {
PCollection<KV<String, String>> write = p.apply(Create.of(data));
write.apply(
RedisIO.write()
- .withEndpoint(REDIS_HOST, port)
+ .withEndpoint(redisHost, port)
.withMethod(Method.PFADD)
.withExpireTime(10_000L));
p.run();
@@ -273,7 +279,7 @@ public class RedisIOTest {
List<KV<String, String>> data = buildConstantKeyList(key, values);
PCollection<KV<String, String>> write = p.apply(Create.of(data));
- write.apply(RedisIO.write().withEndpoint(REDIS_HOST,
port).withMethod(Method.INCRBY));
+ write.apply(RedisIO.write().withEndpoint(redisHost,
port).withMethod(Method.INCRBY));
p.run();
@@ -289,7 +295,7 @@ public class RedisIOTest {
List<KV<String, String>> data = buildConstantKeyList(key, values);
PCollection<KV<String, String>> write = p.apply(Create.of(data));
- write.apply(RedisIO.write().withEndpoint(REDIS_HOST,
port).withMethod(Method.DECRBY));
+ write.apply(RedisIO.write().withEndpoint(redisHost,
port).withMethod(Method.DECRBY));
p.run();
@@ -319,7 +325,7 @@ public class RedisIOTest {
KvCoder.of(
StringUtf8Coder.of(),
MapCoder.of(StringUtf8Coder.of(),
StringUtf8Coder.of()))));
- write.apply(RedisIO.writeStreams().withEndpoint(REDIS_HOST, port));
+ write.apply(RedisIO.writeStreams().withEndpoint(redisHost, port));
p.run();
for (String key : redisKeys) {
@@ -353,7 +359,7 @@ public class RedisIOTest {
MapCoder.of(StringUtf8Coder.of(),
StringUtf8Coder.of()))));
write.apply(
RedisIO.writeStreams()
- .withEndpoint(REDIS_HOST, port)
+ .withEndpoint(redisHost, port)
.withMaxLen(1)
.withApproximateTrim(false));
p.run();