pabloem commented on a change in pull request #15549:
URL: https://github.com/apache/beam/pull/15549#discussion_r799887814



##########
File path: 
sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
##########
@@ -334,95 +337,33 @@ public void setup() {
     public void teardown() {
       jedis.close();
     }
-  }
-
-  private static class ReadKeysWithPattern extends BaseReadFn<String> {
 
-    ReadKeysWithPattern(RedisConnectionConfiguration connectionConfiguration) {
-      super(connectionConfiguration);
+    @GetInitialRestriction
+    public ByteKeyRange getInitialRestriction() {
+      return ByteKeyRange.of(ByteKey.of(0x00), ByteKey.EMPTY);
     }
 
     @ProcessElement
-    public void processElement(ProcessContext c) {
+    public void processElement(
+        ProcessContext c, RestrictionTracker<ByteKeyRange, ByteKey> tracker) {
+      ByteKey cursor = tracker.currentRestriction().getStartKey();
+      RedisCursor redisCursor = RedisCursor.byteKeyToRedisCursor(cursor, 
jedis.dbSize(), true);
       ScanParams scanParams = new ScanParams();
       scanParams.match(c.element());
-
-      String cursor = ScanParams.SCAN_POINTER_START;
-      boolean finished = false;
-      while (!finished) {
-        ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
-        List<String> keys = scanResult.getResult();
-        for (String k : keys) {
-          c.output(k);
-        }
-        cursor = scanResult.getCursor();
-        if (cursor.equals(ScanParams.SCAN_POINTER_START)) {
-          finished = true;
-        }
-      }
-    }
-  }
-
-  /** A {@link DoFn} requesting Redis server to get key/value pairs. */
-  private static class ReadFn extends BaseReadFn<KV<String, String>> {
-    transient @Nullable Multimap<BoundedWindow, String> bundles = null;
-    @Nullable AtomicInteger batchCount = null;
-    private final int batchSize;
-
-    ReadFn(RedisConnectionConfiguration connectionConfiguration, int 
batchSize) {
-      super(connectionConfiguration);
-      this.batchSize = batchSize;
-    }
-
-    @StartBundle
-    public void startBundle() {
-      bundles = ArrayListMultimap.create();
-      batchCount = new AtomicInteger();
-    }
-
-    @ProcessElement
-    public void processElement(ProcessContext c, BoundedWindow window) {
-      String key = c.element();
-      bundles.put(window, key);
-      if (batchCount.incrementAndGet() > getBatchSize()) {
-        Multimap<BoundedWindow, KV<String, String>> kvs = fetchAndFlush();
-        for (BoundedWindow w : kvs.keySet()) {
-          for (KV<String, String> kv : kvs.get(w)) {
-            c.output(kv);
-          }
-        }
-      }
-    }
-
-    @FinishBundle
-    public void finishBundle(FinishBundleContext context) {
-      Multimap<BoundedWindow, KV<String, String>> kvs = fetchAndFlush();
-      for (BoundedWindow w : kvs.keySet()) {
-        for (KV<String, String> kv : kvs.get(w)) {
-          context.output(kv, w.maxTimestamp(), w);
-        }
-      }
-    }
-
-    private int getBatchSize() {
-      return batchSize;
-    }
-
-    private Multimap<BoundedWindow, KV<String, String>> fetchAndFlush() {
-      Multimap<BoundedWindow, KV<String, String>> kvs = 
ArrayListMultimap.create();
-      for (BoundedWindow w : bundles.keySet()) {
-        String[] keys = new String[bundles.get(w).size()];
-        bundles.get(w).toArray(keys);
-        List<String> results = jedis.mget(keys);
-        for (int i = 0; i < results.size(); i++) {
-          if (results.get(i) != null) {
-            kvs.put(w, KV.of(keys[i], results.get(i)));
+      while (tracker.tryClaim(cursor)) {
+        ScanResult<String> scanResult = jedis.scan(redisCursor.getCursor(), 
scanParams);
+        if (scanResult.getResult().size() > 0) {
+          String[] keys = scanResult.getResult().toArray(new 
String[scanResult.getResult().size()]);
+          List<String> results = jedis.mget(keys);
+          for (int i = 0; i < results.size(); i++) {
+            if (results.get(i) != null) {
+              c.output(KV.of(keys[i], results.get(i)));
+            }
           }
         }
+        redisCursor = RedisCursor.of(scanResult.getCursor(), jedis.dbSize(), 
false);

Review comment:
      It seems like we need to stop this loop once `scanResult.getCursor()` returns
0 again, since Redis SCAN signals that a full iteration is complete by returning cursor 0. Is that correct, and is this case handled here?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to