[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations
[ https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=145696=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-145696 ]

ASF GitHub Bot logged work on BEAM-3446:

Author: ASF GitHub Bot
Created on: 19/Sep/18 15:42
Start Date: 19/Sep/18 15:42
Worklog Time Spent: 10m
Work Description: vvarma commented on issue #5841: [BEAM-3446] Fixes RedisIO non-prefix read operations
URL: https://github.com/apache/beam/pull/5841#issuecomment-422852677

@iemejia Thank you!

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Issue Time Tracking
---

Worklog Id: (was: 145696)
Time Spent: 6h 20m (was: 6h 10m)

> RedisIO non-prefix read operations
> --
>
> Key: BEAM-3446
> URL: https://issues.apache.org/jira/browse/BEAM-3446
> Project: Beam
> Issue Type: New Feature
> Components: io-java-redis
> Reporter: Vinay varma
> Assignee: Vinay varma
> Priority: Major
> Fix For: 2.8.0
>
> Time Spent: 6h 20m
> Remaining Estimate: 0h
>
> The read operation in RedisIO is for prefix-based lookups. While it can also
> be used for exact key matches, the number of operations it issues limits the
> throughput of the function.
> I suggest exposing the current readAll operation as readbyprefix and using
> simpler operations for the readAll functionality.
> ex:
> {code:java}
> String output = jedis.get(element);
> if (output != null) {
>   processContext.output(KV.of(element, output));
> }
> {code}
> instead of:
> https://github.com/apache/beam/blob/7d240c0bb171af6868f1a6e95196c9dcfc9ac640/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java#L292

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
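A minimal, self-contained sketch of the exact-key read the description proposes, assuming a `HashMap` stands in for the Redis server and `Map.Entry` stands in for Beam's `KV<String, String>` (no Jedis or Beam dependency; `ExactKeyReadSketch.readAll` is a hypothetical name). Like the `{code:java}` snippet, it performs one lookup per key and emits nothing for a missing key.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch only: a HashMap stands in for the Redis server and Map.Entry stands in
// for Beam's KV<String, String>; neither Jedis nor Beam is used here.
class ExactKeyReadSketch {

  /** Hypothetical helper: one GET-style lookup per key; keys with no value are skipped. */
  static List<Map.Entry<String, String>> readAll(Map<String, String> redis, List<String> keys) {
    List<Map.Entry<String, String>> out = new ArrayList<>();
    for (String element : keys) {
      String output = redis.get(element); // plays the role of jedis.get(element)
      if (output != null) {
        // plays the role of processContext.output(KV.of(element, output))
        out.add(new AbstractMap.SimpleImmutableEntry<>(element, output));
      }
    }
    return out;
  }

  public static void main(String[] args) {
    Map<String, String> redis = new HashMap<>();
    redis.put("user:1", "alice");
    redis.put("user:2", "bob");
    // "user:3" is absent, so it produces no output.
    System.out.println(readAll(redis, Arrays.asList("user:1", "user:3", "user:2")));
  }
}
```

Each key costs exactly one lookup, whereas the prefix-based read first iterates the keyspace with SCAN before fetching any values; that difference is the throughput concern raised in the description.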
[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations
[ https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=145693=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-145693 ]

ASF GitHub Bot logged work on BEAM-3446:

Author: ASF GitHub Bot
Created on: 19/Sep/18 15:14
Start Date: 19/Sep/18 15:14
Worklog Time Spent: 10m
Work Description: iemejia closed pull request #5841: [BEAM-3446] Fixes RedisIO non-prefix read operations
URL: https://github.com/apache/beam/pull/5841

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
index 279ca46db2b..57d0b77af1f 100644
--- a/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
+++ b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
@@ -20,7 +20,10 @@
 import static com.google.common.base.Preconditions.checkArgument;
 import com.google.auto.value.AutoValue;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.KvCoder;
@@ -34,6 +37,7 @@
 import org.apache.beam.sdk.transforms.SerializableFunctions;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
@@ -109,6 +113,7 @@ public static Read read() {
     return new AutoValue_RedisIO_Read.Builder()
         .setConnectionConfiguration(RedisConnectionConfiguration.create())
         .setKeyPattern("*")
+        .setBatchSize(1000)
         .build();
   }
@@ -119,6 +124,7 @@ public static Read read() {
   public static ReadAll readAll() {
     return new AutoValue_RedisIO_ReadAll.Builder()
         .setConnectionConfiguration(RedisConnectionConfiguration.create())
+        .setBatchSize(1000)
         .build();
   }
@@ -142,6 +148,8 @@ private RedisIO() {}
     @Nullable
     abstract String keyPattern();
+    abstract int batchSize();
+
     abstract Builder builder();
     @AutoValue.Builder
@@ -152,6 +160,8 @@ private RedisIO() {}
       @Nullable
       abstract Builder setKeyPattern(String keyPattern);
+      abstract Builder setBatchSize(int batchSize);
+
       abstract Read build();
     }
@@ -185,6 +195,10 @@ public Read withConnectionConfiguration(RedisConnectionConfiguration connection)
       return builder().setConnectionConfiguration(connection).build();
     }
+    public Read withBatchSize(int batchSize) {
+      return builder().setBatchSize(batchSize).build();
+    }
+
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       connectionConfiguration().populateDisplayData(builder);
@@ -196,7 +210,11 @@ public void populateDisplayData(DisplayData.Builder builder) {
       return input
           .apply(Create.of(keyPattern()))
-          .apply(RedisIO.readAll().withConnectionConfiguration(connectionConfiguration()));
+          .apply(ParDo.of(new ReadKeysWithPattern(connectionConfiguration())))
+          .apply(
+              RedisIO.readAll()
+                  .withConnectionConfiguration(connectionConfiguration())
+                  .withBatchSize(batchSize()));
     }
   }
@@ -208,6 +226,8 @@ public void populateDisplayData(DisplayData.Builder builder) {
     @Nullable
     abstract RedisConnectionConfiguration connectionConfiguration();
+    abstract int batchSize();
+
     abstract ReadAll.Builder builder();
     @AutoValue.Builder
@@ -215,6 +235,8 @@ public void populateDisplayData(DisplayData.Builder builder) {
       @Nullable
       abstract ReadAll.Builder setConnectionConfiguration(RedisConnectionConfiguration connection);
+      abstract ReadAll.Builder setBatchSize(int batchSize);
+
       abstract ReadAll build();
     }
@@ -243,25 +265,27 @@ public ReadAll withConnectionConfiguration(RedisConnectionConfiguration connecti
       return builder().setConnectionConfiguration(connection).build();
     }
+    public ReadAll withBatchSize(int batchSize) {
+      return builder().setBatchSize(batchSize).build();
+    }
+
     @Override
     public PCollection<KV<String, String>> expand(PCollection<String> input) {
       checkArgument(connectionConfiguration() != null, "withConnectionConfiguration() is required");
       return input
-
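The diff threads a new `batchSize` property through both `Read` and `ReadAll`: `read()` and `readAll()` default it to 1000, and `withBatchSize(int)` returns a modified copy through the AutoValue builder. A rough hand-written equivalent of that immutable "with" pattern, without AutoValue, might look like the sketch below. `ReadConfigSketch` is a hypothetical name; only the defaults (`"*"` and 1000) come from the diff, and the positive-size check is an assumption the quoted diff does not contain.

```java
// Hand-written stand-in for the AutoValue pattern in the diff. ReadConfigSketch is
// a hypothetical name; only the defaults ("*" and 1000) come from the diff.
final class ReadConfigSketch {
  static final int DEFAULT_BATCH_SIZE = 1000;

  private final String keyPattern;
  private final int batchSize;

  private ReadConfigSketch(String keyPattern, int batchSize) {
    this.keyPattern = keyPattern;
    this.batchSize = batchSize;
  }

  /** Like RedisIO.read(): key pattern "*" and batch size 1000 by default. */
  static ReadConfigSketch read() {
    return new ReadConfigSketch("*", DEFAULT_BATCH_SIZE);
  }

  ReadConfigSketch withKeyPattern(String keyPattern) {
    return new ReadConfigSketch(keyPattern, batchSize);
  }

  /** Returns a modified copy; the receiver is left unchanged. */
  ReadConfigSketch withBatchSize(int batchSize) {
    // Assumption: the quoted diff performs no validation; this check is added here.
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive: " + batchSize);
    }
    return new ReadConfigSketch(keyPattern, batchSize);
  }

  String keyPattern() {
    return keyPattern;
  }

  int batchSize() {
    return batchSize;
  }

  public static void main(String[] args) {
    ReadConfigSketch defaults = read();
    ReadConfigSketch tuned = defaults.withKeyPattern("user:*").withBatchSize(250);
    System.out.println(defaults.keyPattern() + " / " + defaults.batchSize());
    System.out.println(tuned.keyPattern() + " / " + tuned.batchSize());
  }
}
```

Because every `with...` call builds a new instance, a configured transform can be shared or reused without one pipeline's setting leaking into another, which is the property AutoValue provides in the real code.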
[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations
[ https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=145667=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-145667 ]

ASF GitHub Bot logged work on BEAM-3446:

Author: ASF GitHub Bot
Created on: 19/Sep/18 14:08
Start Date: 19/Sep/18 14:08
Worklog Time Spent: 10m
Work Description: vvarma commented on a change in pull request #5841: [BEAM-3446] Fixes RedisIO non-prefix read operations
URL: https://github.com/apache/beam/pull/5841#discussion_r218818530

## File path: sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java ##
@@ -280,28 +316,77 @@ public void processElement(ProcessContext processContext) throws Exception {
         while (!finished) {
           ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
           List<String> keys = scanResult.getResult();
-
-          Pipeline pipeline = jedis.pipelined();
-          if (keys != null) {
-            for (String key : keys) {
-              pipeline.get(key);
-            }
-            List<Object> values = pipeline.syncAndReturnAll();
-            for (int i = 0; i < values.size(); i++) {
-              processContext.output(KV.of(keys.get(i), (String) values.get(i)));
-            }
+          for (String k : keys) {
+            processContext.output(k);
           }
-
           cursor = scanResult.getStringCursor();
           if ("0".equals(cursor)) {
             finished = true;
           }
         }
       }
+    }
+  /** A {@link DoFn} requesting Redis server to get key/value pairs. */
+  private static class ReadFn extends BaseReadFn<KV<String, String>> {
+    @Nullable transient Multimap<BoundedWindow, String> bundles = null;
+    @Nullable AtomicInteger batchCount = null;
+    private final int batchSize;
+
+    @StartBundle
+    public void startBundle(StartBundleContext context) {
+      bundles = ArrayListMultimap.create();
+      batchCount = new AtomicInteger();
+    }
-    @Teardown
-    public void teardown() {
-      jedis.close();
+    ReadFn(RedisConnectionConfiguration connectionConfiguration, int batchSize) {
+      super(connectionConfiguration);
+      this.batchSize = batchSize;
+    }
+
+    private int getBatchSize() {
+      return batchSize;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext processContext, BoundedWindow window)
+        throws Exception {
+      String key = processContext.element();
+      bundles.put(window, key);
+      if (batchCount.incrementAndGet() > getBatchSize()) {
+        Multimap<BoundedWindow, KV<String, String>> kvs = fetchAndFlush();
+        for (BoundedWindow w : kvs.keySet()) {
+          for (KV<String, String> kv : kvs.get(w)) {
+            processContext.output(kv);
+          }
+        }
+      }
+    }
+
+    private Multimap<BoundedWindow, KV<String, String>> fetchAndFlush() {
+      Multimap<BoundedWindow, KV<String, String>> kvs = ArrayListMultimap.create();
+      for (BoundedWindow w : bundles.keySet()) {
+        String[] keys = new String[bundles.get(w).size()];
+        bundles.get(w).toArray(keys);
+        List<String> results = jedis.mget(keys);
+        for (int i = 0; i < results.size(); i++) {
+          if (results.get(i) != null) {
+            kvs.put(w, KV.of(keys[i], results.get(i)));
+          }
+        }
+      }
+      bundles = ArrayListMultimap.create();
+      batchCount.set(0);
+      return kvs;
+    }
+
+    @FinishBundle
+    public void finishBundle(FinishBundleContext context) throws Exception {
+      Multimap<BoundedWindow, KV<String, String>> kvs = fetchAndFlush();

Review comment:
Thanks @iemejia. Please suggest any other changes that are needed.

Issue Time Tracking
---

Worklog Id: (was: 145667)
Time Spent: 6h (was: 5h 50m)
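In the hunk above, the SCAN loop in `ReadKeysWithPattern` no longer pipelines GETs; it only emits the keys it finds and leaves the value lookups to `ReadFn`. The simulation below sketches that cursor loop under stated assumptions: `FakeKeyspace` is a hypothetical in-memory stand-in that follows SCAN's cursor contract (start at `"0"`, stop when `"0"` comes back) but matches by plain prefix rather than Redis glob patterns, and it is not the Jedis API. Its `scanCalls` counter shows that even a single exact key requires walking the whole keyspace.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simulation only: FakeKeyspace is hypothetical. It follows SCAN's cursor contract
// (start at "0", stop when "0" is returned) but matches by plain prefix instead of
// Redis glob patterns, and it is not the Jedis API.
class ScanLoopSketch {

  static final class Page {
    final String nextCursor;
    final List<String> keys;

    Page(String nextCursor, List<String> keys) {
      this.nextCursor = nextCursor;
      this.keys = keys;
    }
  }

  static final class FakeKeyspace {
    private final List<String> allKeys;
    int scanCalls = 0; // one per round trip to the "server"

    FakeKeyspace(List<String> allKeys) {
      this.allKeys = allKeys;
    }

    /** Examines up to count keys starting at cursor and returns those with the prefix. */
    Page scan(String cursor, String prefix, int count) {
      scanCalls++;
      int start = Integer.parseInt(cursor);
      int end = Math.min(start + count, allKeys.size());
      List<String> matched = new ArrayList<>();
      for (int i = start; i < end; i++) {
        if (allKeys.get(i).startsWith(prefix)) {
          matched.add(allKeys.get(i));
        }
      }
      String next = end >= allKeys.size() ? "0" : String.valueOf(end);
      return new Page(next, matched);
    }
  }

  /** Mirrors the new loop: emit each matching key, fetch no values. */
  static List<String> readKeysWithPrefix(FakeKeyspace redis, String prefix, int count) {
    List<String> emitted = new ArrayList<>();
    String cursor = "0";
    boolean finished = false;
    while (!finished) {
      Page page = redis.scan(cursor, prefix, count);
      emitted.addAll(page.keys); // processContext.output(k) in the diff
      cursor = page.nextCursor;
      if ("0".equals(cursor)) {
        finished = true;
      }
    }
    return emitted;
  }

  public static void main(String[] args) {
    FakeKeyspace redis = new FakeKeyspace(Arrays.asList("a1", "b1", "a2", "b2", "a3"));
    List<String> keys = readKeysWithPrefix(redis, "a", 2);
    System.out.println(keys + " after " + redis.scanCalls + " SCAN calls");
  }
}
```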
[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations
[ https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=145638=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-145638 ]

ASF GitHub Bot logged work on BEAM-3446:

Author: ASF GitHub Bot
Created on: 19/Sep/18 12:11
Start Date: 19/Sep/18 12:11
Worklog Time Spent: 10m
Work Description: iemejia commented on a change in pull request #5841: [BEAM-3446] Fixes RedisIO non-prefix read operations
URL: https://github.com/apache/beam/pull/5841#discussion_r218777618

## File path: sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java ##
+    @FinishBundle
+    public void finishBundle(FinishBundleContext context) throws Exception {
+      Multimap<BoundedWindow, KV<String, String>> kvs = fetchAndFlush();

Review comment:
Thanks for answering. I had somehow misread `startBundle` as a setup-only method. I see how everything fits now.

Issue Time Tracking
---

Worklog Id: (was: 145638)
Time Spent: 5h 50m (was: 5h 40m)
[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations
[ https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=144321=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-144321 ]

ASF GitHub Bot logged work on BEAM-3446:

Author: ASF GitHub Bot
Created on: 14/Sep/18 15:30
Start Date: 14/Sep/18 15:30
Worklog Time Spent: 10m
Work Description: vvarma commented on a change in pull request #5841: [BEAM-3446] Fixes RedisIO non-prefix read operations
URL: https://github.com/apache/beam/pull/5841#discussion_r217752660

## File path: sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java ##
+    @FinishBundle
+    public void finishBundle(FinishBundleContext context) throws Exception {
+      Multimap<BoundedWindow, KV<String, String>> kvs = fetchAndFlush();

Review comment:
The reason for this extra flush is that we have a batch size. Once the number of messages reaches this value, we invoke flush. At the end of the bundle, when `finishBundle` is invoked, there may be a few messages left in the buffer (fewer than the batch size), so we invoke flush from here as well. This is also why we need to store the window of each message.

Issue Time Tracking
---

Worklog Id: (was: 144321)
Time Spent: 5h 40m (was: 5.5h)
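The explanation above can be sketched without Beam or Redis. `BatchingReaderSketch` is a hypothetical stand-in for `ReadFn` that ignores windows, and `mget` is a plain function returning values aligned with the requested keys (null when a key is missing), which is how the quoted code treats `jedis.mget` results. Keys still buffered when the bundle ends are fetched only because `finishBundle` flushes them. One deliberate difference: the sketch flushes as soon as the buffer holds `batchSize` keys, whereas the quoted `ReadFn` checks `batchCount.incrementAndGet() > getBatchSize()` and therefore sends batches of `batchSize + 1` keys.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Sketch of count-based batching with a final flush, ignoring windows.
// BatchingReaderSketch and the mget function are hypothetical stand-ins for
// ReadFn and jedis.mget (values come back aligned with keys, null if missing).
class BatchingReaderSketch {
  private final int batchSize;
  private final Function<List<String>, List<String>> mget;
  private final List<String> buffer = new ArrayList<>();
  final List<String> emitted = new ArrayList<>();
  int mgetCalls = 0;

  BatchingReaderSketch(int batchSize, Function<List<String>, List<String>> mget) {
    this.batchSize = batchSize;
    this.mget = mget;
  }

  void startBundle() {
    buffer.clear(); // fresh buffer per bundle, as ReadFn does in @StartBundle
  }

  void processElement(String key) {
    buffer.add(key);
    // Flush once batchSize keys are buffered (the quoted ReadFn uses '>' instead).
    if (buffer.size() >= batchSize) {
      flush();
    }
  }

  void finishBundle() {
    flush(); // the remainder (fewer than batchSize keys) would otherwise never be fetched
  }

  private void flush() {
    if (buffer.isEmpty()) {
      return;
    }
    mgetCalls++;
    List<String> values = mget.apply(new ArrayList<>(buffer));
    for (int i = 0; i < values.size(); i++) {
      if (values.get(i) != null) {
        emitted.add(buffer.get(i) + "=" + values.get(i));
      }
    }
    buffer.clear();
  }

  /** Builds an mget stand-in backed by a map. */
  static Function<List<String>, List<String>> mgetFrom(Map<String, String> store) {
    return keys -> {
      List<String> values = new ArrayList<>();
      for (String k : keys) {
        values.add(store.get(k)); // null for a missing key
      }
      return values;
    };
  }

  public static void main(String[] args) {
    Map<String, String> store = new HashMap<>();
    store.put("k1", "v1");
    store.put("k2", "v2");
    store.put("k4", "v4");
    store.put("k5", "v5");
    BatchingReaderSketch fn = new BatchingReaderSketch(2, mgetFrom(store));
    fn.startBundle();
    for (String k : new String[] {"k1", "k2", "k3", "k4", "k5"}) {
      fn.processElement(k);
    }
    System.out.println("before finishBundle: " + fn.emitted);
    fn.finishBundle();
    System.out.println("after finishBundle: " + fn.emitted + " (" + fn.mgetCalls + " MGETs)");
  }
}
```

With a batch size of 2 and five keys, `k5` is still sitting in the buffer after the last `processElement` call; only the `finishBundle` flush turns it into output.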
[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations
[ https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=144320=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-144320 ]

ASF GitHub Bot logged work on BEAM-3446:

Author: ASF GitHub Bot
Created on: 14/Sep/18 15:27
Start Date: 14/Sep/18 15:27
Worklog Time Spent: 10m
Work Description: vvarma commented on a change in pull request #5841: [BEAM-3446] Fixes RedisIO non-prefix read operations
URL: https://github.com/apache/beam/pull/5841#discussion_r217751824

## File path: sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java ##
+    @ProcessElement
+    public void processElement(ProcessContext processContext, BoundedWindow window)
+        throws Exception {
+      String key = processContext.element();
+      bundles.put(window, key);
+      if (batchCount.incrementAndGet() > getBatchSize()) {
+        Multimap<BoundedWindow, KV<String, String>> kvs = fetchAndFlush();

Review comment:
The window stored as the key here is used in `finishBundle` to output the results, since the context in `finishBundle` takes the window as a parameter:
`context.output(kv, w.maxTimestamp(), w);`

Issue Time Tracking
---

Worklog Id: (was: 144320)
Time Spent: 5.5h (was: 5h 20m)
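Why the window has to be buffered alongside each key can be sketched as follows: inside `finishBundle` there is no current element or window, so every flushed result must carry the window it was read in, with its timestamp taken from `maxTimestamp()` as in `context.output(kv, w.maxTimestamp(), w)`. `Window` and `Output` are hypothetical stand-ins for `BoundedWindow` and the arguments passed to `FinishBundleContext.output`; a `LinkedHashMap` replaces the `ArrayListMultimap` so the demo's output order is deterministic, and per-key lookups replace the per-window MGET for brevity.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Sketch of window-aware buffering. Window and Output are hypothetical stand-ins:
// Window mimics BoundedWindow.maxTimestamp(), and Output records what
// FinishBundleContext.output(kv, timestamp, window) would receive.
class WindowedBufferSketch {
  static final class Window {
    final String name;
    final long maxTimestamp;

    Window(String name, long maxTimestamp) {
      this.name = name;
      this.maxTimestamp = maxTimestamp;
    }

    @Override
    public boolean equals(Object o) {
      return o instanceof Window
          && ((Window) o).name.equals(name)
          && ((Window) o).maxTimestamp == maxTimestamp;
    }

    @Override
    public int hashCode() {
      return Objects.hash(name, maxTimestamp);
    }
  }

  static final class Output {
    final String key;
    final String value;
    final long timestamp;
    final Window window;

    Output(String key, String value, long timestamp, Window window) {
      this.key = key;
      this.value = value;
      this.timestamp = timestamp;
      this.window = window;
    }

    @Override
    public String toString() {
      return key + "=" + value + " @" + window.name + "/" + timestamp;
    }
  }

  // Insertion-ordered for a deterministic demo; ReadFn uses an ArrayListMultimap.
  private final Map<Window, List<String>> buffered = new LinkedHashMap<>();
  private final Map<String, String> redis; // stand-in for the server

  WindowedBufferSketch(Map<String, String> redis) {
    this.redis = redis;
  }

  /** Like bundles.put(window, key): remember which window each key came from. */
  void processElement(String key, Window window) {
    buffered.computeIfAbsent(window, w -> new ArrayList<>()).add(key);
  }

  /** Flushes every window; each result is emitted with its own window and maxTimestamp. */
  List<Output> finishBundle() {
    List<Output> out = new ArrayList<>();
    for (Map.Entry<Window, List<String>> entry : buffered.entrySet()) {
      Window w = entry.getKey();
      // ReadFn issues one MGET per window here; per-key lookups keep the sketch short.
      for (String key : entry.getValue()) {
        String value = redis.get(key);
        if (value != null) {
          out.add(new Output(key, value, w.maxTimestamp, w));
        }
      }
    }
    buffered.clear();
    return out;
  }

  public static void main(String[] args) {
    Map<String, String> redis = new HashMap<>();
    redis.put("k1", "v1");
    redis.put("k2", "v2");
    redis.put("k3", "v3");
    WindowedBufferSketch fn = new WindowedBufferSketch(redis);
    Window w1 = new Window("w1", 999L);
    Window w2 = new Window("w2", 1999L);
    fn.processElement("k1", w1);
    fn.processElement("k2", w2);
    fn.processElement("k3", w1);
    fn.processElement("missing", w2); // no value, so nothing is emitted for it
    System.out.println(fn.finishBundle());
  }
}
```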
[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations
[ https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=144310=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-144310 ]

ASF GitHub Bot logged work on BEAM-3446:

Author: ASF GitHub Bot
Created on: 14/Sep/18 14:59
Start Date: 14/Sep/18 14:59
Worklog Time Spent: 10m
Work Description: iemejia commented on a change in pull request #5841: [BEAM-3446] Fixes RedisIO non-prefix read operations
URL: https://github.com/apache/beam/pull/5841#discussion_r217741210

## File path: sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java ##
+    @ProcessElement
+    public void processElement(ProcessContext processContext, BoundedWindow window)
+        throws Exception {
+      String key = processContext.element();
+      bundles.put(window, key);
+      if (batchCount.incrementAndGet() > getBatchSize()) {
+        Multimap<BoundedWindow, KV<String, String>> kvs = fetchAndFlush();

Review comment:
Why do you need to deal with windows here? (I looked quickly but didn't get the intuition.) If we can avoid this, it is probably better, no?

Issue Time Tracking
---

Worklog Id: (was: 144310)
Time Spent: 5h 20m (was: 5h 10m)
[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations
[ https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=144311=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-144311 ]

ASF GitHub Bot logged work on BEAM-3446:

Author: ASF GitHub Bot
Created on: 14/Sep/18 14:59
Start Date: 14/Sep/18 14:59
Worklog Time Spent: 10m
Work Description: iemejia commented on a change in pull request #5841: [BEAM-3446] Fixes RedisIO non-prefix read operations
URL: https://github.com/apache/beam/pull/5841#discussion_r217741498

## File path: sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java ##
+    @FinishBundle
+    public void finishBundle(FinishBundleContext context) throws Exception {
+      Multimap<BoundedWindow, KV<String, String>> kvs = fetchAndFlush();

Review comment:
Is this extra flush needed? Without an equivalent `startBundle`, I don't see why it would be.

Issue Time Tracking
---

Worklog Id: (was: 144311)
Time Spent: 5h 20m (was: 5h 10m)
[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations
[ https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=144048=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-144048 ]

ASF GitHub Bot logged work on BEAM-3446:

Author: ASF GitHub Bot
Created on: 13/Sep/18 18:36
Start Date: 13/Sep/18 18:36
Worklog Time Spent: 10m
Work Description: jbonofre commented on issue #5841: [BEAM-3446] Fixes RedisIO non-prefix read operations
URL: https://github.com/apache/beam/pull/5841#issuecomment-421108852

@huygaa11 sorry, I forgot. Resuming my review.

Issue Time Tracking
---

Worklog Id: (was: 144048)
Time Spent: 5h 10m (was: 5h)
[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations
[ https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=144017=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-144017 ]

ASF GitHub Bot logged work on BEAM-3446:
Author: ASF GitHub Bot
Created on: 13/Sep/18 17:26
Start Date: 13/Sep/18 17:26
Worklog Time Spent: 10m
Work Description: huygaa11 commented on issue #5841: [BEAM-3446] Fixes RedisIO non-prefix read operations
URL: https://github.com/apache/beam/pull/5841#issuecomment-421086985

Friendly ping for review!

Issue Time Tracking
-------------------
Worklog Id: (was: 144017)
Time Spent: 5h (was: 4h 50m)
[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations
[ https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=140560=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-140560 ]

ASF GitHub Bot logged work on BEAM-3446:
Author: ASF GitHub Bot
Created on: 03/Sep/18 12:58
Start Date: 03/Sep/18 12:58
Worklog Time Spent: 10m
Work Description: vvarma commented on issue #5841: [BEAM-3446] Fixes RedisIO non-prefix read operations
URL: https://github.com/apache/beam/pull/5841#issuecomment-418108115

Hi @iemejia, I have made the requested changes. I used the window as you suggested. Please let me know if they are as expected. Apologies for the delay.

Issue Time Tracking
-------------------
Worklog Id: (was: 140560)
Time Spent: 4h 50m (was: 4h 40m)
[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations
[ https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=140515=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-140515 ]

ASF GitHub Bot logged work on BEAM-3446:
Author: ASF GitHub Bot
Created on: 03/Sep/18 09:26
Start Date: 03/Sep/18 09:26
Worklog Time Spent: 10m
Work Description: iemejia commented on issue #5841: [BEAM-3446] Fixes RedisIO non-prefix read operations
URL: https://github.com/apache/beam/pull/5841#issuecomment-418056484

Just pinging about the status of this one, @vvarma. We are quite close, so hopefully you can fix the last bits and we can merge it. Sorry if this has taken too long.

Issue Time Tracking
-------------------
Worklog Id: (was: 140515)
Time Spent: 4h 40m (was: 4.5h)
[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations
[ https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=136128=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-136128 ]

ASF GitHub Bot logged work on BEAM-3446:
Author: ASF GitHub Bot
Created on: 20/Aug/18 12:19
Start Date: 20/Aug/18 12:19
Worklog Time Spent: 10m
Work Description: iemejia commented on issue #5841: [BEAM-3446] Fixes RedisIO non-prefix read operations
URL: https://github.com/apache/beam/pull/5841#issuecomment-414297169

Hi @vvarma, it seems the changes in the other PR produced a conflict. Can you please rebase so we can merge this one (and add the minor fixes from the review)? Thanks!

Issue Time Tracking
-------------------
Worklog Id: (was: 136128)
Time Spent: 4.5h (was: 4h 20m)
[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations
[ https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=127892=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-127892 ]

ASF GitHub Bot logged work on BEAM-3446:
Author: ASF GitHub Bot
Created on: 26/Jul/18 18:59
Start Date: 26/Jul/18 18:59
Worklog Time Spent: 10m
Work Description: vvarma commented on a change in pull request #5841: [BEAM-3446] Fixes RedisIO non-prefix read operations
URL: https://github.com/apache/beam/pull/5841#discussion_r205567986

## File path: sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
## @@ -279,28 +314,73 @@ public void processElement(ProcessContext processContext) throws Exception {
       while (!finished) {
         ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
         List<String> keys = scanResult.getResult();
-
-        Pipeline pipeline = jedis.pipelined();
-        if (keys != null) {
-          for (String key : keys) {
-            pipeline.get(key);
-          }
-          List<Object> values = pipeline.syncAndReturnAll();
-          for (int i = 0; i < values.size(); i++) {
-            processContext.output(KV.of(keys.get(i), (String) values.get(i)));
-          }
+        for (String k : keys) {
+          processContext.output(k);
         }
-
         cursor = scanResult.getStringCursor();
         if ("0".equals(cursor)) {
           finished = true;
         }
       }
     }
+  }
+  /** A {@link DoFn} requesting Redis server to get key/value pairs. */
+  private static class ReadFn extends BaseReadFn<KV<String, String>> {
+    private int batchSize;
+    private List<String> bufferedKeys;
+    BoundedWindow window;
+    Instant lastMsg;
+
+    @StartBundle
+    public void startBundle(StartBundleContext context) {
+      bufferedKeys = new ArrayList<>();
+    }
 
-    @Teardown
-    public void teardown() {
-      jedis.close();
+    ReadFn(RedisConnectionConfiguration connectionConfiguration, int batchSize) {
+      super(connectionConfiguration);
+      this.batchSize = batchSize;
+    }
+
+    private int getBatchSize() {
+      return batchSize;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext processContext, BoundedWindow window)
+        throws Exception {
+      String key = processContext.element();
+      bufferedKeys.add(key);
+      this.window = window;
+      this.lastMsg = processContext.timestamp();
+      if (bufferedKeys.size() > getBatchSize()) {
+        List<KV<String, String>> kvs = fetchAndFlush();
+        for (KV<String, String> kv : kvs) {
+          processContext.output(kv);
+        }
+      }
+    }
+
+    private List<KV<String, String>> fetchAndFlush() {
+      String[] keys = new String[bufferedKeys.size()];
+      bufferedKeys.toArray(keys);
+      List<String> results = jedis.mget(keys);
+      assert bufferedKeys.size() == results.size();
+      List<KV<String, String>> kvs = new ArrayList<>(bufferedKeys.size());
+      for (int i = 0; i < bufferedKeys.size(); i++) {
+        if (results.get(i) != null) {
+          kvs.add(KV.of(bufferedKeys.get(i), results.get(i)));
+        }
+      }
+      bufferedKeys = new ArrayList<>();
+      return kvs;
+    }
+
+    @FinishBundle
+    public void finishBundle(FinishBundleContext context) throws Exception {
+      List<KV<String, String>> kvs = fetchAndFlush();
+      for (KV<String, String> kv : kvs) {
+        context.output(kv, lastMsg, window);

Review comment:
@iemejia Could you please advise on the above?
Issue Time Tracking
-------------------
Worklog Id: (was: 127892)
Time Spent: 4h 20m (was: 4h 10m)
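For reference, the SCAN loop that the diff above keeps for prefix reads can be sketched in plain Java. This is a minimal sketch under stated assumptions, not RedisIO code: `ScanPage` and the scanner function are hypothetical stand-ins for Jedis' `ScanResult<String>` and `jedis.scan(cursor, scanParams)`, and `fakeScanner` only imitates `SCAN ... MATCH prefix*` paging over an in-memory list. What it mirrors from the diff is the termination rule: iteration starts at cursor "0" and ends only when the server hands back "0" again.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Sketch only: ScanPage and the scanner function are hypothetical stand-ins for Jedis'
// ScanResult<String> and jedis.scan(cursor, scanParams); no Redis connection is involved.
class PrefixScanSketch {

  /** One SCAN reply: the cursor to send next and the (possibly empty) batch of matching keys. */
  static final class ScanPage {
    final String nextCursor;
    final List<String> keys;

    ScanPage(String nextCursor, List<String> keys) {
      this.nextCursor = nextCursor;
      this.keys = keys;
    }
  }

  /** Follows the cursor from "0" until the server returns "0" again, collecting every key. */
  static List<String> scanAll(Function<String, ScanPage> scanner) {
    List<String> out = new ArrayList<>();
    String cursor = "0"; // a SCAN iteration starts at cursor "0"
    boolean finished = false;
    while (!finished) {
      ScanPage page = scanner.apply(cursor);
      out.addAll(page.keys); // RedisIO would output each key here instead
      cursor = page.nextCursor;
      if ("0".equals(cursor)) { // only a returned "0" ends the iteration
        finished = true;
      }
    }
    return out;
  }

  /** In-memory imitation of SCAN ... MATCH prefix* over a fixed key list, pageSize keys per call. */
  static Function<String, ScanPage> fakeScanner(List<String> keyspace, String prefix, int pageSize) {
    return cursor -> {
      int start = Integer.parseInt(cursor);
      int end = Math.min(start + pageSize, keyspace.size());
      List<String> matches = new ArrayList<>();
      for (String key : keyspace.subList(start, end)) {
        if (key.startsWith(prefix)) {
          matches.add(key);
        }
      }
      String next = end >= keyspace.size() ? "0" : String.valueOf(end);
      return new ScanPage(next, matches);
    };
  }

  public static void main(String[] args) {
    List<String> keyspace = List.of("user:1", "order:7", "order:8", "order:9", "user:2");
    // The middle page matches nothing but still returns a non-zero cursor.
    System.out.println(scanAll(fakeScanner(keyspace, "user:", 2))); // [user:1, user:2]
  }
}
```

The loop tests the cursor rather than the page contents because a real SCAN reply may contain no keys while the iteration is still in progress; the middle page in `main` shows that case.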
[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations
[ https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=127822=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-127822 ]

ASF GitHub Bot logged work on BEAM-3446:
Author: ASF GitHub Bot
Created on: 26/Jul/18 16:03
Start Date: 26/Jul/18 16:03
Worklog Time Spent: 10m
Work Description: jbonofre commented on issue #5841: [BEAM-3446] Fixes RedisIO non-prefix read operations
URL: https://github.com/apache/beam/pull/5841#issuecomment-408148372

I'm going to do the review as well.

Issue Time Tracking
-------------------
Worklog Id: (was: 127822)
Time Spent: 4h 10m (was: 4h)
[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations
[ https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=127760=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-127760 ]

ASF GitHub Bot logged work on BEAM-3446:
Author: ASF GitHub Bot
Created on: 26/Jul/18 13:57
Start Date: 26/Jul/18 13:57
Worklog Time Spent: 10m
Work Description: iemejia commented on issue #5841: [BEAM-3446] Fixes RedisIO non-prefix read operations
URL: https://github.com/apache/beam/pull/5841#issuecomment-408106447

No problem, thanks a lot for taking care of this; we are really close. I just wanted to raise awareness of another PR on Redis, #6045. Conceptually there seems to be no conflict, but it is good to think about any impact.

Issue Time Tracking
-------------------
Worklog Id: (was: 127760)
Time Spent: 4h (was: 3h 50m)
[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations
[ https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=127715=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-127715 ]

ASF GitHub Bot logged work on BEAM-3446:
Author: ASF GitHub Bot
Created on: 26/Jul/18 09:38
Start Date: 26/Jul/18 09:38
Worklog Time Spent: 10m
Work Description: vvarma commented on a change in pull request #5841: [BEAM-3446] Fixes RedisIO non-prefix read operations
URL: https://github.com/apache/beam/pull/5841#discussion_r205392609

## File path: sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
## @@ -279,28 +314,73 @@ public void processElement(ProcessContext processContext) throws Exception {
[same hunk as quoted in the 26/Jul/18 18:59 comment above, ending at:]
+        context.output(kv, lastMsg, window);

Review comment:
In the linked code, though, messages with the same window are grouped together, stored, and only processed in finishBundle. In the case of ReadFn, the idea was to buffer requests until the batch size is reached and process them at that point, which is why output is emitted in both processElement and finishBundle. I am not sure I understand how to use a map with the window as the key.
Issue Time Tracking
-------------------
Worklog Id: (was: 127715)
Time Spent: 3h 50m (was: 3h 40m)
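The buffering scheme described in this comment (collect keys until the batch size is reached, resolve them with one MGET, and flush the remainder when the bundle ends) can be sketched without Beam or Jedis on the classpath. Assumptions: `BatchingLookup` is a hypothetical name, `Map.Entry` stands in for `KV<String, String>`, and the `mget` function stands in for `jedis.mget(...)`, assumed to return one value per key in key order with `null` for missing keys. Unlike the reviewed `bufferedKeys.size() > getBatchSize()`, this sketch flushes at `>=` so that each full request carries exactly `batchSize` keys; that is a choice made for the sketch, not something taken from the PR.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Sketch only: Map.Entry stands in for KV<String, String>, and mget stands in for jedis.mget(...),
// assumed to answer with one value per key, in key order, with null for keys that do not exist.
class BatchingLookup {
  private final int batchSize;
  private final Function<List<String>, List<String>> mget;
  private List<String> buffered = new ArrayList<>();

  BatchingLookup(int batchSize, Function<List<String>, List<String>> mget) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive, got " + batchSize);
    }
    this.batchSize = batchSize;
    this.mget = mget;
  }

  /** Per-element path (processElement): returns results only when a full batch was resolved. */
  List<Map.Entry<String, String>> add(String key) {
    buffered.add(key);
    if (buffered.size() < batchSize) {
      return List.of();
    }
    // Flushing at >= (not >) means each full request carries exactly batchSize keys.
    return flush();
  }

  /** End-of-bundle path (finishBundle): resolves whatever partial batch is left over. */
  List<Map.Entry<String, String>> finish() {
    if (buffered.isEmpty()) {
      return List.of();
    }
    return flush();
  }

  private List<Map.Entry<String, String>> flush() {
    List<String> keys = buffered;
    buffered = new ArrayList<>(); // hand the current batch off and start a fresh buffer
    List<String> values = mget.apply(keys); // one round trip for the whole batch
    // Explicit check instead of `assert`, which the JVM skips unless run with -ea.
    if (values.size() != keys.size()) {
      throw new IllegalStateException("expected one value per key");
    }
    List<Map.Entry<String, String>> out = new ArrayList<>();
    for (int i = 0; i < keys.size(); i++) {
      if (values.get(i) != null) { // missing keys come back as null and are skipped
        out.add(new SimpleEntry<>(keys.get(i), values.get(i)));
      }
    }
    return out;
  }
}
```

`add` plays the role of the per-element path and `finish` the `@FinishBundle` path; without the second call, the last partial batch of every bundle would never be looked up.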
[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations
[ https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=127703=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-127703 ]

ASF GitHub Bot logged work on BEAM-3446:
Author: ASF GitHub Bot
Created on: 26/Jul/18 09:13
Start Date: 26/Jul/18 09:13
Worklog Time Spent: 10m
Work Description: vvarma commented on issue #5841: [BEAM-3446] Fixes RedisIO non-prefix read operations
URL: https://github.com/apache/beam/pull/5841#issuecomment-408032339

Sure, will make the change. Sorry about the delay.

Issue Time Tracking
-------------------
Worklog Id: (was: 127703)
Time Spent: 3h 40m (was: 3.5h)
[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations
[ https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=125251=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-125251 ]

ASF GitHub Bot logged work on BEAM-3446:
Author: ASF GitHub Bot
Created on: 19/Jul/18 21:25
Start Date: 19/Jul/18 21:25
Worklog Time Spent: 10m
Work Description: iemejia commented on a change in pull request #5841: [BEAM-3446] Fixes RedisIO non-prefix read operations
URL: https://github.com/apache/beam/pull/5841#discussion_r203876000

## File path: sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
## @@ -279,28 +314,73 @@ public void processElement(ProcessContext processContext) throws Exception {
[same hunk as quoted in the 26/Jul/18 18:59 comment above, ending at:]
+        context.output(kv, lastMsg, window);

Review comment:
Oh, silly of me: I misread the motivation for keeping the window. You are right, it makes total sense. In that case it is probably a better idea to store the elements in a Map with the window as the key and the list of elements as the value, use window.maxTimestamp() (you don't need lastMsg), and flush when there are enough elements, similar to what is done here (but with the count logic):
https://github.com/apache/beam/blob/70b653187d566da7eea2590f17a36bbf22ef8bed/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L825-L844

Issue Time Tracking
-------------------
Worklog Id: (was: 125251)
Time Spent: 3.5h (was: 3h 20m)
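One possible shape of the suggestion in this comment (a map from window to buffered keys, flushed by count, emitting with the window's max timestamp instead of `lastMsg`) is sketched below in plain Java. `Window`, `Output` and the `mget` function are hypothetical stand-ins for `BoundedWindow`, an emitted `KV` plus its timestamp, and `jedis.mget(...)`. In a real `DoFn`, the results of `finish()` would be emitted from `@FinishBundle` with something like `context.output(kv, window.maxTimestamp(), window)`; this is a sketch of the idea, not the code that was merged.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

// Sketch only: Window, Output and mget are hypothetical stand-ins for BoundedWindow, an emitted
// KV<String, String> with its timestamp, and jedis.mget(...). No Beam or Jedis types are used.
class PerWindowBatcher {

  /** Minimal value-typed window: an id plus the max timestamp (epoch millis) used for output. */
  static final class Window {
    final String id;
    final long maxTimestamp;

    Window(String id, long maxTimestamp) {
      this.id = id;
      this.maxTimestamp = maxTimestamp;
    }

    @Override
    public boolean equals(Object o) {
      return o instanceof Window
          && ((Window) o).id.equals(id)
          && ((Window) o).maxTimestamp == maxTimestamp;
    }

    @Override
    public int hashCode() {
      return Objects.hash(id, maxTimestamp);
    }
  }

  /** A resolved pair plus the window and timestamp it would be emitted with. */
  static final class Output {
    final String key;
    final String value;
    final Window window;
    final long timestamp;

    Output(String key, String value, Window window, long timestamp) {
      this.key = key;
      this.value = value;
      this.window = window;
      this.timestamp = timestamp;
    }

    @Override
    public String toString() {
      return key + "=" + value + "@" + window.id + "/" + timestamp;
    }
  }

  private final int batchSize;
  private final Function<List<String>, List<String>> mget;
  private final Map<Window, List<String>> buffers = new LinkedHashMap<>(); // window -> pending keys

  PerWindowBatcher(int batchSize, Function<List<String>, List<String>> mget) {
    this.batchSize = batchSize;
    this.mget = mget;
  }

  /** Per-element path: buffers the key under its own window and flushes that window when full. */
  List<Output> add(String key, Window window) {
    List<String> pending = buffers.computeIfAbsent(window, w -> new ArrayList<>());
    pending.add(key);
    if (pending.size() < batchSize) {
      return List.of();
    }
    buffers.remove(window);
    return resolve(pending, window);
  }

  /** End-of-bundle path: flushes every window that still has pending keys. */
  List<Output> finish() {
    List<Output> out = new ArrayList<>();
    for (Map.Entry<Window, List<String>> e : buffers.entrySet()) {
      out.addAll(resolve(e.getValue(), e.getKey()));
    }
    buffers.clear();
    return out;
  }

  private List<Output> resolve(List<String> keys, Window window) {
    List<String> values = mget.apply(keys); // one MGET per window batch
    List<Output> out = new ArrayList<>();
    for (int i = 0; i < keys.size(); i++) {
      if (values.get(i) != null) {
        // The timestamp comes from the window itself, so no lastMsg bookkeeping is needed.
        out.add(new Output(keys.get(i), values.get(i), window, window.maxTimestamp));
      }
    }
    return out;
  }
}
```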
[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations
[ https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=123342=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-123342 ]

ASF GitHub Bot logged work on BEAM-3446:
Author: ASF GitHub Bot
Created on: 14/Jul/18 17:55
Start Date: 14/Jul/18 17:55
Worklog Time Spent: 10m
Work Description: vvarma commented on a change in pull request #5841: Fixes https://issues.apache.org/jira/browse/BEAM-3446.
URL: https://github.com/apache/beam/pull/5841#discussion_r202521323

## File path: sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
## @@ -279,28 +314,73 @@ public void processElement(ProcessContext processContext) throws Exception {
[same hunk as quoted in the 26/Jul/18 18:59 comment above, ending at:]
+        context.output(kv, lastMsg, window);

Review comment:
@iemejia I'm not sure about this, since I am using the Instant and window from the last processed message to produce output in the finishBundle method.
Issue Time Tracking
-------------------
Worklog Id: (was: 123342)
Time Spent: 3h 20m (was: 3h 10m)
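A tiny illustration of the concern behind this exchange: if one bundle holds elements from more than one window, results flushed in `finishBundle` with the window remembered from the last element end up attributed to that window, including keys that arrived in an earlier one. Elements here are hypothetical `{key, window}` string pairs and no Beam types are involved; the first method imitates the reviewed `ReadFn`, the second keeps each key's own window next to it.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: elements are hypothetical {key, window} string pairs; no Beam types are used.
class LastWindowPitfall {

  /** Imitates the reviewed ReadFn: one shared buffer, and only the latest element's window is kept. */
  static List<String> labelsWithLastWindow(List<String[]> bundle) {
    List<String> buffered = new ArrayList<>();
    String lastWindow = null;
    for (String[] element : bundle) {
      buffered.add(element[0]);
      lastWindow = element[1]; // overwritten by every element, like this.window in processElement
    }
    List<String> emitted = new ArrayList<>();
    for (String key : buffered) {
      emitted.add(key + "@" + lastWindow); // what finishBundle would emit for each leftover key
    }
    return emitted;
  }

  /** Keeps each key's own window alongside it, so flushing cannot move a key across windows. */
  static List<String> labelsWithOwnWindow(List<String[]> bundle) {
    List<String> emitted = new ArrayList<>();
    for (String[] element : bundle) {
      emitted.add(element[0] + "@" + element[1]);
    }
    return emitted;
  }

  public static void main(String[] args) {
    List<String[]> bundle = List.of(new String[] {"a", "w1"}, new String[] {"b", "w2"});
    System.out.println(labelsWithLastWindow(bundle)); // [a@w2, b@w2]: "a" lands in the wrong window
    System.out.println(labelsWithOwnWindow(bundle)); // [a@w1, b@w2]
  }
}
```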
[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations
[ https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=123051=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-123051 ] ASF GitHub Bot logged work on BEAM-3446: Author: ASF GitHub Bot Created on: 13/Jul/18 21:21 Start Date: 13/Jul/18 21:21 Worklog Time Spent: 10m Work Description: iemejia commented on a change in pull request #5841: Fixes https://issues.apache.org/jira/browse/BEAM-3446. URL: https://github.com/apache/beam/pull/5841#discussion_r202475941
## File path: sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java ## @@ -279,28 +290,31 @@ public void processElement(ProcessContext processContext) throws Exception { while (!finished) { ScanResult<String> scanResult = jedis.scan(cursor, scanParams); List<String> keys = scanResult.getResult(); - -Pipeline pipeline = jedis.pipelined(); -if (keys != null) { - for (String key : keys) { -pipeline.get(key); - } - List<Object> values = pipeline.syncAndReturnAll(); - for (int i = 0; i < values.size(); i++) { -processContext.output(KV.of(keys.get(i), (String) values.get(i))); - } +for (String k : keys) { + processContext.output(k); } - cursor = scanResult.getStringCursor(); if ("0".equals(cursor)) { finished = true; } } } + } + /** A {@link DoFn} requesting Redis server to get key/value pairs. */ + private static class ReadFn extends BaseReadFn<KV<String, String>> { -@Teardown -public void teardown() { - jedis.close(); +ReadFn(RedisConnectionConfiguration connectionConfiguration) { + super(connectionConfiguration); +} + +@ProcessElement +public void processElement(ProcessContext processContext) throws Exception { + String key = processContext.element(); + + String value = jedis.get(key);
Review comment: Hi, sorry I missed your message. The idea is that we should add the DoFn startBundle and finishBundle methods, and create a method in the Read to define the maximum number of elements that we will request at once. You then build up the collection of keys to be requested in processElement, but you don't make the request in processElement; instead, finishBundle issues an MGET request with the defined number of elements per batch. We should choose a default size, e.g. 1000. It is similar to what other IOs do in the Write (see withBatchSize in ElasticsearchIO or SolrIO, for ref. https://github.com/apache/beam/blob/c14c975224af417dcdc74fed8b0d893be742e9d7/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/SolrIO.java#L805-L829).
Issue Time Tracking --- Worklog Id: (was: 123051) Time Spent: 3h 10m (was: 3h)
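A minimal sketch of the bundle-level batching described in the comment above, assuming Beam and Jedis are replaced by plain stand-ins so it runs on its own. `BatchingReader`, its `mget` function (standing in for Jedis's `mget(String...)`, which returns values in key order with null for missing keys) and the `output` callback are hypothetical names, not RedisIO API. Keys are buffered per element and fetched one full batch at a time, with a final flush for the remainder at the end of the bundle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;

/** Hypothetical, Beam-free model of buffering keys and fetching them with batched MGETs. */
class BatchingReader {
  private final int batchSize;
  // Stand-in for Jedis#mget(String...): values in key order, null for a missing key.
  private final Function<List<String>, List<String>> mget;
  // Stand-in for emitting KV.of(key, value) downstream.
  private final BiConsumer<String, String> output;
  private final List<String> buffer = new ArrayList<>();
  private int requests = 0;

  BatchingReader(int batchSize, Function<List<String>, List<String>> mget,
      BiConsumer<String, String> output) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    this.batchSize = batchSize;
    this.mget = mget;
    this.output = output;
  }

  /** What processElement would do: buffer the key, and only call Redis once a batch is full. */
  void processElement(String key) {
    buffer.add(key);
    if (buffer.size() >= batchSize) {
      flush();
    }
  }

  /** What finishBundle would do: fetch whatever is still buffered. */
  void finishBundle() {
    flush();
  }

  int requests() {
    return requests;
  }

  private void flush() {
    if (buffer.isEmpty()) {
      return;
    }
    List<String> values = mget.apply(buffer);
    requests++;
    for (int i = 0; i < buffer.size(); i++) {
      String value = values.get(i);
      if (value != null) { // skip keys that do not exist, as ReadFn does
        output.accept(buffer.get(i), value);
      }
    }
    buffer.clear();
  }

  public static void main(String[] args) {
    Map<String, String> store = Map.of("a", "1", "b", "2", "d", "4");
    Function<List<String>, List<String>> fakeMget = keys -> {
      List<String> out = new ArrayList<>();
      for (String k : keys) {
        out.add(store.get(k));
      }
      return out;
    };
    List<String> emitted = new ArrayList<>();
    BatchingReader reader = new BatchingReader(2, fakeMget, (k, v) -> emitted.add(k + "=" + v));
    for (String key : List.of("a", "b", "c", "d", "e")) {
      reader.processElement(key);
    }
    reader.finishBundle();
    System.out.println(emitted + " in " + reader.requests() + " requests");
  }
}
```

With a batch size of 2, the five keys cost three MGET calls instead of five GETs, and the missing keys `c` and `e` are dropped. In a real DoFn, output from finishBundle goes through `FinishBundleContext` with an explicit timestamp and window, so buffered keys would also have to be tracked per window; that part is not modeled here.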
[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations
[ https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=120839=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-120839 ] ASF GitHub Bot logged work on BEAM-3446: Author: ASF GitHub Bot Created on: 09/Jul/18 16:13 Start Date: 09/Jul/18 16:13 Worklog Time Spent: 10m Work Description: vvarma commented on a change in pull request #5841: Fixes https://issues.apache.org/jira/browse/BEAM-3446. URL: https://github.com/apache/beam/pull/5841#discussion_r201061427
## File path: sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java ## @@ -279,28 +290,31 @@ [...] + String value = jedis.get(key);
Review comment: @iemejia Right now there are two operators exposed for read operations:
1. ReadKeysWithPattern, which works in two steps: a) for each pattern prefix in the PCollection, fetch all matching keys; b) fetch the value for each key.
2. ReadFn, which gets the value for each key in the PCollection.
I can change ReadKeysWithPattern to use a parameterized batch size when fetching the data in step b, but for ReadFn I am not sure how to use the batch parameter.
Issue Time Tracking --- Worklog Id: (was: 120839) Time Spent: 3h (was: 2h 50m)
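For step b of ReadKeysWithPattern, one way to apply a batch parameter is to split the keys returned by SCAN into fixed-size chunks and fetch each chunk with a single MGET. The sketch assumes a stand-in `mget` function in place of Jedis; `PatternBatchFetch`, `partition` and `fetch` are hypothetical names. Because MGET replies are positional, value i is paired with key i, and null replies (missing keys) are skipped.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical helper: batched value lookup for keys already produced by SCAN (step a). */
class PatternBatchFetch {

  /** Splits keys into consecutive chunks of at most batchSize elements (views, not copies). */
  static <T> List<List<T>> partition(List<T> keys, int batchSize) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    List<List<T>> batches = new ArrayList<>();
    for (int start = 0; start < keys.size(); start += batchSize) {
      batches.add(keys.subList(start, Math.min(start + batchSize, keys.size())));
    }
    return batches;
  }

  /**
   * Step b: one mget call per chunk. The mget function stands in for Jedis#mget(String...),
   * whose replies are positional (value i belongs to key i, null when the key is missing).
   */
  static Map<String, String> fetch(
      List<String> keys, int batchSize, Function<List<String>, List<String>> mget) {
    Map<String, String> result = new LinkedHashMap<>();
    for (List<String> batch : partition(keys, batchSize)) {
      List<String> values = mget.apply(batch);
      for (int i = 0; i < batch.size(); i++) {
        String value = values.get(i);
        if (value != null) { // skip missing keys, like ReadFn does for GET
          result.put(batch.get(i), value);
        }
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, String> store = Map.of("user:1", "ann", "user:2", "bob", "user:4", "dee");
    // Pretend these keys came back from SCAN with MATCH "user:*".
    List<String> scanned = List.of("user:1", "user:2", "user:3", "user:4");
    Function<List<String>, List<String>> fakeMget = batch -> {
      List<String> out = new ArrayList<>();
      for (String key : batch) {
        out.add(store.get(key));
      }
      return out;
    };
    System.out.println(fetch(scanned, 3, fakeMget)); // {user:1=ann, user:2=bob, user:4=dee}
  }
}
```

Here the four scanned keys take two MGET calls with a batch size of 3 instead of four GETs.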
[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations
[ https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=118615=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-118615 ] ASF GitHub Bot logged work on BEAM-3446: Author: ASF GitHub Bot Created on: 03/Jul/18 08:24 Start Date: 03/Jul/18 08:24 Worklog Time Spent: 10m Work Description: iemejia commented on a change in pull request #5841: Fixes https://issues.apache.org/jira/browse/BEAM-3446. URL: https://github.com/apache/beam/pull/5841#discussion_r199722160
## File path: sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java ## @@ -279,28 +290,31 @@ [...] + String value = jedis.get(key);
Review comment: :+1:
Issue Time Tracking --- Worklog Id: (was: 118615) Time Spent: 2h 50m (was: 2h 40m)
[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations
[ https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=118216=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-118216 ] ASF GitHub Bot logged work on BEAM-3446: Author: ASF GitHub Bot Created on: 02/Jul/18 16:03 Start Date: 02/Jul/18 16:03 Worklog Time Spent: 10m Work Description: vvarma commented on a change in pull request #5841: Fixes https://issues.apache.org/jira/browse/BEAM-3446. URL: https://github.com/apache/beam/pull/5841#discussion_r199546193
## File path: sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java ## @@ -279,28 +290,31 @@ [...] + String value = jedis.get(key);
Review comment: Sure, I think it should be straightforward; I will update the PR over the week.
Issue Time Tracking --- Worklog Id: (was: 118216) Time Spent: 2h 40m (was: 2.5h)
[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations
[ https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=118164=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-118164 ] ASF GitHub Bot logged work on BEAM-3446: Author: ASF GitHub Bot Created on: 02/Jul/18 13:12 Start Date: 02/Jul/18 13:12 Worklog Time Spent: 10m Work Description: iemejia commented on a change in pull request #5841: Fixes https://issues.apache.org/jira/browse/BEAM-3446. URL: https://github.com/apache/beam/pull/5841#discussion_r199491469
## File path: sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java ## @@ -279,28 +290,31 @@ [...] + String value = jedis.get(key);
Review comment: As mentioned in the previous PR, I am a bit concerned about losing the ability to request multiple keys in a single call. Any chance you can work on this for ReadFn with the approach you mentioned, based on `MGET`? The simplest approach is probably to do as other IOs do and have a default batch size that can be parametrized via a `withBatchSize` method. WDYT?
Issue Time Tracking --- Worklog Id: (was: 118164) Time Spent: 2.5h (was: 2h 20m)
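The `withBatchSize` suggestion follows the usual Beam IO configuration style: a default value plus an immutable `withX` method that returns a modified copy. A hypothetical sketch, where `ReadConfig` stands in for the AutoValue-generated transform and the default of 1000 is the value suggested in the review, not a confirmed RedisIO constant:

```java
/** Hypothetical stand-in for a transform configuration with an overridable batch size. */
final class ReadConfig {
  // Default taken from the review discussion ("e.g. 1000"); an assumption, not RedisIO API.
  static final int DEFAULT_BATCH_SIZE = 1000;

  private final int batchSize;

  private ReadConfig(int batchSize) {
    this.batchSize = batchSize;
  }

  /** Entry point, analogous to RedisIO.readAll(): starts from the default batch size. */
  static ReadConfig create() {
    return new ReadConfig(DEFAULT_BATCH_SIZE);
  }

  /** Returns a new, modified copy; the receiver is left unchanged. */
  ReadConfig withBatchSize(int batchSize) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize should be greater than 0, got " + batchSize);
    }
    return new ReadConfig(batchSize);
  }

  int batchSize() {
    return batchSize;
  }

  public static void main(String[] args) {
    ReadConfig defaults = ReadConfig.create();
    ReadConfig tuned = defaults.withBatchSize(500);
    System.out.println(defaults.batchSize() + " " + tuned.batchSize()); // 1000 500
  }
}
```

Keeping the configuration immutable means a pipeline can reuse one base transform and derive differently tuned copies without side effects.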
[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations
[ https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=118096=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-118096 ] ASF GitHub Bot logged work on BEAM-3446: Author: ASF GitHub Bot Created on: 02/Jul/18 07:36 Start Date: 02/Jul/18 07:36 Worklog Time Spent: 10m Work Description: iemejia commented on issue #5841: Fixes https://issues.apache.org/jira/browse/BEAM-3446. URL: https://github.com/apache/beam/pull/5841#issuecomment-401697979
Run Java Precommit
Issue Time Tracking --- Worklog Id: (was: 118096) Time Spent: 2h 20m (was: 2h 10m)
[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations
[ https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=117791=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-117791 ] ASF GitHub Bot logged work on BEAM-3446: Author: ASF GitHub Bot Created on: 30/Jun/18 07:44 Start Date: 30/Jun/18 07:44 Worklog Time Spent: 10m Work Description: vvarma commented on issue #4656: [BEAM-3446] Fixes RedisIO non-prefix read operations URL: https://github.com/apache/beam/pull/4656#issuecomment-401524603
Closing this PR; I opened a new one with the fixes, rebased: https://github.com/apache/beam/pull/5841
Issue Time Tracking --- Worklog Id: (was: 117791) Time Spent: 2h (was: 1h 50m)
[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations
[ https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=117792=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-117792 ] ASF GitHub Bot logged work on BEAM-3446: Author: ASF GitHub Bot Created on: 30/Jun/18 07:44 Start Date: 30/Jun/18 07:44 Worklog Time Spent: 10m Work Description: vvarma closed pull request #4656: [BEAM-3446] Fixes RedisIO non-prefix read operations URL: https://github.com/apache/beam/pull/4656
diff --git a/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java index 2559d0773cc..f58200a0f36 100644 --- a/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java +++ b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
@@ -200,6 +200,7 @@ public void populateDisplayData(DisplayData.Builder builder) { return input .apply(Create.of(keyPattern())) + .apply(ParDo.of(new ReadKeywsWithPattern(connectionConfiguration()))) .apply(RedisIO.readAll().withConnectionConfiguration(connectionConfiguration())); }
@@ -260,16 +261,12 @@ public ReadAll withConnectionConfiguration(RedisConnectionConfiguration connecti } - /** - * A {@link DoFn} requesting Redis server to get key/value pairs. - */ - private static class ReadFn extends DoFn<String, KV<String, String>> { - -private final RedisConnectionConfiguration connectionConfiguration; + private abstract static class BaseReadFn<T> extends DoFn<String, T> { +protected final RedisConnectionConfiguration connectionConfiguration; -private transient Jedis jedis; +protected transient Jedis jedis; -public ReadFn(RedisConnectionConfiguration connectionConfiguration) { +public BaseReadFn(RedisConnectionConfiguration connectionConfiguration) { this.connectionConfiguration = connectionConfiguration; }
@@ -278,6 +275,18 @@ public void setup() { jedis = connectionConfiguration.connect(); } +@Teardown +public void teardown() { + jedis.close(); +} + } + + private static class ReadKeywsWithPattern extends BaseReadFn<String> { + +ReadKeywsWithPattern(RedisConnectionConfiguration connectionConfiguration) { + super(connectionConfiguration); +} + @ProcessElement public void processElement(ProcessContext processContext) throws Exception { ScanParams scanParams = new ScanParams();
@@ -288,28 +297,33 @@ public void processElement(ProcessContext processContext) throws Exception { while (!finished) { ScanResult<String> scanResult = jedis.scan(cursor, scanParams); List<String> keys = scanResult.getResult(); - -Pipeline pipeline = jedis.pipelined(); -if (keys != null) { - for (String key : keys) { -pipeline.get(key); - } - List<Object> values = pipeline.syncAndReturnAll(); - for (int i = 0; i < values.size(); i++) { -processContext.output(KV.of(keys.get(i), (String) values.get(i))); - } +for (String k : keys) { + processContext.output(k); } - cursor = scanResult.getStringCursor(); if (cursor.equals("0")) { finished = true; } } } + } + /** + * A {@link DoFn} requesting Redis server to get key/value pairs. + */ + private static class ReadFn extends BaseReadFn<KV<String, String>> { -@Teardown -public void teardown() { - jedis.close(); +ReadFn(RedisConnectionConfiguration connectionConfiguration) { + super(connectionConfiguration); +} + +@ProcessElement +public void processElement(ProcessContext processContext) throws Exception { + String key = processContext.element(); + + String value = jedis.get(key); + if (value != null) { +processContext.output(KV.of(key, value)); + } } }
Issue Time Tracking --- Worklog Id: (was: 117792) Time Spent: 2h 10m (was: 2h)
[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations
[ https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=117786=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-117786 ] ASF GitHub Bot logged work on BEAM-3446: Author: ASF GitHub Bot Created on: 30/Jun/18 07:40 Start Date: 30/Jun/18 07:40 Worklog Time Spent: 10m Work Description: vvarma opened a new pull request #5841: Fixes https://issues.apache.org/jira/browse/BEAM-3446. URL: https://github.com/apache/beam/pull/5841
Rebase of https://github.com/apache/beam/pull/4656. BaseReadFn abstracts the general Jedis operations. Fetching the keys for a given prefix moves to the ReadKeywsWithPattern DoFn. ReadFn is now a pure fetch from Redis for a given key. URL: https://issues.apache.org/jira/browse/BEAM-3446 @iemejia @jbonofre
Issue Time Tracking --- Worklog Id: (was: 117786) Time Spent: 1h 50m (was: 1h 40m)
[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations
[ https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=117778=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-117778 ] ASF GitHub Bot logged work on BEAM-3446: Author: ASF GitHub Bot Created on: 30/Jun/18 06:54 Start Date: 30/Jun/18 06:54 Worklog Time Spent: 10m Work Description: vvarma commented on a change in pull request #4656: [BEAM-3446] Fixes RedisIO non-prefix read operations URL: https://github.com/apache/beam/pull/4656#discussion_r199315229
## File path: sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java ## @@ -288,28 +297,33 @@ public void processElement(ProcessContext processContext) throws Exception { while (!finished) { ScanResult<String> scanResult = jedis.scan(cursor, scanParams); List<String> keys = scanResult.getResult(); - -Pipeline pipeline = jedis.pipelined();
Review comment: The number of keys that need to be looked up in a given window or batch can vary. Ideally, we should have a configurable batch size and use MGET (https://redis.io/commands/mget) to optimize further. Pipelining an entire window or batch can cause memory spikes in Redis, depending on the number of keys being looked up, so for the time being I removed the pipeline to keep things simple.
Issue Time Tracking --- Worklog Id: (was: 117778) Time Spent: 1h 40m (was: 1.5h)
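The trade-off in this comment can be put in numbers under a deliberately simple cost model, assumed here for illustration and not measured: each request costs one network round trip, and a pipeline makes Redis queue every reply until the client reads them. For N keys, per-key GET needs N round trips; one pipeline needs a single round trip but queues all N replies; MGET in batches of b needs ceil(N / b) round trips with at most b values per reply. `LookupCost` is a hypothetical helper.

```java
/** Hypothetical round-trip model for looking up n keys; not a benchmark. */
final class LookupCost {
  private LookupCost() {}

  /** One GET per key: n round trips. */
  static long perKeyGetRoundTrips(long n) {
    return n;
  }

  /** One pipeline for everything: a single round trip, but the server queues all n replies. */
  static long pipelinedQueuedReplies(long n) {
    return n;
  }

  /** MGET in batches of b keys: ceil(n / b) round trips, each reply holding at most b values. */
  static long batchedMgetRoundTrips(long n, long b) {
    if (b <= 0) {
      throw new IllegalArgumentException("batch size must be positive");
    }
    return (n + b - 1) / b; // integer ceiling division for non-negative n
  }

  public static void main(String[] args) {
    long n = 1_000_000;
    System.out.println("GET per key:      " + perKeyGetRoundTrips(n) + " round trips");
    System.out.println("single pipeline:  1 round trip, " + pipelinedQueuedReplies(n) + " queued replies");
    System.out.println("MGET, batch 1000: " + batchedMgetRoundTrips(n, 1000) + " round trips");
  }
}
```

Under this model a configurable batch size bounds both sides: round trips shrink by a factor of b relative to per-key GET, while the largest single reply stays at b values instead of N.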
[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations
[ https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=117394=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-117394 ]

ASF GitHub Bot logged work on BEAM-3446:

Author: ASF GitHub Bot
Created on: 29/Jun/18 14:22
Start Date: 29/Jun/18 14:22
Worklog Time Spent: 10m
Work Description: iemejia commented on a change in pull request #4656: [BEAM-3446] Fixes RedisIO non-prefix read operations
URL: https://github.com/apache/beam/pull/4656#discussion_r199170256

## File path: sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java ##
@@ -260,16 +261,12 @@ public ReadAll withConnectionConfiguration(RedisConnectionConfiguration connecti
     }
-  /**
-   * A {@link DoFn} requesting Redis server to get key/value pairs.
-   */
-  private static class ReadFn extends DoFn> {
-
-    private final RedisConnectionConfiguration connectionConfiguration;
+  private abstract static class BaseReadFn extends DoFn {
+    protected final RedisConnectionConfiguration connectionConfiguration;

Review comment:
can be package private

Issue Time Tracking
-------------------

Worklog Id: (was: 117394)
Time Spent: 1h 10m (was: 1h)

> RedisIO non-prefix read operations
> ----------------------------------
>
> Key: BEAM-3446
> URL: https://issues.apache.org/jira/browse/BEAM-3446
> Project: Beam
> Issue Type: New Feature
> Components: io-java-redis
> Reporter: Vinay varma
> Assignee: Vinay varma
> Priority: Major
> Time Spent: 1h 10m
> Remaining Estimate: 0h
>
> Read operation in RedisIO is for prefix-based lookups. While this can be
> used for exact key matches as well, the number of operations limits the
> throughput of the function.
> I suggest exposing the current readAll operation as readbyprefix and using
> simpler operations for the readAll functionality.
> ex:
> {code:java}
> String output = jedis.get(element);
> if (output != null) {
>   processContext.output(KV.of(element, output));
> }
> {code}
> instead of:
> https://github.com/apache/beam/blob/7d240c0bb171af6868f1a6e95196c9dcfc9ac640/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java#L292
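The issue argues that the number of operations limits throughput. A toy model can make that argument concrete. The sketch below is not Jedis or RedisIO. `ExactVsPatternLookup` is a hypothetical in-memory stand-in that counts the commands it receives, and it rests on three assumptions:

* A SCAN step examines exactly `count` keys. In real Redis, COUNT is only a hint.
* Matching supports only an exact key or a single trailing `*`.
* Each matched key is fetched with a separate GET.

Under those assumptions, reading one exact key through the SCAN-based path still walks the whole keyspace. The proposed GET costs a single command.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory stand-in for a Redis server that counts the commands it receives. */
public class ExactVsPatternLookup {
  private final Map<String, String> store = new HashMap<>();
  private final List<String> keyOrder = new ArrayList<>(); // fixed iteration order for cursors
  int commandsSent = 0;

  void set(String key, String value) {
    if (store.put(key, value) == null) {
      keyOrder.add(key);
    }
  }

  /** Simplified MATCH: a trailing '*' means "starts with"; anything else must match exactly. */
  static boolean matches(String pattern, String key) {
    return pattern.endsWith("*")
        ? key.startsWith(pattern.substring(0, pattern.length() - 1))
        : key.equals(pattern);
  }

  /** Models GET: one command. */
  String get(String key) {
    commandsSent++;
    return store.get(key);
  }

  /** Models one SCAN step over `count` keys; returns the next cursor, or 0 when finished. */
  int scan(int cursor, int count, String pattern, List<String> out) {
    commandsSent++;
    int end = Math.min(cursor + count, keyOrder.size());
    for (int i = cursor; i < end; i++) {
      if (matches(pattern, keyOrder.get(i))) {
        out.add(keyOrder.get(i));
      }
    }
    return end >= keyOrder.size() ? 0 : end;
  }

  /** SCAN-based read: walk the whole keyspace, then GET every matching key. */
  Map<String, String> readByPattern(String pattern, int count) {
    List<String> keys = new ArrayList<>();
    int cursor = 0;
    do {
      cursor = scan(cursor, count, pattern, keys);
    } while (cursor != 0);
    Map<String, String> result = new HashMap<>();
    for (String key : keys) {
      result.put(key, get(key));
    }
    return result;
  }

  /** Exact-key read as proposed in the issue: a single GET, emitting only non-null values. */
  Map<String, String> readExact(String key) {
    Map<String, String> result = new HashMap<>();
    String value = get(key);
    if (value != null) {
      result.put(key, value);
    }
    return result;
  }

  public static void main(String[] args) {
    ExactVsPatternLookup redis = new ExactVsPatternLookup();
    for (int i = 0; i < 1000; i++) {
      redis.set("key" + i, "value" + i);
    }

    redis.commandsSent = 0;
    redis.readByPattern("key42", 10);
    // prints 101: 100 SCAN steps + 1 GET
    System.out.println("SCAN-based exact lookup: " + redis.commandsSent + " commands");

    redis.commandsSent = 0;
    redis.readExact("key42");
    // prints 1
    System.out.println("GET-based exact lookup: " + redis.commandsSent + " command");
  }
}
```

With a trailing `*` (for example `key4*`), the same SCAN path still serves genuine prefix reads. This is why the issue suggests keeping that path as a separate readbyprefix operation rather than dropping it.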
[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations
[ https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=117392=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-117392 ]

ASF GitHub Bot logged work on BEAM-3446:

Author: ASF GitHub Bot
Created on: 29/Jun/18 14:22
Start Date: 29/Jun/18 14:22
Worklog Time Spent: 10m
Work Description: iemejia commented on a change in pull request #4656: [BEAM-3446] Fixes RedisIO non-prefix read operations
URL: https://github.com/apache/beam/pull/4656#discussion_r199146936

## File path: sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java ##
@@ -200,6 +200,7 @@ public void populateDisplayData(DisplayData.Builder builder) {
       return input
           .apply(Create.of(keyPattern()))
+          .apply(ParDo.of(new ReadKeywsWithPattern(connectionConfiguration(

Review comment:
s/ReadKeywsWithPattern/ReadKeysWithPattern

Issue Time Tracking
-------------------

Worklog Id: (was: 117392)
Time Spent: 50m (was: 40m)

> RedisIO non-prefix read operations
> ----------------------------------
>
> Key: BEAM-3446
> URL: https://issues.apache.org/jira/browse/BEAM-3446
> Project: Beam
> Issue Type: New Feature
> Components: io-java-redis
> Reporter: Vinay varma
> Assignee: Vinay varma
> Priority: Major
> Time Spent: 50m
> Remaining Estimate: 0h
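The diff under review composes the pattern read from separate steps. `Create.of(keyPattern())` is followed by a `ReadKeysWithPattern` ParDo that emits keys, presumably followed by a per-key lookup. The sketch below models that composition in plain Java rather than Beam:

* `Step` is a hypothetical stand-in for a DoFn: one input produces zero or more outputs.
* `then` plays the role of chaining successive `.apply(ParDo.of(...))` calls.
* A `TreeMap` stands in for the Redis keyspace.

Because the key lookup is its own step, the same step can be reused directly for exact-key reads.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;

public class ComposedRedisRead {

  /** Hypothetical stand-in for a DoFn: each input may produce zero or more outputs. */
  interface Step<I, O> {
    void process(I element, Consumer<O> out);

    /** Feeds every output of this step into the next one, like chaining two ParDos. */
    default <R> Step<I, R> then(Step<O, R> next) {
      return (element, out) -> process(element, mid -> next.process(mid, out));
    }
  }

  /** Step 1 (cf. ReadKeysWithPattern): emit keys matching an exact key or a trailing-'*' prefix. */
  static Step<String, String> readKeysWithPattern(Map<String, String> keyspace) {
    return (pattern, out) -> {
      boolean isPrefix = pattern.endsWith("*");
      String prefix = isPrefix ? pattern.substring(0, pattern.length() - 1) : pattern;
      for (String key : keyspace.keySet()) {
        if (isPrefix ? key.startsWith(prefix) : key.equals(prefix)) {
          out.accept(key);
        }
      }
    };
  }

  /** Step 2 (cf. the proposed readAll): one exact lookup per key; missing keys emit nothing. */
  static Step<String, Map.Entry<String, String>> readByKey(Map<String, String> keyspace) {
    return (key, out) -> {
      String value = keyspace.get(key);
      if (value != null) {
        out.accept(new AbstractMap.SimpleImmutableEntry<>(key, value));
      }
    };
  }

  public static void main(String[] args) {
    Map<String, String> keyspace = new TreeMap<>(); // sorted, so output order is deterministic
    keyspace.put("user:1", "alice");
    keyspace.put("user:2", "bob");
    keyspace.put("order:7", "book");

    Step<String, Map.Entry<String, String>> readByPattern =
        readKeysWithPattern(keyspace).then(readByKey(keyspace));

    List<String> byPattern = new ArrayList<>();
    readByPattern.process("user:*", kv -> byPattern.add(kv.getKey() + "=" + kv.getValue()));
    System.out.println(byPattern); // [user:1=alice, user:2=bob]

    List<String> byKey = new ArrayList<>();
    readByKey(keyspace).process("order:7", kv -> byKey.add(kv.getKey() + "=" + kv.getValue()));
    System.out.println(byKey); // [order:7=book]
  }
}
```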
[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations
[ https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=117396=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-117396 ]

ASF GitHub Bot logged work on BEAM-3446:

Author: ASF GitHub Bot
Created on: 29/Jun/18 14:22
Start Date: 29/Jun/18 14:22
Worklog Time Spent: 10m
Work Description: iemejia commented on a change in pull request #4656: [BEAM-3446] Fixes RedisIO non-prefix read operations
URL: https://github.com/apache/beam/pull/4656#discussion_r199170282

## File path: sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java ##
@@ -260,16 +261,12 @@ public ReadAll withConnectionConfiguration(RedisConnectionConfiguration connecti
     }
-  /**
-   * A {@link DoFn} requesting Redis server to get key/value pairs.
-   */
-  private static class ReadFn extends DoFn> {
-
-    private final RedisConnectionConfiguration connectionConfiguration;
+  private abstract static class BaseReadFn extends DoFn {
+    protected final RedisConnectionConfiguration connectionConfiguration;
-    private transient Jedis jedis;
+    protected transient Jedis jedis;

Review comment:
can be package private

Issue Time Tracking
-------------------

Worklog Id: (was: 117396)
Time Spent: 1.5h (was: 1h 20m)

> RedisIO non-prefix read operations
> ----------------------------------
>
> Key: BEAM-3446
> URL: https://issues.apache.org/jira/browse/BEAM-3446
> Project: Beam
> Issue Type: New Feature
> Components: io-java-redis
> Reporter: Vinay varma
> Assignee: Vinay varma
> Priority: Major
> Time Spent: 1.5h
> Remaining Estimate: 0h
[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations
[ https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=117395=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-117395 ]

ASF GitHub Bot logged work on BEAM-3446:

Author: ASF GitHub Bot
Created on: 29/Jun/18 14:22
Start Date: 29/Jun/18 14:22
Worklog Time Spent: 10m
Work Description: iemejia commented on a change in pull request #4656: [BEAM-3446] Fixes RedisIO non-prefix read operations
URL: https://github.com/apache/beam/pull/4656#discussion_r199170873

## File path: sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java ##
@@ -288,28 +297,33 @@ public void processElement(ProcessContext processContext) throws Exception {
       while (!finished) {
         ScanResult scanResult = jedis.scan(cursor, scanParams);
         List keys = scanResult.getResult();
-
-        Pipeline pipeline = jedis.pipelined();

Review comment:
Question: What is the reason for removing pipelining in general? It seems that even if the approach of this PR is more composable, it would perform worse, wouldn't it?

Issue Time Tracking
-------------------

Worklog Id: (was: 117395)
Time Spent: 1h 20m (was: 1h 10m)

> RedisIO non-prefix read operations
> ----------------------------------
>
> Key: BEAM-3446
> URL: https://issues.apache.org/jira/browse/BEAM-3446
> Project: Beam
> Issue Type: New Feature
> Components: io-java-redis
> Reporter: Vinay varma
> Assignee: Vinay varma
> Priority: Major
> Time Spent: 1h 20m
> Remaining Estimate: 0h
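The trade-off behind the pipelining question can be made concrete by counting round trips. The toy model below is hypothetical and does not use Jedis.

* An unpipelined loop pays one round trip per GET.
* A pipelined loop queues a batch of GETs and reads all replies after a single flush, roughly what `jedis.pipelined()` followed by `Pipeline.sync()` does.

Batching by a fixed `batchSize` is an assumption, chosen to resemble pipelining the keys returned by each SCAN step.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model (not Jedis) that counts network round trips for batches of GETs. */
public class PipeliningRoundTrips {
  private final Map<String, String> store;
  int roundTrips = 0;

  PipeliningRoundTrips(Map<String, String> data) {
    this.store = new HashMap<>(data);
  }

  /** Unpipelined: each GET waits for its reply, so every key costs one round trip. */
  List<String> getEach(List<String> keys) {
    List<String> values = new ArrayList<>();
    for (String key : keys) {
      roundTrips++;
      values.add(store.get(key));
    }
    return values;
  }

  /** Pipelined: queue up to batchSize GETs, then flush and collect all replies at once. */
  List<String> getPipelined(List<String> keys, int batchSize) {
    List<String> values = new ArrayList<>();
    for (int start = 0; start < keys.size(); start += batchSize) {
      int end = Math.min(start + batchSize, keys.size());
      roundTrips++; // a single flush per batch
      for (String key : keys.subList(start, end)) {
        values.add(store.get(key));
      }
    }
    return values;
  }

  public static void main(String[] args) {
    Map<String, String> data = new HashMap<>();
    List<String> keys = new ArrayList<>();
    for (int i = 0; i < 250; i++) {
      data.put("key" + i, "value" + i);
      keys.add("key" + i);
    }

    PipeliningRoundTrips plain = new PipeliningRoundTrips(data);
    plain.getEach(keys);
    PipeliningRoundTrips pipelined = new PipeliningRoundTrips(data);
    pipelined.getPipelined(keys, 100);

    System.out.println("unpipelined: " + plain.roundTrips); // 250
    System.out.println("pipelined:   " + pipelined.roundTrips); // 3
  }
}
```

In this model, both variants return the same values. A composed read built from per-key GETs pays extra only when each GET is sent on its own; batching the lookups keeps the round-trip count close to the number of batches.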
[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations
[ https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=117393=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-117393 ]

ASF GitHub Bot logged work on BEAM-3446:

Author: ASF GitHub Bot
Created on: 29/Jun/18 14:22
Start Date: 29/Jun/18 14:22
Worklog Time Spent: 10m
Work Description: iemejia commented on a change in pull request #4656: [BEAM-3446] Fixes RedisIO non-prefix read operations
URL: https://github.com/apache/beam/pull/4656#discussion_r199170602

## File path: sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java ##
@@ -260,16 +261,12 @@ public ReadAll withConnectionConfiguration(RedisConnectionConfiguration connecti
     }
-  /**
-   * A {@link DoFn} requesting Redis server to get key/value pairs.
-   */
-  private static class ReadFn extends DoFn> {
-
-    private final RedisConnectionConfiguration connectionConfiguration;
+  private abstract static class BaseReadFn extends DoFn {
+    protected final RedisConnectionConfiguration connectionConfiguration;
-    private transient Jedis jedis;
+    protected transient Jedis jedis;
-    public ReadFn(RedisConnectionConfiguration connectionConfiguration) {
+    public BaseReadFn(RedisConnectionConfiguration connectionConfiguration) {

Review comment:
Remove public; in general, it is common Beam practice to restrict access as much as possible. You can use IntelliJ's Analyze Code feature to do this.

Issue Time Tracking
-------------------

Worklog Id: (was: 117393)
Time Spent: 1h (was: 50m)

> RedisIO non-prefix read operations
> ----------------------------------
>
> Key: BEAM-3446
> URL: https://issues.apache.org/jira/browse/BEAM-3446
> Project: Beam
> Issue Type: New Feature
> Components: io-java-redis
> Reporter: Vinay varma
> Assignee: Vinay varma
> Priority: Major
> Time Spent: 1h
> Remaining Estimate: 0h
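The access-level comments in this review ask that the fields and constructor of `BaseReadFn` be package-private rather than `protected` or `public`. That pattern can be shown outside Beam. The sketch below uses hypothetical stand-ins:

* `BaseReader` for `BaseReadFn`.
* `ReadByKey` for a concrete read function.
* `ConnectionConfig` for `RedisConnectionConfiguration`.
* A `Map` for the transient Jedis client.

Subclasses in the same package still see the package-private members, so nothing needs to be `public` or `protected`.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

public class PackagePrivateReaders {

  /** Hypothetical stand-in for a connection configuration. */
  static final class ConnectionConfig implements Serializable {
    final String host;

    ConnectionConfig(String host) {
      this.host = host;
    }
  }

  /** Shared base for the read functions; no member here needs to be public or protected. */
  abstract static class BaseReader<T> implements Serializable {
    final ConnectionConfig connectionConfiguration; // package-private instead of protected
    transient Map<String, String> client; // stand-in for the transient Jedis client

    BaseReader(ConnectionConfig connectionConfiguration) { // package-private constructor
      this.connectionConfiguration = connectionConfiguration;
    }

    /** Models connecting once per instance before processing elements. */
    void setup(Map<String, String> server) {
      client = server;
    }

    abstract T read(String element);
  }

  /** Exact-key reader: returns the value, or null when the key is absent. */
  static final class ReadByKey extends BaseReader<String> {
    ReadByKey(ConnectionConfig config) {
      super(config);
    }

    @Override
    String read(String key) {
      return client.get(key);
    }
  }

  public static void main(String[] args) {
    Map<String, String> server = new HashMap<>();
    server.put("a", "1");
    ReadByKey reader = new ReadByKey(new ConnectionConfig("localhost"));
    reader.setup(server);
    System.out.println(reader.read("a")); // prints 1
    System.out.println(reader.read("b")); // prints null
  }
}
```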
[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations
[ https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=116882=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-116882 ]

ASF GitHub Bot logged work on BEAM-3446:

Author: ASF GitHub Bot
Created on: 28/Jun/18 16:26
Start Date: 28/Jun/18 16:26
Worklog Time Spent: 10m
Work Description: kennknowles commented on issue #4656: [BEAM-3446] Fixes RedisIO non-prefix read operations
URL: https://github.com/apache/beam/pull/4656#issuecomment-401093881

We have turned on autoformatting of the codebase, which causes small conflicts across the board. You can probably safely rebase and just keep your changes. Like this:

```
$ git rebase
... see some conflicts
$ git diff
... confirmed that the conflicts are just autoformatting
... so we can just keep our changes and do our own autoformat
$ git checkout --theirs --
$ git add -u
$ git rebase --continue
$ ./gradlew spotlessJavaApply
```

Please ping me if you run into any difficulty.

Issue Time Tracking
-------------------

Worklog Id: (was: 116882)
Time Spent: 40m (was: 0.5h)

> RedisIO non-prefix read operations
> ----------------------------------
>
> Key: BEAM-3446
> URL: https://issues.apache.org/jira/browse/BEAM-3446
> Project: Beam
> Issue Type: New Feature
> Components: io-java-redis
> Reporter: Vinay varma
> Assignee: Jean-Baptiste Onofré
> Priority: Major
> Time Spent: 40m
> Remaining Estimate: 0h
[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations
[ https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=110086=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-110086 ]

ASF GitHub Bot logged work on BEAM-3446:

Author: ASF GitHub Bot
Created on: 08/Jun/18 12:04
Start Date: 08/Jun/18 12:04
Worklog Time Spent: 10m
Work Description: vvarma commented on issue #4656: [BEAM-3446] Fixes RedisIO non-prefix read operations
URL: https://github.com/apache/beam/pull/4656#issuecomment-395740529

The fix is in place. @jbonofre, please help review.

Issue Time Tracking
-------------------

Worklog Id: (was: 110086)
Time Spent: 0.5h (was: 20m)

> RedisIO non-prefix read operations
> ----------------------------------
>
> Key: BEAM-3446
> URL: https://issues.apache.org/jira/browse/BEAM-3446
> Project: Beam
> Issue Type: New Feature
> Components: io-java-redis
> Reporter: Vinay varma
> Assignee: Jean-Baptiste Onofré
> Priority: Major
> Time Spent: 0.5h
> Remaining Estimate: 0h
[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations
[ https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=109620=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-109620 ]

ASF GitHub Bot logged work on BEAM-3446:

Author: ASF GitHub Bot
Created on: 07/Jun/18 04:01
Start Date: 07/Jun/18 04:01
Worklog Time Spent: 10m
Work Description: stale[bot] commented on issue #4656: [BEAM-3446] Fixes RedisIO non-prefix read operations
URL: https://github.com/apache/beam/pull/4656#issuecomment-395285681

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the d...@beam.apache.org list. Thank you for your contributions.

Issue Time Tracking
-------------------

Worklog Id: (was: 109620)
Time Spent: 20m (was: 10m)

> RedisIO non-prefix read operations
> ----------------------------------
>
> Key: BEAM-3446
> URL: https://issues.apache.org/jira/browse/BEAM-3446
> Project: Beam
> Issue Type: New Feature
> Components: io-java-redis
> Reporter: Vinay varma
> Assignee: Jean-Baptiste Onofré
> Priority: Major
> Time Spent: 20m
> Remaining Estimate: 0h