This is an automated email from the ASF dual-hosted git repository. iemejia pushed a commit to branch BEAM-9403-redisio-readall-backup in repository https://gitbox.apache.org/repos/asf/beam.git
commit 98639d1685332700db7f163e614d63f02c955beb Author: Ismaël Mejía <[email protected]> AuthorDate: Wed Apr 15 19:49:15 2020 +0200 --wip-- [skip ci] --- .../java/org/apache/beam/sdk/io/redis/RedisIO.java | 85 +++++++++++++++++++++- 1 file changed, 84 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java index 2757f2e..0c90ad8 100644 --- a/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java +++ b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java @@ -131,6 +131,11 @@ public class RedisIO { .build(); } + /** Like {@link #read()} but executes multiple reads from a {@link PCollection} as Reads. */ + public static ReadAll readAll() { + return new ReadAll(); + } + /** Write data to a Redis server. */ public static Write write() { return new AutoValue_RedisIO_Write.Builder() @@ -315,12 +320,90 @@ public class RedisIO { } } + /** Implementation of {@link #readAll()}. 
*/ + public abstract static class ReadAll + extends PTransform<PCollection<Read>, PCollection<KV<String, String>>> { + @Override + public PCollection<KV<String, String>> expand(PCollection<Read> input) { + return input + // .apply("Split", ParDo.of(new SplitFn())) + .apply("Reshuffle", Reshuffle.viaRandomKey()) + .apply("Read", ParDo.of(new ReadFn())); + } + } + + private static class ReadFn extends DoFn<Read, KV<String, String>> { + @ProcessElement + public void processElement(ProcessContext c, BoundedWindow window) { + Read spec = c.element(); + Jedis jedis = spec.connectionConfiguration().connect(); + + Multimap<BoundedWindow, String> bundles = ArrayListMultimap.create(); + AtomicInteger batchCount = new AtomicInteger(); + + ScanParams scanParams = new ScanParams(); + scanParams.match(spec.keyPattern()); + String cursor = ScanParams.SCAN_POINTER_START; + boolean finished = false; + while (!finished) { + ScanResult<String> scanResult = jedis.scan(cursor, scanParams); + List<String> keys = scanResult.getResult(); + for (String key : keys) { +// c.output(k); +// String key = c.element(); + + bundles.put(window, key); + if (batchCount.incrementAndGet() > spec.batchSize()) { + Multimap<BoundedWindow, KV<String, String>> kvs = fetchAndFlush(); + for (BoundedWindow w : kvs.keySet()) { + for (KV<String, String> kv : kvs.get(w)) { + c.output(kv); + } + } + } + + } + cursor = scanResult.getCursor(); + if (cursor.equals(ScanParams.SCAN_POINTER_START)) { + finished = true; + } + } + + bundles.put(window, key); + if (batchCount.incrementAndGet() > getBatchSize()) { + Multimap<BoundedWindow, KV<String, String>> kvs = fetchAndFlush(); + for (BoundedWindow w : kvs.keySet()) { + for (KV<String, String> kv : kvs.get(w)) { + c.output(kv); + } + } + } + } + + private Multimap<BoundedWindow, KV<String, String>> fetchAndFlush() { + Multimap<BoundedWindow, KV<String, String>> kvs = ArrayListMultimap.create(); + for (BoundedWindow w : bundles.keySet()) { + String[] keys = new 
String[bundles.get(w).size()]; + bundles.get(w).toArray(keys); + List<String> results = jedis.mget(keys); + for (int i = 0; i < results.size(); i++) { + if (results.get(i) != null) { + kvs.put(w, KV.of(keys[i], results.get(i))); + } + } + } + bundles = ArrayListMultimap.create(); + batchCount.set(0); + return kvs; + } + } + private abstract static class BaseReadFn<T> extends DoFn<String, T> { protected final RedisConnectionConfiguration connectionConfiguration; transient Jedis jedis; - BaseReadFn(RedisConnectionConfiguration connectionConfiguration) { + BaseReadFn() { this.connectionConfiguration = connectionConfiguration; }
