[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations

2018-09-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=145696&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-145696
 ]

ASF GitHub Bot logged work on BEAM-3446:


Author: ASF GitHub Bot
Created on: 19/Sep/18 15:42
Start Date: 19/Sep/18 15:42
Worklog Time Spent: 10m 
  Work Description: vvarma commented on issue #5841: [BEAM-3446] Fixes 
RedisIO non-prefix read operations
URL: https://github.com/apache/beam/pull/5841#issuecomment-422852677
 
 
   @iemejia Thank you!


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 145696)
Time Spent: 6h 20m  (was: 6h 10m)

> RedisIO non-prefix read operations
> --
>
> Key: BEAM-3446
> URL: https://issues.apache.org/jira/browse/BEAM-3446
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-redis
>Reporter: Vinay varma
>Assignee: Vinay varma
>Priority: Major
> Fix For: 2.8.0
>
>  Time Spent: 6h 20m
>  Remaining Estimate: 0h
>
> The read operation in RedisIO is designed for prefix-based lookups. While it 
> can also be used for exact key matches, the number of operations it issues 
> limits the throughput of the function.
> I suggest exposing the current readAll operation as readbyprefix and using 
> simpler operations for the readAll functionality.
> Example:
> {code:java}
> String output = jedis.get(element);
> if (output != null) {
> processContext.output(KV.of(element, output));
> }
> {code}
> instead of:
> https://github.com/apache/beam/blob/7d240c0bb171af6868f1a6e95196c9dcfc9ac640/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java#L292
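
The cost difference described in the issue can be sketched without a Redis server. The block below is only an illustration: `CountingStore`, `ExactLookupSketch` and their methods are hypothetical names, the store is an in-memory map rather than Jedis, and SCAN is simplified to a single command that walks every key (a real SCAN iterates with a cursor over several round trips).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory stand-in for a Redis server; it only counts work done. */
final class CountingStore {
  private final Map<String, String> data = new TreeMap<>();
  int commands = 0;     // number of commands issued
  int keysExamined = 0; // number of keys the store had to look at

  void put(String key, String value) {
    data.put(key, value);
  }

  /** Simplified SCAN with a prefix match: one command that walks the keyspace. */
  List<String> scan(String prefix) {
    commands++;
    List<String> matches = new ArrayList<>();
    for (String key : data.keySet()) {
      keysExamined++;
      if (key.startsWith(prefix)) {
        matches.add(key);
      }
    }
    return matches;
  }

  /** GET: one command, one key examined; returns null when the key is absent. */
  String get(String key) {
    commands++;
    keysExamined++;
    return data.get(key);
  }
}

final class ExactLookupSketch {
  /** Exact-key lookup routed through the pattern path: SCAN, then GET per match. */
  static String viaScan(CountingStore store, String key) {
    String found = null;
    for (String match : store.scan(key)) {
      String value = store.get(match);
      if (match.equals(key)) {
        found = value;
      }
    }
    return found;
  }

  /** Exact-key lookup as proposed in the issue: a single GET. */
  static String direct(CountingStore store, String key) {
    return store.get(key);
  }

  /** Builds a store holding the keys user:0 .. user:99. */
  static CountingStore sampleStore() {
    CountingStore store = new CountingStore();
    for (int i = 0; i < 100; i++) {
      store.put("user:" + i, "value-" + i);
    }
    return store;
  }

  public static void main(String[] args) {
    CountingStore a = sampleStore();
    String viaScan = viaScan(a, "user:42");
    System.out.println(viaScan + " commands=" + a.commands + " examined=" + a.keysExamined);

    CountingStore b = sampleStore();
    String direct = direct(b, "user:42");
    System.out.println(direct + " commands=" + b.commands + " examined=" + b.keysExamined);
  }
}
```

Both paths return the same value; the counters show how much extra work the pattern path does for a key that is already known exactly.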



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations

2018-09-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=145693&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-145693
 ]

ASF GitHub Bot logged work on BEAM-3446:


Author: ASF GitHub Bot
Created on: 19/Sep/18 15:14
Start Date: 19/Sep/18 15:14
Worklog Time Spent: 10m 
  Work Description: iemejia closed pull request #5841: [BEAM-3446] Fixes 
RedisIO non-prefix read operations
URL: https://github.com/apache/beam/pull/5841
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java 
b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
index 279ca46db2b..57d0b77af1f 100644
--- a/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
+++ b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
@@ -20,7 +20,10 @@
 import static com.google.common.base.Preconditions.checkArgument;
 
 import com.google.auto.value.AutoValue;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.KvCoder;
@@ -34,6 +37,7 @@
 import org.apache.beam.sdk.transforms.SerializableFunctions;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
@@ -109,6 +113,7 @@ public static Read read() {
 return new AutoValue_RedisIO_Read.Builder()
 .setConnectionConfiguration(RedisConnectionConfiguration.create())
 .setKeyPattern("*")
+.setBatchSize(1000)
 .build();
   }
 
@@ -119,6 +124,7 @@ public static Read read() {
   public static ReadAll readAll() {
 return new AutoValue_RedisIO_ReadAll.Builder()
 .setConnectionConfiguration(RedisConnectionConfiguration.create())
+.setBatchSize(1000)
 .build();
   }
 
@@ -142,6 +148,8 @@ private RedisIO() {}
 @Nullable
 abstract String keyPattern();
 
+abstract int batchSize();
+
 abstract Builder builder();
 
 @AutoValue.Builder
@@ -152,6 +160,8 @@ private RedisIO() {}
   @Nullable
   abstract Builder setKeyPattern(String keyPattern);
 
+  abstract Builder setBatchSize(int batchSize);
+
   abstract Read build();
 }
 
@@ -185,6 +195,10 @@ public Read 
withConnectionConfiguration(RedisConnectionConfiguration connection)
   return builder().setConnectionConfiguration(connection).build();
 }
 
+public Read withBatchSize(int batchSize) {
+  return builder().setBatchSize(batchSize).build();
+}
+
 @Override
 public void populateDisplayData(DisplayData.Builder builder) {
   connectionConfiguration().populateDisplayData(builder);
@@ -196,7 +210,11 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
 
   return input
   .apply(Create.of(keyPattern()))
-  
.apply(RedisIO.readAll().withConnectionConfiguration(connectionConfiguration()));
+  .apply(ParDo.of(new ReadKeysWithPattern(connectionConfiguration())))
+  .apply(
+  RedisIO.readAll()
+  .withConnectionConfiguration(connectionConfiguration())
+  .withBatchSize(batchSize()));
 }
   }
 
@@ -208,6 +226,8 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
 @Nullable
 abstract RedisConnectionConfiguration connectionConfiguration();
 
+abstract int batchSize();
+
 abstract ReadAll.Builder builder();
 
 @AutoValue.Builder
@@ -215,6 +235,8 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
   @Nullable
   abstract ReadAll.Builder 
setConnectionConfiguration(RedisConnectionConfiguration connection);
 
+  abstract ReadAll.Builder setBatchSize(int batchSize);
+
   abstract ReadAll build();
 }
 
@@ -243,25 +265,27 @@ public ReadAll 
withConnectionConfiguration(RedisConnectionConfiguration connecti
   return builder().setConnectionConfiguration(connection).build();
 }
 
+public ReadAll withBatchSize(int batchSize) {
+  return builder().setBatchSize(batchSize).build();
+}
+
 @Override
 public PCollection<KV<String, String>> expand(PCollection<String> input) {
   checkArgument(connectionConfiguration() != null, 
"withConnectionConfiguration() is required");
 
   return input
-  

[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations

2018-09-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=145667&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-145667
 ]

ASF GitHub Bot logged work on BEAM-3446:


Author: ASF GitHub Bot
Created on: 19/Sep/18 14:08
Start Date: 19/Sep/18 14:08
Worklog Time Spent: 10m 
  Work Description: vvarma commented on a change in pull request #5841: 
[BEAM-3446] Fixes RedisIO non-prefix read operations
URL: https://github.com/apache/beam/pull/5841#discussion_r218818530
 
 

 ##
 File path: 
sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
 ##
 @@ -280,28 +316,77 @@ public void processElement(ProcessContext 
processContext) throws Exception {
   while (!finished) {
 ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
 List<String> keys = scanResult.getResult();
-
-Pipeline pipeline = jedis.pipelined();
-if (keys != null) {
-  for (String key : keys) {
-pipeline.get(key);
-  }
-  List<Object> values = pipeline.syncAndReturnAll();
-  for (int i = 0; i < values.size(); i++) {
-processContext.output(KV.of(keys.get(i), (String) values.get(i)));
-  }
+for (String k : keys) {
+  processContext.output(k);
 }
-
 cursor = scanResult.getStringCursor();
 if ("0".equals(cursor)) {
   finished = true;
 }
   }
 }
+  }
+  /** A {@link DoFn} requesting Redis server to get key/value pairs. */
+  private static class ReadFn extends BaseReadFn<KV<String, String>> {
+@Nullable transient Multimap<BoundedWindow, String> bundles = null;
+@Nullable AtomicInteger batchCount = null;
+private final int batchSize;
+
+@StartBundle
+public void startBundle(StartBundleContext context) {
+  bundles = ArrayListMultimap.create();
+  batchCount = new AtomicInteger();
+}
 
-@Teardown
-public void teardown() {
-  jedis.close();
+ReadFn(RedisConnectionConfiguration connectionConfiguration, int 
batchSize) {
+  super(connectionConfiguration);
+  this.batchSize = batchSize;
+}
+
+private int getBatchSize() {
+  return batchSize;
+}
+
+@ProcessElement
+public void processElement(ProcessContext processContext, BoundedWindow 
window)
+throws Exception {
+  String key = processContext.element();
+  bundles.put(window, key);
+  if (batchCount.incrementAndGet() > getBatchSize()) {
+Multimap<BoundedWindow, KV<String, String>> kvs = fetchAndFlush();
+for (BoundedWindow w : kvs.keySet()) {
+  for (KV<String, String> kv : kvs.get(w)) {
+processContext.output(kv);
+  }
+}
+  }
+}
+
+private Multimap<BoundedWindow, KV<String, String>> fetchAndFlush() {
+  Multimap<BoundedWindow, KV<String, String>> kvs = ArrayListMultimap.create();
+  for (BoundedWindow w : bundles.keySet()) {
+String[] keys = new String[bundles.get(w).size()];
+bundles.get(w).toArray(keys);
+List<String> results = jedis.mget(keys);
+for (int i = 0; i < results.size(); i++) {
+  if (results.get(i) != null) {
+kvs.put(w, KV.of(keys[i], results.get(i)));
+  }
+}
+  }
+  bundles = ArrayListMultimap.create();
+  batchCount.set(0);
+  return kvs;
+}
+
+@FinishBundle
+public void finishBundle(FinishBundleContext context) throws Exception {
+  Multimap<BoundedWindow, KV<String, String>> kvs = fetchAndFlush();
 
 Review comment:
   Thanks @iemejia. Please let me know if any other changes are needed.




Issue Time Tracking
---

Worklog Id: (was: 145667)
Time Spent: 6h  (was: 5h 50m)


[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations

2018-09-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=145638&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-145638
 ]

ASF GitHub Bot logged work on BEAM-3446:


Author: ASF GitHub Bot
Created on: 19/Sep/18 12:11
Start Date: 19/Sep/18 12:11
Worklog Time Spent: 10m 
  Work Description: iemejia commented on a change in pull request #5841: 
[BEAM-3446] Fixes RedisIO non-prefix read operations
URL: https://github.com/apache/beam/pull/5841#discussion_r218777618
 
 

 ##
 File path: 
sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
 ##
 @@ -280,28 +316,77 @@ public void processElement(ProcessContext 
processContext) throws Exception {
   while (!finished) {
 ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
 List<String> keys = scanResult.getResult();
-
-Pipeline pipeline = jedis.pipelined();
-if (keys != null) {
-  for (String key : keys) {
-pipeline.get(key);
-  }
-  List<Object> values = pipeline.syncAndReturnAll();
-  for (int i = 0; i < values.size(); i++) {
-processContext.output(KV.of(keys.get(i), (String) values.get(i)));
-  }
+for (String k : keys) {
+  processContext.output(k);
 }
-
 cursor = scanResult.getStringCursor();
 if ("0".equals(cursor)) {
   finished = true;
 }
   }
 }
+  }
+  /** A {@link DoFn} requesting Redis server to get key/value pairs. */
+  private static class ReadFn extends BaseReadFn<KV<String, String>> {
+@Nullable transient Multimap<BoundedWindow, String> bundles = null;
+@Nullable AtomicInteger batchCount = null;
+private final int batchSize;
+
+@StartBundle
+public void startBundle(StartBundleContext context) {
+  bundles = ArrayListMultimap.create();
+  batchCount = new AtomicInteger();
+}
 
-@Teardown
-public void teardown() {
-  jedis.close();
+ReadFn(RedisConnectionConfiguration connectionConfiguration, int 
batchSize) {
+  super(connectionConfiguration);
+  this.batchSize = batchSize;
+}
+
+private int getBatchSize() {
+  return batchSize;
+}
+
+@ProcessElement
+public void processElement(ProcessContext processContext, BoundedWindow 
window)
+throws Exception {
+  String key = processContext.element();
+  bundles.put(window, key);
+  if (batchCount.incrementAndGet() > getBatchSize()) {
+Multimap<BoundedWindow, KV<String, String>> kvs = fetchAndFlush();
+for (BoundedWindow w : kvs.keySet()) {
+  for (KV<String, String> kv : kvs.get(w)) {
+processContext.output(kv);
+  }
+}
+  }
+}
+
+private Multimap<BoundedWindow, KV<String, String>> fetchAndFlush() {
+  Multimap<BoundedWindow, KV<String, String>> kvs = ArrayListMultimap.create();
+  for (BoundedWindow w : bundles.keySet()) {
+String[] keys = new String[bundles.get(w).size()];
+bundles.get(w).toArray(keys);
+List<String> results = jedis.mget(keys);
+for (int i = 0; i < results.size(); i++) {
+  if (results.get(i) != null) {
+kvs.put(w, KV.of(keys[i], results.get(i)));
+  }
+}
+  }
+  bundles = ArrayListMultimap.create();
+  batchCount.set(0);
+  return kvs;
+}
+
+@FinishBundle
+public void finishBundle(FinishBundleContext context) throws Exception {
+  Multimap<BoundedWindow, KV<String, String>> kvs = fetchAndFlush();
 
 Review comment:
   Thanks for answering. I had somehow misread `startBundle` as a setup-only 
method. I see how everything fits together now.
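
The distinction being cleared up here (setup runs once per instance, while `startBundle` and `finishBundle` run once per bundle) can be sketched in plain Java. This is an assumption-laden illustration, not Beam code: `LifecycleSketch`, `RecordingFn` and `run` are hypothetical names, and the loop stands in for whatever a real runner does when it splits input into bundles.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class LifecycleSketch {
  /** Hypothetical function object that records the order of its lifecycle calls. */
  static final class RecordingFn {
    final List<String> calls = new ArrayList<>();

    void setup() {
      calls.add("setup");
    }

    void startBundle() {
      calls.add("startBundle");
    }

    void processElement(String element) {
      calls.add("process:" + element);
    }

    void finishBundle() {
      calls.add("finishBundle");
    }

    void teardown() {
      calls.add("teardown");
    }
  }

  /** Hypothetical runner loop: one instance is reused across several bundles. */
  static List<String> run(List<List<String>> bundles) {
    RecordingFn fn = new RecordingFn();
    fn.setup(); // once per instance
    for (List<String> bundle : bundles) {
      fn.startBundle(); // once per bundle, so per-bundle buffers are reset here
      for (String element : bundle) {
        fn.processElement(element);
      }
      fn.finishBundle(); // once per bundle, the last chance to emit buffered data
    }
    fn.teardown(); // once per instance
    return fn.calls;
  }

  public static void main(String[] args) {
    List<List<String>> bundles =
        Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c"));
    System.out.println(run(bundles));
  }
}
```

Because `startBundle` runs before every bundle, state created there (such as the `bundles` multimap in the diff) is fresh for each bundle, which is why the matching flush in `finishBundle` is needed.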




Issue Time Tracking
---

Worklog Id: (was: 145638)
Time Spent: 5h 50m  (was: 5h 40m)


[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations

2018-09-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=144321&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-144321
 ]

ASF GitHub Bot logged work on BEAM-3446:


Author: ASF GitHub Bot
Created on: 14/Sep/18 15:30
Start Date: 14/Sep/18 15:30
Worklog Time Spent: 10m 
  Work Description: vvarma commented on a change in pull request #5841: 
[BEAM-3446] Fixes RedisIO non-prefix read operations
URL: https://github.com/apache/beam/pull/5841#discussion_r217752660
 
 

 ##
 File path: 
sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
 ##
 @@ -280,28 +316,77 @@ public void processElement(ProcessContext 
processContext) throws Exception {
   while (!finished) {
 ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
 List<String> keys = scanResult.getResult();
-
-Pipeline pipeline = jedis.pipelined();
-if (keys != null) {
-  for (String key : keys) {
-pipeline.get(key);
-  }
-  List<Object> values = pipeline.syncAndReturnAll();
-  for (int i = 0; i < values.size(); i++) {
-processContext.output(KV.of(keys.get(i), (String) values.get(i)));
-  }
+for (String k : keys) {
+  processContext.output(k);
 }
-
 cursor = scanResult.getStringCursor();
 if ("0".equals(cursor)) {
   finished = true;
 }
   }
 }
+  }
+  /** A {@link DoFn} requesting Redis server to get key/value pairs. */
+  private static class ReadFn extends BaseReadFn<KV<String, String>> {
+@Nullable transient Multimap<BoundedWindow, String> bundles = null;
+@Nullable AtomicInteger batchCount = null;
+private final int batchSize;
+
+@StartBundle
+public void startBundle(StartBundleContext context) {
+  bundles = ArrayListMultimap.create();
+  batchCount = new AtomicInteger();
+}
 
-@Teardown
-public void teardown() {
-  jedis.close();
+ReadFn(RedisConnectionConfiguration connectionConfiguration, int 
batchSize) {
+  super(connectionConfiguration);
+  this.batchSize = batchSize;
+}
+
+private int getBatchSize() {
+  return batchSize;
+}
+
+@ProcessElement
+public void processElement(ProcessContext processContext, BoundedWindow 
window)
+throws Exception {
+  String key = processContext.element();
+  bundles.put(window, key);
+  if (batchCount.incrementAndGet() > getBatchSize()) {
+Multimap<BoundedWindow, KV<String, String>> kvs = fetchAndFlush();
+for (BoundedWindow w : kvs.keySet()) {
+  for (KV<String, String> kv : kvs.get(w)) {
+processContext.output(kv);
+  }
+}
+  }
+}
+
+private Multimap<BoundedWindow, KV<String, String>> fetchAndFlush() {
+  Multimap<BoundedWindow, KV<String, String>> kvs = ArrayListMultimap.create();
+  for (BoundedWindow w : bundles.keySet()) {
+String[] keys = new String[bundles.get(w).size()];
+bundles.get(w).toArray(keys);
+List<String> results = jedis.mget(keys);
+for (int i = 0; i < results.size(); i++) {
+  if (results.get(i) != null) {
+kvs.put(w, KV.of(keys[i], results.get(i)));
+  }
+}
+  }
+  bundles = ArrayListMultimap.create();
+  batchCount.set(0);
+  return kvs;
+}
+
+@FinishBundle
+public void finishBundle(FinishBundleContext context) throws Exception {
+  Multimap<BoundedWindow, KV<String, String>> kvs = fetchAndFlush();
 
 Review comment:
   The reason for this extra flush is that we have a batch size. Once the 
number of buffered messages reaches this value, we invoke flush. 
   At the end of the bundle, when finishBundle is invoked, there may be a few 
messages left in the buffer (fewer than the batch size), so we invoke flush from 
there as well. For this reason we also need to store the window of each 
message. 
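
The buffering behaviour described above can be sketched in plain Java. This is a minimal illustration under stated assumptions: `BatchingSketch`, `add`, `finish` and `run` are hypothetical names, a list of lists stands in for the MGET calls, and, unlike the PR's `finishBundle`, the final flush is skipped when the buffer is empty. The threshold check uses `>` to mirror `batchCount.incrementAndGet() > getBatchSize()` in the diff, so a threshold-triggered flush carries one element more than the configured batch size.

```java
import java.util.ArrayList;
import java.util.List;

final class BatchingSketch {
  private final int batchSize;
  private final List<String> buffer = new ArrayList<>();
  /** Each inner list is one flushed batch, standing in for one MGET call. */
  final List<List<String>> flushed = new ArrayList<>();

  BatchingSketch(int batchSize) {
    this.batchSize = batchSize;
  }

  /** Mirrors processElement: buffer the key, flush once the count exceeds batchSize. */
  void add(String key) {
    buffer.add(key);
    if (buffer.size() > batchSize) {
      flush();
    }
  }

  /** Mirrors finishBundle: flush the remainder that never crossed the threshold. */
  void finish() {
    if (!buffer.isEmpty()) {
      flush();
    }
  }

  private void flush() {
    flushed.add(new ArrayList<>(buffer));
    buffer.clear();
  }

  /** Feeds keyCount generated keys through a fresh sketch and returns the batches. */
  static List<List<String>> run(int batchSize, int keyCount) {
    BatchingSketch sketch = new BatchingSketch(batchSize);
    for (int i = 0; i < keyCount; i++) {
      sketch.add("key-" + i);
    }
    sketch.finish();
    return sketch.flushed;
  }

  public static void main(String[] args) {
    for (List<String> batch : run(3, 10)) {
      System.out.println(batch.size() + " keys: " + batch);
    }
  }
}
```

Without the call to `finish`, the last partial batch would stay in the buffer and its keys would never be looked up.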




Issue Time Tracking
---

Worklog Id: (was: 144321)
Time Spent: 5h 40m  (was: 5.5h)


[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations

2018-09-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=144320&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-144320
 ]

ASF GitHub Bot logged work on BEAM-3446:


Author: ASF GitHub Bot
Created on: 14/Sep/18 15:27
Start Date: 14/Sep/18 15:27
Worklog Time Spent: 10m 
  Work Description: vvarma commented on a change in pull request #5841: 
[BEAM-3446] Fixes RedisIO non-prefix read operations
URL: https://github.com/apache/beam/pull/5841#discussion_r217751824
 
 

 ##
 File path: 
sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
 ##
 @@ -280,28 +316,77 @@ public void processElement(ProcessContext 
processContext) throws Exception {
   while (!finished) {
 ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
 List<String> keys = scanResult.getResult();
-
-Pipeline pipeline = jedis.pipelined();
-if (keys != null) {
-  for (String key : keys) {
-pipeline.get(key);
-  }
-  List<Object> values = pipeline.syncAndReturnAll();
-  for (int i = 0; i < values.size(); i++) {
-processContext.output(KV.of(keys.get(i), (String) values.get(i)));
-  }
+for (String k : keys) {
+  processContext.output(k);
 }
-
 cursor = scanResult.getStringCursor();
 if ("0".equals(cursor)) {
   finished = true;
 }
   }
 }
+  }
+  /** A {@link DoFn} requesting Redis server to get key/value pairs. */
+  private static class ReadFn extends BaseReadFn<KV<String, String>> {
+@Nullable transient Multimap<BoundedWindow, String> bundles = null;
+@Nullable AtomicInteger batchCount = null;
+private final int batchSize;
+
+@StartBundle
+public void startBundle(StartBundleContext context) {
+  bundles = ArrayListMultimap.create();
+  batchCount = new AtomicInteger();
+}
 
-@Teardown
-public void teardown() {
-  jedis.close();
+ReadFn(RedisConnectionConfiguration connectionConfiguration, int 
batchSize) {
+  super(connectionConfiguration);
+  this.batchSize = batchSize;
+}
+
+private int getBatchSize() {
+  return batchSize;
+}
+
+@ProcessElement
+public void processElement(ProcessContext processContext, BoundedWindow 
window)
+throws Exception {
+  String key = processContext.element();
+  bundles.put(window, key);
+  if (batchCount.incrementAndGet() > getBatchSize()) {
+Multimap<BoundedWindow, KV<String, String>> kvs = fetchAndFlush();
 
 Review comment:
   The window stored alongside each key here is used in FinishBundle to output 
the results, since the context in FinishBundle takes the window as a parameter: 
`context.output(kv, w.maxTimestamp(), w);`
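
A plain-Java sketch of why the window has to be remembered: at the end of a bundle there is no current element, so every emitted value must name its window and timestamp explicitly. The names below (`WindowedFlushSketch`, `Window`, `Emitted`) are hypothetical stand-ins for Beam's `BoundedWindow` and the finish-bundle output call, and a `LinkedHashMap` of lists replaces the Guava multimap used in the PR.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class WindowedFlushSketch {
  /** Hypothetical stand-in for a window: a name and the last timestamp it covers. */
  static final class Window {
    final String name;
    final long maxTimestamp;

    Window(String name, long maxTimestamp) {
      this.name = name;
      this.maxTimestamp = maxTimestamp;
    }
  }

  /** One emitted result with the timestamp and window it was emitted into. */
  static final class Emitted {
    final String key;
    final long timestamp;
    final Window window;

    Emitted(String key, long timestamp, Window window) {
      this.key = key;
      this.timestamp = timestamp;
      this.window = window;
    }
  }

  /** Keys buffered per window, in the order the windows were first seen. */
  private final Map<Window, List<String>> buffered = new LinkedHashMap<>();

  /** Mirrors processElement: remember which window each key arrived in. */
  void process(String key, Window window) {
    buffered.computeIfAbsent(window, w -> new ArrayList<>()).add(key);
  }

  /** Mirrors finishBundle: emit every buffered key into its own window. */
  List<Emitted> finishBundle() {
    List<Emitted> out = new ArrayList<>();
    for (Map.Entry<Window, List<String>> entry : buffered.entrySet()) {
      Window w = entry.getKey();
      for (String key : entry.getValue()) {
        // Same shape as context.output(kv, w.maxTimestamp(), w) in the comment above.
        out.add(new Emitted(key, w.maxTimestamp, w));
      }
    }
    buffered.clear();
    return out;
  }

  public static void main(String[] args) {
    Window w1 = new Window("w1", 59L);
    Window w2 = new Window("w2", 119L);
    WindowedFlushSketch fn = new WindowedFlushSketch();
    fn.process("a", w1);
    fn.process("b", w2);
    fn.process("c", w1);
    for (Emitted e : fn.finishBundle()) {
      System.out.println(e.key + " -> " + e.window.name + " @ " + e.timestamp);
    }
  }
}
```

If the buffer held only keys, the flush at the end of the bundle would have no way to tell which window each result belongs to.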




Issue Time Tracking
---

Worklog Id: (was: 144320)
Time Spent: 5.5h  (was: 5h 20m)



[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations

2018-09-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=144310&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-144310
 ]

ASF GitHub Bot logged work on BEAM-3446:


Author: ASF GitHub Bot
Created on: 14/Sep/18 14:59
Start Date: 14/Sep/18 14:59
Worklog Time Spent: 10m 
  Work Description: iemejia commented on a change in pull request #5841: 
[BEAM-3446] Fixes RedisIO non-prefix read operations
URL: https://github.com/apache/beam/pull/5841#discussion_r217741210
 
 

 ##
 File path: 
sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
 ##
 @@ -280,28 +316,77 @@ public void processElement(ProcessContext 
processContext) throws Exception {
   while (!finished) {
 ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
 List<String> keys = scanResult.getResult();
-
-Pipeline pipeline = jedis.pipelined();
-if (keys != null) {
-  for (String key : keys) {
-pipeline.get(key);
-  }
-  List<Object> values = pipeline.syncAndReturnAll();
-  for (int i = 0; i < values.size(); i++) {
-processContext.output(KV.of(keys.get(i), (String) values.get(i)));
-  }
+for (String k : keys) {
+  processContext.output(k);
 }
-
 cursor = scanResult.getStringCursor();
 if ("0".equals(cursor)) {
   finished = true;
 }
   }
 }
+  }
+  /** A {@link DoFn} requesting Redis server to get key/value pairs. */
+  private static class ReadFn extends BaseReadFn<KV<String, String>> {
+@Nullable transient Multimap<BoundedWindow, String> bundles = null;
+@Nullable AtomicInteger batchCount = null;
+private final int batchSize;
+
+@StartBundle
+public void startBundle(StartBundleContext context) {
+  bundles = ArrayListMultimap.create();
+  batchCount = new AtomicInteger();
+}
 
-@Teardown
-public void teardown() {
-  jedis.close();
+ReadFn(RedisConnectionConfiguration connectionConfiguration, int 
batchSize) {
+  super(connectionConfiguration);
+  this.batchSize = batchSize;
+}
+
+private int getBatchSize() {
+  return batchSize;
+}
+
+@ProcessElement
+public void processElement(ProcessContext processContext, BoundedWindow 
window)
+throws Exception {
+  String key = processContext.element();
+  bundles.put(window, key);
+  if (batchCount.incrementAndGet() > getBatchSize()) {
+Multimap<BoundedWindow, KV<String, String>> kvs = fetchAndFlush();
 
 Review comment:
   Why do you need to deal with windows here? (Note: I only looked quickly and 
didn't get the intuition.) If we can avoid this, it is probably better, no?




Issue Time Tracking
---

Worklog Id: (was: 144310)
Time Spent: 5h 20m  (was: 5h 10m)



[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations

2018-09-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=144311&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-144311
 ]

ASF GitHub Bot logged work on BEAM-3446:


Author: ASF GitHub Bot
Created on: 14/Sep/18 14:59
Start Date: 14/Sep/18 14:59
Worklog Time Spent: 10m 
  Work Description: iemejia commented on a change in pull request #5841: 
[BEAM-3446] Fixes RedisIO non-prefix read operations
URL: https://github.com/apache/beam/pull/5841#discussion_r217741498
 
 

 ##
 File path: 
sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
 ##
 @@ -280,28 +316,77 @@ public void processElement(ProcessContext 
processContext) throws Exception {
   while (!finished) {
 ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
 List<String> keys = scanResult.getResult();
-
-Pipeline pipeline = jedis.pipelined();
-if (keys != null) {
-  for (String key : keys) {
-pipeline.get(key);
-  }
-  List<Object> values = pipeline.syncAndReturnAll();
-  for (int i = 0; i < values.size(); i++) {
-processContext.output(KV.of(keys.get(i), (String) values.get(i)));
-  }
+for (String k : keys) {
+  processContext.output(k);
 }
-
 cursor = scanResult.getStringCursor();
 if ("0".equals(cursor)) {
   finished = true;
 }
   }
 }
+  }
+  /** A {@link DoFn} requesting Redis server to get key/value pairs. */
+  private static class ReadFn extends BaseReadFn<KV<String, String>> {
+@Nullable transient Multimap<BoundedWindow, String> bundles = null;
+@Nullable AtomicInteger batchCount = null;
+private final int batchSize;
+
+@StartBundle
+public void startBundle(StartBundleContext context) {
+  bundles = ArrayListMultimap.create();
+  batchCount = new AtomicInteger();
+}
 
-@Teardown
-public void teardown() {
-  jedis.close();
+ReadFn(RedisConnectionConfiguration connectionConfiguration, int 
batchSize) {
+  super(connectionConfiguration);
+  this.batchSize = batchSize;
+}
+
+private int getBatchSize() {
+  return batchSize;
+}
+
+@ProcessElement
+public void processElement(ProcessContext processContext, BoundedWindow 
window)
+throws Exception {
+  String key = processContext.element();
+  bundles.put(window, key);
+  if (batchCount.incrementAndGet() > getBatchSize()) {
+Multimap<BoundedWindow, KV<String, String>> kvs = fetchAndFlush();
+for (BoundedWindow w : kvs.keySet()) {
+  for (KV<String, String> kv : kvs.get(w)) {
+processContext.output(kv);
+  }
+}
+  }
+}
+
+private Multimap<BoundedWindow, KV<String, String>> fetchAndFlush() {
+  Multimap<BoundedWindow, KV<String, String>> kvs = ArrayListMultimap.create();
+  for (BoundedWindow w : bundles.keySet()) {
+String[] keys = new String[bundles.get(w).size()];
+bundles.get(w).toArray(keys);
+List<String> results = jedis.mget(keys);
+for (int i = 0; i < results.size(); i++) {
+  if (results.get(i) != null) {
+kvs.put(w, KV.of(keys[i], results.get(i)));
+  }
+}
+  }
+  bundles = ArrayListMultimap.create();
+  batchCount.set(0);
+  return kvs;
+}
+
+@FinishBundle
+public void finishBundle(FinishBundleContext context) throws Exception {
+  Multimap<BoundedWindow, KV<String, String>> kvs = fetchAndFlush();
 
 Review comment:
   Is this extra flush needed? Without an equivalent `startBundle`, I don't see 
why it would be.




Issue Time Tracking
---

Worklog Id: (was: 144311)
Time Spent: 5h 20m  (was: 5h 10m)


[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations

2018-09-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=144048&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-144048
 ]

ASF GitHub Bot logged work on BEAM-3446:


Author: ASF GitHub Bot
Created on: 13/Sep/18 18:36
Start Date: 13/Sep/18 18:36
Worklog Time Spent: 10m 
  Work Description: jbonofre commented on issue #5841: [BEAM-3446] Fixes 
RedisIO non-prefix read operations
URL: https://github.com/apache/beam/pull/5841#issuecomment-421108852
 
 
   @huygaa11 sorry, I forgot. Resuming my review.


Issue Time Tracking
---

Worklog Id: (was: 144048)
Time Spent: 5h 10m  (was: 5h)



[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations

2018-09-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=144017=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-144017
 ]

ASF GitHub Bot logged work on BEAM-3446:


Author: ASF GitHub Bot
Created on: 13/Sep/18 17:26
Start Date: 13/Sep/18 17:26
Worklog Time Spent: 10m 
  Work Description: huygaa11 commented on issue #5841: [BEAM-3446] Fixes 
RedisIO non-prefix read operations
URL: https://github.com/apache/beam/pull/5841#issuecomment-421086985
 
 
   Friendly ping for review!


Issue Time Tracking
---

Worklog Id: (was: 144017)
Time Spent: 5h  (was: 4h 50m)



[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations

2018-09-03 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=140560=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-140560
 ]

ASF GitHub Bot logged work on BEAM-3446:


Author: ASF GitHub Bot
Created on: 03/Sep/18 12:58
Start Date: 03/Sep/18 12:58
Worklog Time Spent: 10m 
  Work Description: vvarma commented on issue #5841: [BEAM-3446] Fixes 
RedisIO non-prefix read operations
URL: https://github.com/apache/beam/pull/5841#issuecomment-418108115
 
 
   Hi @iemejia, I have made the requested changes and used the window as you
suggested. Please let me know if they are as expected. Apologies for the delay.


Issue Time Tracking
---

Worklog Id: (was: 140560)
Time Spent: 4h 50m  (was: 4h 40m)



[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations

2018-09-03 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=140515=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-140515
 ]

ASF GitHub Bot logged work on BEAM-3446:


Author: ASF GitHub Bot
Created on: 03/Sep/18 09:26
Start Date: 03/Sep/18 09:26
Worklog Time Spent: 10m 
  Work Description: iemejia commented on issue #5841: [BEAM-3446] Fixes 
RedisIO non-prefix read operations
URL: https://github.com/apache/beam/pull/5841#issuecomment-418056484
 
 
   Just pinging about the status of this one, @vvarma. We are quite close, so
hopefully you can fix the last bits and we can merge it. Sorry if this has taken
too long.


Issue Time Tracking
---

Worklog Id: (was: 140515)
Time Spent: 4h 40m  (was: 4.5h)



[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations

2018-08-20 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=136128=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-136128
 ]

ASF GitHub Bot logged work on BEAM-3446:


Author: ASF GitHub Bot
Created on: 20/Aug/18 12:19
Start Date: 20/Aug/18 12:19
Worklog Time Spent: 10m 
  Work Description: iemejia commented on issue #5841: [BEAM-3446] Fixes 
RedisIO non-prefix read operations
URL: https://github.com/apache/beam/pull/5841#issuecomment-414297169
 
 
   Hi @vvarma, it seems the changes in the other PR produced a conflict. Can you
please rebase (and add the minor fixes from the review) so we can merge this one?
Thanks!
   


Issue Time Tracking
---

Worklog Id: (was: 136128)
Time Spent: 4.5h  (was: 4h 20m)



[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations

2018-07-26 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=127892=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-127892
 ]

ASF GitHub Bot logged work on BEAM-3446:


Author: ASF GitHub Bot
Created on: 26/Jul/18 18:59
Start Date: 26/Jul/18 18:59
Worklog Time Spent: 10m 
  Work Description: vvarma commented on a change in pull request #5841: 
[BEAM-3446] Fixes RedisIO non-prefix read operations
URL: https://github.com/apache/beam/pull/5841#discussion_r205567986
 
 

 ##
 File path: 
sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
 ##
 @@ -279,28 +314,73 @@ public void processElement(ProcessContext 
processContext) throws Exception {
   while (!finished) {
 ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
 List<String> keys = scanResult.getResult();
-
-Pipeline pipeline = jedis.pipelined();
-if (keys != null) {
-  for (String key : keys) {
-pipeline.get(key);
-  }
-  List<Object> values = pipeline.syncAndReturnAll();
-  for (int i = 0; i < values.size(); i++) {
-processContext.output(KV.of(keys.get(i), (String) values.get(i)));
-  }
+for (String k : keys) {
+  processContext.output(k);
 }
-
 cursor = scanResult.getStringCursor();
 if ("0".equals(cursor)) {
   finished = true;
 }
   }
 }
+  }
+  /** A {@link DoFn} requesting Redis server to get key/value pairs. */
+  private static class ReadFn extends BaseReadFn<KV<String, String>> {
+private int batchSize;
+private List<String> bufferedKeys;
+BoundedWindow window;
+Instant lastMsg;
+
+@StartBundle
+public void startBundle(StartBundleContext context) {
+  bufferedKeys = new ArrayList<>();
+}
 
-@Teardown
-public void teardown() {
-  jedis.close();
+ReadFn(RedisConnectionConfiguration connectionConfiguration, int 
batchSize) {
+  super(connectionConfiguration);
+  this.batchSize = batchSize;
+}
+
+private int getBatchSize() {
+  return batchSize;
+}
+
+@ProcessElement
+public void processElement(ProcessContext processContext, BoundedWindow 
window)
+throws Exception {
+  String key = processContext.element();
+  bufferedKeys.add(key);
+  this.window = window;
+  this.lastMsg = processContext.timestamp();
+  if (bufferedKeys.size() > getBatchSize()) {
+List<KV<String, String>> kvs = fetchAndFlush();
+for (KV<String, String> kv : kvs) {
+  processContext.output(kv);
+}
+  }
+}
+
+private List<KV<String, String>> fetchAndFlush() {
+  String[] keys = new String[bufferedKeys.size()];
+  bufferedKeys.toArray(keys);
+  List<String> results = jedis.mget(keys);
+  assert bufferedKeys.size() == results.size();
+  List<KV<String, String>> kvs = new ArrayList<>(bufferedKeys.size());
+  for (int i = 0; i < bufferedKeys.size(); i++) {
+if (results.get(i) != null) {
+  kvs.add(KV.of(bufferedKeys.get(i), results.get(i)));
+}
+  }
+  bufferedKeys = new ArrayList<>();
+  return kvs;
+}
+
+@FinishBundle
+public void finishBundle(FinishBundleContext context) throws Exception {
+  List<KV<String, String>> kvs = fetchAndFlush();
+  for (KV<String, String> kv : kvs) {
+context.output(kv, lastMsg, window);
 
 Review comment:
@iemejia could you please advise on the above?
   


Issue Time Tracking
---

Worklog Id: (was: 127892)
Time Spent: 4h 20m  (was: 4h 10m)


[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations

2018-07-26 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=127822=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-127822
 ]

ASF GitHub Bot logged work on BEAM-3446:


Author: ASF GitHub Bot
Created on: 26/Jul/18 16:03
Start Date: 26/Jul/18 16:03
Worklog Time Spent: 10m 
  Work Description: jbonofre commented on issue #5841: [BEAM-3446] Fixes 
RedisIO non-prefix read operations
URL: https://github.com/apache/beam/pull/5841#issuecomment-408148372
 
 
   I'm going to do the review as well.


Issue Time Tracking
---

Worklog Id: (was: 127822)
Time Spent: 4h 10m  (was: 4h)



[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations

2018-07-26 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=127760=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-127760
 ]

ASF GitHub Bot logged work on BEAM-3446:


Author: ASF GitHub Bot
Created on: 26/Jul/18 13:57
Start Date: 26/Jul/18 13:57
Worklog Time Spent: 10m 
  Work Description: iemejia commented on issue #5841: [BEAM-3446] Fixes 
RedisIO non-prefix read operations
URL: https://github.com/apache/beam/pull/5841#issuecomment-408106447
 
 
   No problem, thanks a lot for taking care of this, we are really close.
   I just wanted to bring awareness of another PR on Redis, #6045. Conceptually
there seems not to be a conflict, but it is good to think about any impact.


Issue Time Tracking
---

Worklog Id: (was: 127760)
Time Spent: 4h  (was: 3h 50m)



[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations

2018-07-26 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=127715=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-127715
 ]

ASF GitHub Bot logged work on BEAM-3446:


Author: ASF GitHub Bot
Created on: 26/Jul/18 09:38
Start Date: 26/Jul/18 09:38
Worklog Time Spent: 10m 
  Work Description: vvarma commented on a change in pull request #5841: 
[BEAM-3446] Fixes RedisIO non-prefix read operations
URL: https://github.com/apache/beam/pull/5841#discussion_r205392609
 
 

 ##
 File path: 
sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
 ##
 @@ -279,28 +314,73 @@ public void processElement(ProcessContext 
processContext) throws Exception {
   while (!finished) {
 ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
 List<String> keys = scanResult.getResult();
-
-Pipeline pipeline = jedis.pipelined();
-if (keys != null) {
-  for (String key : keys) {
-pipeline.get(key);
-  }
-  List<Object> values = pipeline.syncAndReturnAll();
-  for (int i = 0; i < values.size(); i++) {
-processContext.output(KV.of(keys.get(i), (String) values.get(i)));
-  }
+for (String k : keys) {
+  processContext.output(k);
 }
-
 cursor = scanResult.getStringCursor();
 if ("0".equals(cursor)) {
   finished = true;
 }
   }
 }
+  }
+  /** A {@link DoFn} requesting Redis server to get key/value pairs. */
+  private static class ReadFn extends BaseReadFn<KV<String, String>> {
+private int batchSize;
+private List<String> bufferedKeys;
+BoundedWindow window;
+Instant lastMsg;
+
+@StartBundle
+public void startBundle(StartBundleContext context) {
+  bufferedKeys = new ArrayList<>();
+}
 
-@Teardown
-public void teardown() {
-  jedis.close();
+ReadFn(RedisConnectionConfiguration connectionConfiguration, int 
batchSize) {
+  super(connectionConfiguration);
+  this.batchSize = batchSize;
+}
+
+private int getBatchSize() {
+  return batchSize;
+}
+
+@ProcessElement
+public void processElement(ProcessContext processContext, BoundedWindow 
window)
+throws Exception {
+  String key = processContext.element();
+  bufferedKeys.add(key);
+  this.window = window;
+  this.lastMsg = processContext.timestamp();
+  if (bufferedKeys.size() > getBatchSize()) {
+List<KV<String, String>> kvs = fetchAndFlush();
+for (KV<String, String> kv : kvs) {
+  processContext.output(kv);
+}
+  }
+}
+
+private List<KV<String, String>> fetchAndFlush() {
+  String[] keys = new String[bufferedKeys.size()];
+  bufferedKeys.toArray(keys);
+  List<String> results = jedis.mget(keys);
+  assert bufferedKeys.size() == results.size();
+  List<KV<String, String>> kvs = new ArrayList<>(bufferedKeys.size());
+  for (int i = 0; i < bufferedKeys.size(); i++) {
+if (results.get(i) != null) {
+  kvs.add(KV.of(bufferedKeys.get(i), results.get(i)));
+}
+  }
+  bufferedKeys = new ArrayList<>();
+  return kvs;
+}
+
+@FinishBundle
+public void finishBundle(FinishBundleContext context) throws Exception {
+  List<KV<String, String>> kvs = fetchAndFlush();
+  for (KV<String, String> kv : kvs) {
+context.output(kv, lastMsg, window);
 
 Review comment:
   In the linked code, though, messages with the same window are bundled
together, stored, and only processed in `finishBundle`.
   In the case of ReadFn, the idea was to buffer requests until the batch size is
reached and process them at that point. Hence output is emitted from both the
`processElement` method and `finishBundle`.
   I'm not sure I understand how to use a map with the window as the key.
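
For readers following the thread, the buffering behaviour described in this comment can be sketched outside Beam roughly as follows. This is a minimal, JDK-only illustration and not the code from the PR: `BatchingLookup`, `add`, and `finish` are hypothetical names, and the `Function<List<String>, List<String>>` argument is an assumed stand-in for a bulk lookup such as `jedis.mget`. Keys are buffered, fetched in one call once the batch size is reached, and whatever is left over is fetched when the bundle ends.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical stand-in for the batching logic discussed here; not Beam or Jedis code. */
final class BatchingLookup {
  private final int batchSize;
  // Assumed bulk lookup (e.g. something like jedis.mget): one result per key, null when absent.
  private final Function<List<String>, List<String>> bulkGet;
  private List<String> bufferedKeys = new ArrayList<>();

  BatchingLookup(int batchSize, Function<List<String>, List<String>> bulkGet) {
    this.batchSize = batchSize;
    this.bulkGet = bulkGet;
  }

  /** Plays the role of processElement: buffer the key, fetch once the batch is full. */
  List<Map.Entry<String, String>> add(String key) {
    bufferedKeys.add(key);
    if (bufferedKeys.size() >= batchSize) {
      return fetchAndFlush();
    }
    return new ArrayList<>();
  }

  /** Plays the role of finishBundle: fetch whatever is still buffered. */
  List<Map.Entry<String, String>> finish() {
    return fetchAndFlush();
  }

  private List<Map.Entry<String, String>> fetchAndFlush() {
    List<Map.Entry<String, String>> kvs = new ArrayList<>();
    if (bufferedKeys.isEmpty()) {
      return kvs; // nothing buffered, so skip the round trip entirely
    }
    List<String> values = bulkGet.apply(bufferedKeys);
    for (int i = 0; i < bufferedKeys.size(); i++) {
      if (values.get(i) != null) { // keys without a value are dropped
        kvs.add(new SimpleEntry<>(bufferedKeys.get(i), values.get(i)));
      }
    }
    bufferedKeys = new ArrayList<>();
    return kvs;
  }

  public static void main(String[] args) {
    Map<String, String> store = new HashMap<>();
    store.put("a", "1");
    store.put("b", "2");
    // In-memory replacement for the Redis call, for illustration only.
    Function<List<String>, List<String>> fakeBulkGet = keys -> {
      List<String> values = new ArrayList<>();
      for (String k : keys) {
        values.add(store.get(k));
      }
      return values;
    };
    BatchingLookup lookup = new BatchingLookup(2, fakeBulkGet);
    List<Map.Entry<String, String>> emitted = new ArrayList<>();
    for (String key : Arrays.asList("a", "missing", "b")) {
      emitted.addAll(lookup.add(key));
    }
    emitted.addAll(lookup.finish()); // picks up the trailing partial batch
    System.out.println(emitted);
  }
}
```

Two details differ from the quoted diff on purpose. The sketch compares with `>=`, so a batch holds exactly `batchSize` keys, whereas the diff uses `>` and therefore lets the buffer grow to `batchSize + 1` before fetching. The sketch also returns early when nothing is buffered, so the end-of-bundle flush never issues a bulk call with zero keys.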


Issue Time Tracking
---

Worklog Id: (was: 127715)
Time Spent: 3h 50m  (was: 3h 40m)


[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations

2018-07-26 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=127703=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-127703
 ]

ASF GitHub Bot logged work on BEAM-3446:


Author: ASF GitHub Bot
Created on: 26/Jul/18 09:13
Start Date: 26/Jul/18 09:13
Worklog Time Spent: 10m 
  Work Description: vvarma commented on issue #5841: [BEAM-3446] Fixes 
RedisIO non-prefix read operations
URL: https://github.com/apache/beam/pull/5841#issuecomment-408032339
 
 
   Sure will make the change. Sorry about the delay.


Issue Time Tracking
---

Worklog Id: (was: 127703)
Time Spent: 3h 40m  (was: 3.5h)



[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations

2018-07-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=125251=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-125251
 ]

ASF GitHub Bot logged work on BEAM-3446:


Author: ASF GitHub Bot
Created on: 19/Jul/18 21:25
Start Date: 19/Jul/18 21:25
Worklog Time Spent: 10m 
  Work Description: iemejia commented on a change in pull request #5841: 
[BEAM-3446] Fixes RedisIO non-prefix read operations
URL: https://github.com/apache/beam/pull/5841#discussion_r203876000
 
 

 ##
 File path: 
sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
 ##
 @@ -279,28 +314,73 @@ public void processElement(ProcessContext 
processContext) throws Exception {
   while (!finished) {
 ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
 List<String> keys = scanResult.getResult();
-
-Pipeline pipeline = jedis.pipelined();
-if (keys != null) {
-  for (String key : keys) {
-pipeline.get(key);
-  }
-  List<Object> values = pipeline.syncAndReturnAll();
-  for (int i = 0; i < values.size(); i++) {
-processContext.output(KV.of(keys.get(i), (String) values.get(i)));
-  }
+for (String k : keys) {
+  processContext.output(k);
 }
-
 cursor = scanResult.getStringCursor();
 if ("0".equals(cursor)) {
   finished = true;
 }
   }
 }
+  }
+  /** A {@link DoFn} requesting Redis server to get key/value pairs. */
+  private static class ReadFn extends BaseReadFn<KV<String, String>> {
+private int batchSize;
+private List<String> bufferedKeys;
+BoundedWindow window;
+Instant lastMsg;
+
+@StartBundle
+public void startBundle(StartBundleContext context) {
+  bufferedKeys = new ArrayList<>();
+}
 
-@Teardown
-public void teardown() {
-  jedis.close();
+ReadFn(RedisConnectionConfiguration connectionConfiguration, int 
batchSize) {
+  super(connectionConfiguration);
+  this.batchSize = batchSize;
+}
+
+private int getBatchSize() {
+  return batchSize;
+}
+
+@ProcessElement
+public void processElement(ProcessContext processContext, BoundedWindow 
window)
+throws Exception {
+  String key = processContext.element();
+  bufferedKeys.add(key);
+  this.window = window;
+  this.lastMsg = processContext.timestamp();
+  if (bufferedKeys.size() > getBatchSize()) {
+List<KV<String, String>> kvs = fetchAndFlush();
+for (KV<String, String> kv : kvs) {
+  processContext.output(kv);
+}
+  }
+}
+
+private List<KV<String, String>> fetchAndFlush() {
+  String[] keys = new String[bufferedKeys.size()];
+  bufferedKeys.toArray(keys);
+  List<String> results = jedis.mget(keys);
+  assert bufferedKeys.size() == results.size();
+  List<KV<String, String>> kvs = new ArrayList<>(bufferedKeys.size());
+  for (int i = 0; i < bufferedKeys.size(); i++) {
+if (results.get(i) != null) {
+  kvs.add(KV.of(bufferedKeys.get(i), results.get(i)));
+}
+  }
+  bufferedKeys = new ArrayList<>();
+  return kvs;
+}
+
+@FinishBundle
+public void finishBundle(FinishBundleContext context) throws Exception {
+  List<KV<String, String>> kvs = fetchAndFlush();
+  for (KV<String, String> kv : kvs) {
+context.output(kv, lastMsg, window);
 
 Review comment:
   Oh, so silly of me, I misread the motivation for keeping the window. You
are right, it makes total sense. In that case it is probably a better idea to
store the elements in a Map with the window as key and the list of elements as
value, use `window.maxTimestamp()` (you don't need the lastMsg), and flush when
enough elements have accumulated. This is similar to what is done here (but with
the count logic):
   
https://github.com/apache/beam/blob/70b653187d566da7eea2590f17a36bbf22ef8bed/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L825-L844
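
The suggestion in this comment (one buffer per window, a shared element count, and the window's own maximum timestamp on output) might look roughly like this when reduced to plain JDK types. It is only a sketch under stated assumptions: `Window`, `Emitted`, and `WindowedBatcher` are hypothetical stand-ins rather than Beam classes, the `Function` argument is an assumed replacement for the Redis bulk lookup, and a real `DoFn` would emit through its context objects instead of returning a list.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical stand-in for a Beam window: an id plus the last timestamp it covers. */
final class Window {
  final String id;
  final long maxTimestamp;

  Window(String id, long maxTimestamp) {
    this.id = id;
    this.maxTimestamp = maxTimestamp;
  }
}

/** One looked-up value, tagged with the window its key arrived in. */
final class Emitted {
  final String key;
  final String value;
  final Window window;
  final long timestamp;

  Emitted(String key, String value, Window window, long timestamp) {
    this.key = key;
    this.value = value;
    this.window = window;
    this.timestamp = timestamp;
  }
}

/** Hypothetical per-window batcher; illustrates the idea only, not the merged Beam code. */
final class WindowedBatcher {
  private final int batchSize;
  private final Function<List<String>, List<String>> bulkGet;
  // Keys grouped by the window they arrived in (Window instances are compared by identity here).
  private final Map<Window, List<String>> buffered = new LinkedHashMap<>();
  private int count = 0;

  WindowedBatcher(int batchSize, Function<List<String>, List<String>> bulkGet) {
    this.batchSize = batchSize;
    this.bulkGet = bulkGet;
  }

  /** Buffer a key under its window; flush every window once enough keys have accumulated. */
  List<Emitted> add(String key, Window window) {
    buffered.computeIfAbsent(window, w -> new ArrayList<>()).add(key);
    count++;
    return count >= batchSize ? fetchAndFlush() : new ArrayList<>();
  }

  /** End of bundle: flush whatever is left. */
  List<Emitted> finish() {
    return fetchAndFlush();
  }

  private List<Emitted> fetchAndFlush() {
    List<Emitted> out = new ArrayList<>();
    for (Map.Entry<Window, List<String>> entry : buffered.entrySet()) {
      Window w = entry.getKey();
      List<String> keys = entry.getValue();
      List<String> values = bulkGet.apply(keys); // one bulk call per window in this sketch
      for (int i = 0; i < keys.size(); i++) {
        if (values.get(i) != null) {
          // Each result keeps its own window and that window's max timestamp.
          out.add(new Emitted(keys.get(i), values.get(i), w, w.maxTimestamp));
        }
      }
    }
    buffered.clear();
    count = 0;
    return out;
  }
}
```

Because every buffered key stays attached to the window it arrived in, no `lastMsg` field or "last seen window" is needed, and a flush that spans several windows cannot assign an element to the wrong one. Issuing one bulk call per window is a simplification; a single call covering all buffered keys would also work if the results are mapped back to their windows.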


Issue Time Tracking
---

Worklog Id: (was: 125251)
Time Spent: 3.5h  (was: 3h 20m)


[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations

2018-07-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=123342=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-123342
 ]

ASF GitHub Bot logged work on BEAM-3446:


Author: ASF GitHub Bot
Created on: 14/Jul/18 17:55
Start Date: 14/Jul/18 17:55
Worklog Time Spent: 10m 
  Work Description: vvarma commented on a change in pull request #5841: 
Fixes https://issues.apache.org/jira/browse/BEAM-3446.
URL: https://github.com/apache/beam/pull/5841#discussion_r202521323
 
 

 ##
 File path: 
sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
 ##
 @@ -279,28 +314,73 @@ public void processElement(ProcessContext 
processContext) throws Exception {
   while (!finished) {
 ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
 List<String> keys = scanResult.getResult();
-
-Pipeline pipeline = jedis.pipelined();
-if (keys != null) {
-  for (String key : keys) {
-pipeline.get(key);
-  }
-  List<Object> values = pipeline.syncAndReturnAll();
-  for (int i = 0; i < values.size(); i++) {
-processContext.output(KV.of(keys.get(i), (String) values.get(i)));
-  }
+for (String k : keys) {
+  processContext.output(k);
 }
-
 cursor = scanResult.getStringCursor();
 if ("0".equals(cursor)) {
   finished = true;
 }
   }
 }
+  }
+  /** A {@link DoFn} requesting Redis server to get key/value pairs. */
+  private static class ReadFn extends BaseReadFn<KV<String, String>> {
+private int batchSize;
+private List<String> bufferedKeys;
+BoundedWindow window;
+Instant lastMsg;
+
+@StartBundle
+public void startBundle(StartBundleContext context) {
+  bufferedKeys = new ArrayList<>();
+}
 
-@Teardown
-public void teardown() {
-  jedis.close();
+ReadFn(RedisConnectionConfiguration connectionConfiguration, int 
batchSize) {
+  super(connectionConfiguration);
+  this.batchSize = batchSize;
+}
+
+private int getBatchSize() {
+  return batchSize;
+}
+
+@ProcessElement
+public void processElement(ProcessContext processContext, BoundedWindow 
window)
+throws Exception {
+  String key = processContext.element();
+  bufferedKeys.add(key);
+  this.window = window;
+  this.lastMsg = processContext.timestamp();
+  if (bufferedKeys.size() > getBatchSize()) {
+List<KV<String, String>> kvs = fetchAndFlush();
+for (KV<String, String> kv : kvs) {
+  processContext.output(kv);
+}
+  }
+}
+
+private List<KV<String, String>> fetchAndFlush() {
+  String[] keys = new String[bufferedKeys.size()];
+  bufferedKeys.toArray(keys);
+  List<String> results = jedis.mget(keys);
+  assert bufferedKeys.size() == results.size();
+  List<KV<String, String>> kvs = new ArrayList<>(bufferedKeys.size());
+  for (int i = 0; i < bufferedKeys.size(); i++) {
+if (results.get(i) != null) {
+  kvs.add(KV.of(bufferedKeys.get(i), results.get(i)));
+}
+  }
+  bufferedKeys = new ArrayList<>();
+  return kvs;
+}
+
+@FinishBundle
+public void finishBundle(FinishBundleContext context) throws Exception {
+  List<KV<String, String>> kvs = fetchAndFlush();
+  for (KV<String, String> kv : kvs) {
+context.output(kv, lastMsg, window);
 
 Review comment:
   @iemejia I'm not sure about this, since I am using the Instant and window from
the last processed message to produce output in the `finishBundle` method.


Issue Time Tracking
---

Worklog Id: (was: 123342)
Time Spent: 3h 20m  (was: 3h 10m)


[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations

2018-07-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=123051=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-123051
 ]

ASF GitHub Bot logged work on BEAM-3446:


Author: ASF GitHub Bot
Created on: 13/Jul/18 21:21
Start Date: 13/Jul/18 21:21
Worklog Time Spent: 10m 
  Work Description: iemejia commented on a change in pull request #5841: 
Fixes https://issues.apache.org/jira/browse/BEAM-3446.
URL: https://github.com/apache/beam/pull/5841#discussion_r202475941
 
 

 ##
 File path: 
sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
 ##
 @@ -279,28 +290,31 @@ public void processElement(ProcessContext processContext) throws Exception {
       while (!finished) {
         ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
         List<String> keys = scanResult.getResult();
-
-        Pipeline pipeline = jedis.pipelined();
-        if (keys != null) {
-          for (String key : keys) {
-            pipeline.get(key);
-          }
-          List<Object> values = pipeline.syncAndReturnAll();
-          for (int i = 0; i < values.size(); i++) {
-            processContext.output(KV.of(keys.get(i), (String) values.get(i)));
-          }
+        for (String k : keys) {
+          processContext.output(k);
         }
-
         cursor = scanResult.getStringCursor();
         if ("0".equals(cursor)) {
           finished = true;
         }
       }
     }
+  }
+  /** A {@link DoFn} requesting Redis server to get key/value pairs. */
+  private static class ReadFn extends BaseReadFn<KV<String, String>> {
 
-    @Teardown
-    public void teardown() {
-      jedis.close();
+    ReadFn(RedisConnectionConfiguration connectionConfiguration) {
+      super(connectionConfiguration);
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext processContext) throws Exception {
+      String key = processContext.element();
+
+      String value = jedis.get(key);
 
 Review comment:
   Hi, sorry, I missed your message. The idea is to add the DoFn startBundle
   and finishBundle methods, plus a method on Read that sets the maximum
   number of elements requested at once. processElement then only collects
   the keys to request; the request itself is not issued there but in
   finishBundle, as an MGET for the configured number of elements in the
   batch. We should choose a default batch size, e.g. 1000. This is similar
   to what other IOs do in their Write (see withBatchSize in ElasticsearchIO
   or SolrIO for reference):
   
https://github.com/apache/beam/blob/c14c975224af417dcdc74fed8b0d893be742e9d7/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/SolrIO.java#L805-L829
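
The lifecycle described in this comment can be sketched in plain Java, without Beam or Jedis on the classpath. `BatchingReader` is a hypothetical name, the three methods only mirror the `@StartBundle`, `@ProcessElement` and `@FinishBundle` callbacks, and the `Function` argument is an assumed stand-in for `jedis.mget`. Unlike the comment's wording, this sketch also flushes from `processElement` once the buffer reaches the batch size, so the buffer never grows beyond it.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical sketch of buffering keys per bundle and fetching them in batches. */
final class BatchingReader {
  /** Default suggested in the review thread; an assumption, not a confirmed constant. */
  static final int DEFAULT_BATCH_SIZE = 1000;

  private final int batchSize;
  private final Function<List<String>, List<String>> multiGet; // stand-in for jedis.mget
  private List<String> bufferedKeys = new ArrayList<>();

  BatchingReader(int batchSize, Function<List<String>, List<String>> multiGet) {
    this.batchSize = batchSize;
    this.multiGet = multiGet;
  }

  /** Mirrors @StartBundle: every bundle starts with an empty buffer. */
  void startBundle() {
    bufferedKeys = new ArrayList<>();
  }

  /** Mirrors @ProcessElement: buffer the key and flush once the batch is full. */
  List<Map.Entry<String, String>> processElement(String key) {
    bufferedKeys.add(key);
    if (bufferedKeys.size() >= batchSize) {
      return fetchAndFlush();
    }
    return new ArrayList<>();
  }

  /** Mirrors @FinishBundle: flush whatever is still buffered. */
  List<Map.Entry<String, String>> finishBundle() {
    return fetchAndFlush();
  }

  private List<Map.Entry<String, String>> fetchAndFlush() {
    List<Map.Entry<String, String>> found = new ArrayList<>();
    if (bufferedKeys.isEmpty()) {
      return found; // avoid issuing a multi-get with no keys
    }
    List<String> values = multiGet.apply(bufferedKeys);
    for (int i = 0; i < bufferedKeys.size(); i++) {
      if (values.get(i) != null) { // keys without a value are dropped
        found.add(new SimpleImmutableEntry<>(bufferedKeys.get(i), values.get(i)));
      }
    }
    bufferedKeys = new ArrayList<>();
    return found;
  }
}
```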




Issue Time Tracking
---

Worklog Id: (was: 123051)
Time Spent: 3h 10m  (was: 3h)



[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations

2018-07-09 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=120839=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-120839
 ]

ASF GitHub Bot logged work on BEAM-3446:


Author: ASF GitHub Bot
Created on: 09/Jul/18 16:13
Start Date: 09/Jul/18 16:13
Worklog Time Spent: 10m 
  Work Description: vvarma commented on a change in pull request #5841: 
Fixes https://issues.apache.org/jira/browse/BEAM-3446.
URL: https://github.com/apache/beam/pull/5841#discussion_r201061427
 
 

 ##
 File path: 
sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
 ##
 @@ -279,28 +290,31 @@ public void processElement(ProcessContext processContext) throws Exception {
       while (!finished) {
         ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
         List<String> keys = scanResult.getResult();
-
-        Pipeline pipeline = jedis.pipelined();
-        if (keys != null) {
-          for (String key : keys) {
-            pipeline.get(key);
-          }
-          List<Object> values = pipeline.syncAndReturnAll();
-          for (int i = 0; i < values.size(); i++) {
-            processContext.output(KV.of(keys.get(i), (String) values.get(i)));
-          }
+        for (String k : keys) {
+          processContext.output(k);
         }
-
         cursor = scanResult.getStringCursor();
         if ("0".equals(cursor)) {
           finished = true;
         }
       }
     }
+  }
+  /** A {@link DoFn} requesting Redis server to get key/value pairs. */
+  private static class ReadFn extends BaseReadFn<KV<String, String>> {
 
-    @Teardown
-    public void teardown() {
-      jedis.close();
+    ReadFn(RedisConnectionConfiguration connectionConfiguration) {
+      super(connectionConfiguration);
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext processContext) throws Exception {
+      String key = processContext.element();
+
+      String value = jedis.get(key);
 
 Review comment:
   @iemejia Right now two operators are exposed for read operations:
   1. ReadKeysWithPattern, which works in two steps: (a) for each pattern
      prefix in the PCollection, fetch all matching keys; (b) fetch the value
      for each key.
   2. ReadFn, which gets the value for each key in the PCollection.
   
   I can change ReadKeysWithPattern to fetch the data in step (b) using a
   parameterized batch size, but for ReadFn I am not sure how to use the
   batch parameter.
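
To make the two read paths concrete, here is a small in-memory model in plain Java; no Redis is involved. `TwoStepRead`, `keysMatching` and `valuesFor` are hypothetical names, the `Map` stands in for the Redis keyspace, and only patterns with a trailing `*` are modelled, whereas Redis SCAN with MATCH accepts general glob patterns.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory model of the two read paths. */
final class TwoStepRead {

  /** Step (a): list the keys matching a pattern such as "user:*" (prefix match only). */
  static List<String> keysMatching(Map<String, String> store, String pattern) {
    boolean isPrefix = pattern.endsWith("*");
    String prefix = isPrefix ? pattern.substring(0, pattern.length() - 1) : pattern;
    List<String> keys = new ArrayList<>();
    // A sorted copy gives a deterministic order; a real SCAN guarantees no order.
    for (String key : new TreeMap<String, String>(store).keySet()) {
      boolean matches = isPrefix ? key.startsWith(prefix) : key.equals(prefix);
      if (matches) {
        keys.add(key);
      }
    }
    return keys;
  }

  /** Step (b), which is also all that a key-only read needs: look up each key's value. */
  static Map<String, String> valuesFor(Map<String, String> store, List<String> keys) {
    Map<String, String> found = new LinkedHashMap<>();
    for (String key : keys) {
      String value = store.get(key);
      if (value != null) { // keys without a value produce no output
        found.put(key, value);
      }
    }
    return found;
  }
}
```

Reading by pattern chains both steps, `valuesFor(store, keysMatching(store, "user:*"))`, while reading by exact key calls `valuesFor` directly and skips the scan.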




Issue Time Tracking
---

Worklog Id: (was: 120839)
Time Spent: 3h  (was: 2h 50m)



[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations

2018-07-03 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=118615=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-118615
 ]

ASF GitHub Bot logged work on BEAM-3446:


Author: ASF GitHub Bot
Created on: 03/Jul/18 08:24
Start Date: 03/Jul/18 08:24
Worklog Time Spent: 10m 
  Work Description: iemejia commented on a change in pull request #5841: 
Fixes https://issues.apache.org/jira/browse/BEAM-3446.
URL: https://github.com/apache/beam/pull/5841#discussion_r199722160
 
 

 ##
 File path: 
sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
 ##
 @@ -279,28 +290,31 @@ public void processElement(ProcessContext processContext) throws Exception {
       while (!finished) {
         ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
         List<String> keys = scanResult.getResult();
-
-        Pipeline pipeline = jedis.pipelined();
-        if (keys != null) {
-          for (String key : keys) {
-            pipeline.get(key);
-          }
-          List<Object> values = pipeline.syncAndReturnAll();
-          for (int i = 0; i < values.size(); i++) {
-            processContext.output(KV.of(keys.get(i), (String) values.get(i)));
-          }
+        for (String k : keys) {
+          processContext.output(k);
         }
-
         cursor = scanResult.getStringCursor();
         if ("0".equals(cursor)) {
           finished = true;
         }
       }
     }
+  }
+  /** A {@link DoFn} requesting Redis server to get key/value pairs. */
+  private static class ReadFn extends BaseReadFn<KV<String, String>> {
 
-    @Teardown
-    public void teardown() {
-      jedis.close();
+    ReadFn(RedisConnectionConfiguration connectionConfiguration) {
+      super(connectionConfiguration);
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext processContext) throws Exception {
+      String key = processContext.element();
+
+      String value = jedis.get(key);
 
 Review comment:
   :+1:




Issue Time Tracking
---

Worklog Id: (was: 118615)
Time Spent: 2h 50m  (was: 2h 40m)



[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations

2018-07-02 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=118216=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-118216
 ]

ASF GitHub Bot logged work on BEAM-3446:


Author: ASF GitHub Bot
Created on: 02/Jul/18 16:03
Start Date: 02/Jul/18 16:03
Worklog Time Spent: 10m 
  Work Description: vvarma commented on a change in pull request #5841: 
Fixes https://issues.apache.org/jira/browse/BEAM-3446.
URL: https://github.com/apache/beam/pull/5841#discussion_r199546193
 
 

 ##
 File path: 
sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
 ##
 @@ -279,28 +290,31 @@ public void processElement(ProcessContext processContext) throws Exception {
       while (!finished) {
         ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
         List<String> keys = scanResult.getResult();
-
-        Pipeline pipeline = jedis.pipelined();
-        if (keys != null) {
-          for (String key : keys) {
-            pipeline.get(key);
-          }
-          List<Object> values = pipeline.syncAndReturnAll();
-          for (int i = 0; i < values.size(); i++) {
-            processContext.output(KV.of(keys.get(i), (String) values.get(i)));
-          }
+        for (String k : keys) {
+          processContext.output(k);
         }
-
         cursor = scanResult.getStringCursor();
         if ("0".equals(cursor)) {
           finished = true;
         }
       }
     }
+  }
+  /** A {@link DoFn} requesting Redis server to get key/value pairs. */
+  private static class ReadFn extends BaseReadFn<KV<String, String>> {
 
-    @Teardown
-    public void teardown() {
-      jedis.close();
+    ReadFn(RedisConnectionConfiguration connectionConfiguration) {
+      super(connectionConfiguration);
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext processContext) throws Exception {
+      String key = processContext.element();
+
+      String value = jedis.get(key);
 
 Review comment:
   Sure, I think it should be straightforward. I will update the PR during the week.




Issue Time Tracking
---

Worklog Id: (was: 118216)
Time Spent: 2h 40m  (was: 2.5h)



[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations

2018-07-02 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=118164=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-118164
 ]

ASF GitHub Bot logged work on BEAM-3446:


Author: ASF GitHub Bot
Created on: 02/Jul/18 13:12
Start Date: 02/Jul/18 13:12
Worklog Time Spent: 10m 
  Work Description: iemejia commented on a change in pull request #5841: 
Fixes https://issues.apache.org/jira/browse/BEAM-3446.
URL: https://github.com/apache/beam/pull/5841#discussion_r199491469
 
 

 ##
 File path: 
sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
 ##
 @@ -279,28 +290,31 @@ public void processElement(ProcessContext processContext) throws Exception {
       while (!finished) {
         ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
         List<String> keys = scanResult.getResult();
-
-        Pipeline pipeline = jedis.pipelined();
-        if (keys != null) {
-          for (String key : keys) {
-            pipeline.get(key);
-          }
-          List<Object> values = pipeline.syncAndReturnAll();
-          for (int i = 0; i < values.size(); i++) {
-            processContext.output(KV.of(keys.get(i), (String) values.get(i)));
-          }
+        for (String k : keys) {
+          processContext.output(k);
         }
-
         cursor = scanResult.getStringCursor();
         if ("0".equals(cursor)) {
           finished = true;
         }
       }
     }
+  }
+  /** A {@link DoFn} requesting Redis server to get key/value pairs. */
+  private static class ReadFn extends BaseReadFn<KV<String, String>> {
 
-    @Teardown
-    public void teardown() {
-      jedis.close();
+    ReadFn(RedisConnectionConfiguration connectionConfiguration) {
+      super(connectionConfiguration);
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext processContext) throws Exception {
+      String key = processContext.element();
+
+      String value = jedis.get(key);
 
 Review comment:
   As mentioned in the previous PR, I am a bit concerned about losing the
   ability to request multiple values at once. Any chance you could work on
   this for ReadFn, using the `MGET`-based approach you mentioned?
   The simplest approach is probably to do as other IOs do and have a default
   batch size that can be configured via a `withBatchSize` method. WDYT?
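
A minimal sketch of what such a `withBatchSize` option could look like, as a plain immutable Java class. `ReadConfig` is a hypothetical stand-in for the transform's configuration, and the default of 1000 is an assumption taken from a later comment in this thread, not a confirmed value in RedisIO.

```java
/** Hypothetical read configuration carrying a batch size with a default. */
final class ReadConfig {
  /** Assumed default; the thread only suggests "e.g. 1000". */
  static final int DEFAULT_BATCH_SIZE = 1000;

  private final int batchSize;

  private ReadConfig(int batchSize) {
    this.batchSize = batchSize;
  }

  /** Starts from the default batch size. */
  static ReadConfig create() {
    return new ReadConfig(DEFAULT_BATCH_SIZE);
  }

  /** Returns a new configuration; the receiver is left unchanged. */
  ReadConfig withBatchSize(int batchSize) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive, got " + batchSize);
    }
    return new ReadConfig(batchSize);
  }

  int batchSize() {
    return batchSize;
  }
}
```

Returning a new object from `withBatchSize` rather than mutating the receiver follows the fluent, immutable builder style that Beam transforms generally use.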




Issue Time Tracking
---

Worklog Id: (was: 118164)
Time Spent: 2.5h  (was: 2h 20m)



[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations

2018-07-02 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=118096=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-118096
 ]

ASF GitHub Bot logged work on BEAM-3446:


Author: ASF GitHub Bot
Created on: 02/Jul/18 07:36
Start Date: 02/Jul/18 07:36
Worklog Time Spent: 10m 
  Work Description: iemejia commented on issue #5841: Fixes 
https://issues.apache.org/jira/browse/BEAM-3446.
URL: https://github.com/apache/beam/pull/5841#issuecomment-401697979
 
 
   Run Java Precommit




Issue Time Tracking
---

Worklog Id: (was: 118096)
Time Spent: 2h 20m  (was: 2h 10m)



[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations

2018-06-30 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=117791=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-117791
 ]

ASF GitHub Bot logged work on BEAM-3446:


Author: ASF GitHub Bot
Created on: 30/Jun/18 07:44
Start Date: 30/Jun/18 07:44
Worklog Time Spent: 10m 
  Work Description: vvarma commented on issue #4656: [BEAM-3446] Fixes 
RedisIO non-prefix read operations 
URL: https://github.com/apache/beam/pull/4656#issuecomment-401524603
 
 
   Closing this PR. I opened a new one, rebased and with fixes:
https://github.com/apache/beam/pull/5841




Issue Time Tracking
---

Worklog Id: (was: 117791)
Time Spent: 2h  (was: 1h 50m)



[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations

2018-06-30 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=117792=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-117792
 ]

ASF GitHub Bot logged work on BEAM-3446:


Author: ASF GitHub Bot
Created on: 30/Jun/18 07:44
Start Date: 30/Jun/18 07:44
Worklog Time Spent: 10m 
  Work Description: vvarma closed pull request #4656: [BEAM-3446] Fixes 
RedisIO non-prefix read operations 
URL: https://github.com/apache/beam/pull/4656
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
index 2559d0773cc..f58200a0f36 100644
--- a/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
+++ b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
@@ -200,6 +200,7 @@ public void populateDisplayData(DisplayData.Builder builder) {
 
       return input
           .apply(Create.of(keyPattern()))
+          .apply(ParDo.of(new ReadKeywsWithPattern(connectionConfiguration())))
           .apply(RedisIO.readAll().withConnectionConfiguration(connectionConfiguration()));
     }
 
@@ -260,16 +261,12 @@ public ReadAll withConnectionConfiguration(RedisConnectionConfiguration connecti
 
   }
 
-  /**
-   * A {@link DoFn} requesting Redis server to get key/value pairs.
-   */
-  private static class ReadFn extends DoFn<String, KV<String, String>> {
-
-    private final RedisConnectionConfiguration connectionConfiguration;
+  private abstract static class BaseReadFn<T> extends DoFn<String, T> {
+    protected final RedisConnectionConfiguration connectionConfiguration;
 
-    private transient Jedis jedis;
+    protected transient Jedis jedis;
 
-    public ReadFn(RedisConnectionConfiguration connectionConfiguration) {
+    public BaseReadFn(RedisConnectionConfiguration connectionConfiguration) {
       this.connectionConfiguration = connectionConfiguration;
     }
 
@@ -278,6 +275,18 @@ public void setup() {
       jedis = connectionConfiguration.connect();
     }
 
+    @Teardown
+    public void teardown() {
+      jedis.close();
+    }
+  }
+
+  private static class ReadKeywsWithPattern extends BaseReadFn<String> {
+
+    ReadKeywsWithPattern(RedisConnectionConfiguration connectionConfiguration) {
+      super(connectionConfiguration);
+    }
+
     @ProcessElement
     public void processElement(ProcessContext processContext) throws Exception {
       ScanParams scanParams = new ScanParams();
@@ -288,28 +297,33 @@ public void processElement(ProcessContext processContext) throws Exception {
       while (!finished) {
         ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
         List<String> keys = scanResult.getResult();
-
-        Pipeline pipeline = jedis.pipelined();
-        if (keys != null) {
-          for (String key : keys) {
-            pipeline.get(key);
-          }
-          List<Object> values = pipeline.syncAndReturnAll();
-          for (int i = 0; i < values.size(); i++) {
-            processContext.output(KV.of(keys.get(i), (String) values.get(i)));
-          }
+        for (String k : keys) {
+          processContext.output(k);
         }
-
         cursor = scanResult.getStringCursor();
         if (cursor.equals("0")) {
           finished = true;
         }
       }
     }
+  }
+  /**
+   * A {@link DoFn} requesting Redis server to get key/value pairs.
+   */
+  private static class ReadFn extends BaseReadFn<KV<String, String>> {
 
-    @Teardown
-    public void teardown() {
-      jedis.close();
+    ReadFn(RedisConnectionConfiguration connectionConfiguration) {
+      super(connectionConfiguration);
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext processContext) throws Exception {
+      String key = processContext.element();
+
+      String value = jedis.get(key);
+      if (value != null) {
+        processContext.output(KV.of(key, value));
+      }
     }
 
   }


 




Issue Time Tracking
---

Worklog Id: (was: 117792)
Time Spent: 2h 10m  (was: 2h)


[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations

2018-06-30 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=117786=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-117786
 ]

ASF GitHub Bot logged work on BEAM-3446:


Author: ASF GitHub Bot
Created on: 30/Jun/18 07:40
Start Date: 30/Jun/18 07:40
Worklog Time Spent: 10m 
  Work Description: vvarma opened a new pull request #5841: Fixes 
https://issues.apache.org/jira/browse/BEAM-3446.
URL: https://github.com/apache/beam/pull/5841
 
 
   Rebase of https://github.com/apache/beam/pull/4656
   
   Adds BaseReadFn to abstract the general Jedis operations.
   Moves the prefix-based key fetch into the ReadKeywsWithPattern DoFn.
   ReadFn is now a pure fetch from Redis for a given key.
   URL: https://issues.apache.org/jira/browse/BEAM-3446
   @iemejia  @jbonofre 
   




Issue Time Tracking
---

Worklog Id: (was: 117786)
Time Spent: 1h 50m  (was: 1h 40m)


[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations

2018-06-30 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=117778=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-117778
 ]

ASF GitHub Bot logged work on BEAM-3446:


Author: ASF GitHub Bot
Created on: 30/Jun/18 06:54
Start Date: 30/Jun/18 06:54
Worklog Time Spent: 10m 
  Work Description: vvarma commented on a change in pull request #4656: 
[BEAM-3446] Fixes RedisIO non-prefix read operations 
URL: https://github.com/apache/beam/pull/4656#discussion_r199315229
 
 

 ##
 File path: 
sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
 ##
 @@ -288,28 +297,33 @@ public void processElement(ProcessContext 
processContext) throws Exception {
   while (!finished) {
 ScanResult scanResult = jedis.scan(cursor, scanParams);
 List keys = scanResult.getResult();
-
-Pipeline pipeline = jedis.pipelined();
 
 Review comment:
   The number of keys that need to be looked up in a given window or batch can 
vary. Ideally, we should have a configurable batch size and use MGET 
(https://redis.io/commands/mget) if we want to optimize further.
   Pipelining an entire window or batch can cause memory spikes in Redis, 
depending on the number of keys being looked up. For the time being, to 
simplify things, I removed the pipeline.
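
   To illustrate the batching idea in the comment above, here is a minimal Java 
sketch that splits a list of exact keys into chunks of a configurable size and 
looks up each chunk with one MGET-style call. It is not code from the PR: 
`BatchedLookupSketch`, `partition`, `lookup`, and `fakeMget` are hypothetical 
names, and an in-memory map stands in for the Redis server so the example runs 
without a client library. The only Redis behaviour assumed is that MGET returns 
one value per requested key, in order, with nil for missing keys.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical sketch: exact-key lookups in batches of a configurable size. */
class BatchedLookupSketch {

  /** Splits keys into consecutive chunks of at most batchSize elements. */
  static <T> List<List<T>> partition(List<T> keys, int batchSize) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    List<List<T>> batches = new ArrayList<>();
    for (int start = 0; start < keys.size(); start += batchSize) {
      int end = Math.min(start + batchSize, keys.size());
      batches.add(new ArrayList<>(keys.subList(start, end)));
    }
    return batches;
  }

  /**
   * Looks up keys batch by batch. multiGet stands in for an MGET-style call:
   * it must return one value per requested key, in order, null if missing.
   */
  static Map<String, String> lookup(
      List<String> keys, int batchSize, Function<List<String>, List<String>> multiGet) {
    Map<String, String> found = new LinkedHashMap<>();
    for (List<String> batch : partition(keys, batchSize)) {
      List<String> values = multiGet.apply(batch);
      for (int i = 0; i < batch.size(); i++) {
        String value = values.get(i);
        if (value != null) { // skip missing keys, like the null check in the issue
          found.put(batch.get(i), value);
        }
      }
    }
    return found;
  }

  public static void main(String[] args) {
    // In-memory stand-in for the Redis server; an assumption, not a real client.
    Map<String, String> store = new HashMap<>();
    store.put("k1", "v1");
    store.put("k3", "v3");

    Function<List<String>, List<String>> fakeMget =
        batch -> {
          List<String> values = new ArrayList<>();
          for (String key : batch) {
            values.add(store.get(key)); // null when the key is absent
          }
          return values;
        };

    // Two batches are issued: [k1, k2] and [k3].
    System.out.println(lookup(Arrays.asList("k1", "k2", "k3"), 2, fakeMget));
    // prints {k1=v1, k3=v3}
  }
}
```

   With a real client, `multiGet` could wrap the client's MGET call. The batch 
size then bounds how many keys are requested at once, which addresses the 
memory concern above while still avoiding one round trip per key.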


Issue Time Tracking
---

Worklog Id: (was: 117778)
Time Spent: 1h 40m  (was: 1.5h)

> RedisIO non-prefix read operations
> --
>
> Key: BEAM-3446
> URL: https://issues.apache.org/jira/browse/BEAM-3446
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-redis
>Reporter: Vinay varma
>Assignee: Vinay varma
>Priority: Major
>  Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> The read operation in RedisIO is for prefix-based lookups. While this can be 
> used for exact key matches as well, the number of operations limits the 
> throughput of the function.
> I suggest exposing the current readAll operation as readbyprefix and using 
> simpler operations for the readAll functionality.
> ex:
> {code:java}
> String output = jedis.get(element);
> if (output != null) {
> processContext.output(KV.of(element, output));
> }
> {code}
> instead of:
> https://github.com/apache/beam/blob/7d240c0bb171af6868f1a6e95196c9dcfc9ac640/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java#L292



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations

2018-06-29 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=117394=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-117394
 ]

ASF GitHub Bot logged work on BEAM-3446:


Author: ASF GitHub Bot
Created on: 29/Jun/18 14:22
Start Date: 29/Jun/18 14:22
Worklog Time Spent: 10m 
  Work Description: iemejia commented on a change in pull request #4656: 
[BEAM-3446] Fixes RedisIO non-prefix read operations 
URL: https://github.com/apache/beam/pull/4656#discussion_r199170256
 
 

 ##
 File path: 
sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
 ##
 @@ -260,16 +261,12 @@ public ReadAll 
withConnectionConfiguration(RedisConnectionConfiguration connecti
 
   }
 
-  /**
-   * A {@link DoFn} requesting Redis server to get key/value pairs.
-   */
-  private static class ReadFn extends DoFn> {
-
-private final RedisConnectionConfiguration connectionConfiguration;
+  private abstract static class BaseReadFn extends DoFn {
+protected final RedisConnectionConfiguration connectionConfiguration;
 
 Review comment:
   can be package private


Issue Time Tracking
---

Worklog Id: (was: 117394)
Time Spent: 1h 10m  (was: 1h)

> RedisIO non-prefix read operations
> --
>
> Key: BEAM-3446
> URL: https://issues.apache.org/jira/browse/BEAM-3446
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-redis
>Reporter: Vinay varma
>Assignee: Vinay varma
>Priority: Major
>  Time Spent: 1h 10m
>  Remaining Estimate: 0h


[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations

2018-06-29 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=117392=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-117392
 ]

ASF GitHub Bot logged work on BEAM-3446:


Author: ASF GitHub Bot
Created on: 29/Jun/18 14:22
Start Date: 29/Jun/18 14:22
Worklog Time Spent: 10m 
  Work Description: iemejia commented on a change in pull request #4656: 
[BEAM-3446] Fixes RedisIO non-prefix read operations 
URL: https://github.com/apache/beam/pull/4656#discussion_r199146936
 
 

 ##
 File path: 
sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
 ##
 @@ -200,6 +200,7 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
 
   return input
   .apply(Create.of(keyPattern()))
+  .apply(ParDo.of(new ReadKeywsWithPattern(connectionConfiguration(
 
 Review comment:
   s/ReadKeywsWithPattern/ReadKeysWithPattern


Issue Time Tracking
---

Worklog Id: (was: 117392)
Time Spent: 50m  (was: 40m)

> RedisIO non-prefix read operations
> --
>
> Key: BEAM-3446
> URL: https://issues.apache.org/jira/browse/BEAM-3446
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-redis
>Reporter: Vinay varma
>Assignee: Vinay varma
>Priority: Major
>  Time Spent: 50m
>  Remaining Estimate: 0h


[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations

2018-06-29 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=117396=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-117396
 ]

ASF GitHub Bot logged work on BEAM-3446:


Author: ASF GitHub Bot
Created on: 29/Jun/18 14:22
Start Date: 29/Jun/18 14:22
Worklog Time Spent: 10m 
  Work Description: iemejia commented on a change in pull request #4656: 
[BEAM-3446] Fixes RedisIO non-prefix read operations 
URL: https://github.com/apache/beam/pull/4656#discussion_r199170282
 
 

 ##
 File path: 
sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
 ##
 @@ -260,16 +261,12 @@ public ReadAll 
withConnectionConfiguration(RedisConnectionConfiguration connecti
 
   }
 
-  /**
-   * A {@link DoFn} requesting Redis server to get key/value pairs.
-   */
-  private static class ReadFn extends DoFn> {
-
-private final RedisConnectionConfiguration connectionConfiguration;
+  private abstract static class BaseReadFn extends DoFn {
+protected final RedisConnectionConfiguration connectionConfiguration;
 
-private transient Jedis jedis;
+protected transient Jedis jedis;
 
 Review comment:
   can be package private


Issue Time Tracking
---

Worklog Id: (was: 117396)
Time Spent: 1.5h  (was: 1h 20m)

> RedisIO non-prefix read operations
> --
>
> Key: BEAM-3446
> URL: https://issues.apache.org/jira/browse/BEAM-3446
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-redis
>Reporter: Vinay varma
>Assignee: Vinay varma
>Priority: Major
>  Time Spent: 1.5h
>  Remaining Estimate: 0h


[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations

2018-06-29 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=117395=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-117395
 ]

ASF GitHub Bot logged work on BEAM-3446:


Author: ASF GitHub Bot
Created on: 29/Jun/18 14:22
Start Date: 29/Jun/18 14:22
Worklog Time Spent: 10m 
  Work Description: iemejia commented on a change in pull request #4656: 
[BEAM-3446] Fixes RedisIO non-prefix read operations 
URL: https://github.com/apache/beam/pull/4656#discussion_r199170873
 
 

 ##
 File path: 
sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
 ##
 @@ -288,28 +297,33 @@ public void processElement(ProcessContext 
processContext) throws Exception {
   while (!finished) {
 ScanResult scanResult = jedis.scan(cursor, scanParams);
 List keys = scanResult.getResult();
-
-Pipeline pipeline = jedis.pipelined();
 
 Review comment:
   Question: What is the reason for removing pipelining in general? It seems that, 
even if the approach of this PR is more composable, it would perform worse, wouldn't it?


Issue Time Tracking
---

Worklog Id: (was: 117395)
Time Spent: 1h 20m  (was: 1h 10m)

> RedisIO non-prefix read operations
> --
>
> Key: BEAM-3446
> URL: https://issues.apache.org/jira/browse/BEAM-3446
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-redis
>Reporter: Vinay varma
>Assignee: Vinay varma
>Priority: Major
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h


[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations

2018-06-29 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=117393=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-117393
 ]

ASF GitHub Bot logged work on BEAM-3446:


Author: ASF GitHub Bot
Created on: 29/Jun/18 14:22
Start Date: 29/Jun/18 14:22
Worklog Time Spent: 10m 
  Work Description: iemejia commented on a change in pull request #4656: 
[BEAM-3446] Fixes RedisIO non-prefix read operations 
URL: https://github.com/apache/beam/pull/4656#discussion_r199170602
 
 

 ##
 File path: 
sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
 ##
 @@ -260,16 +261,12 @@ public ReadAll 
withConnectionConfiguration(RedisConnectionConfiguration connecti
 
   }
 
-  /**
-   * A {@link DoFn} requesting Redis server to get key/value pairs.
-   */
-  private static class ReadFn extends DoFn> {
-
-private final RedisConnectionConfiguration connectionConfiguration;
+  private abstract static class BaseReadFn extends DoFn {
+protected final RedisConnectionConfiguration connectionConfiguration;
 
-private transient Jedis jedis;
+protected transient Jedis jedis;
 
-public ReadFn(RedisConnectionConfiguration connectionConfiguration) {
+public BaseReadFn(RedisConnectionConfiguration connectionConfiguration) {
 
 Review comment:
   Remove `public`. In general, it is common Beam practice to restrict access as 
much as possible. You can use IntelliJ's code analysis to do this.


Issue Time Tracking
---

Worklog Id: (was: 117393)
Time Spent: 1h  (was: 50m)

> RedisIO non-prefix read operations
> --
>
> Key: BEAM-3446
> URL: https://issues.apache.org/jira/browse/BEAM-3446
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-redis
>Reporter: Vinay varma
>Assignee: Vinay varma
>Priority: Major
>  Time Spent: 1h
>  Remaining Estimate: 0h


[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations

2018-06-28 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=116882=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-116882
 ]

ASF GitHub Bot logged work on BEAM-3446:


Author: ASF GitHub Bot
Created on: 28/Jun/18 16:26
Start Date: 28/Jun/18 16:26
Worklog Time Spent: 10m 
  Work Description: kennknowles commented on issue #4656: [BEAM-3446] Fixes 
RedisIO non-prefix read operations 
URL: https://github.com/apache/beam/pull/4656#issuecomment-401093881
 
 
   We have turned on autoformatting of the codebase, which causes small 
conflicts across the board. You can probably safely rebase and just keep your 
changes. Like this:
   
   ```
   $ git rebase
   ... see some conflicts
   $ git diff
   ... confirmed that the conflicts are just autoformatting
   ... so we can just keep our changes and do our own autoformat
   $ git checkout --theirs --
   $ git add -u
   $ git rebase --continue
   $ ./gradlew spotlessJavaApply
   ```
   
   Please ping me if you run into any difficulty. 


Issue Time Tracking
---

Worklog Id: (was: 116882)
Time Spent: 40m  (was: 0.5h)

> RedisIO non-prefix read operations
> --
>
> Key: BEAM-3446
> URL: https://issues.apache.org/jira/browse/BEAM-3446
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-redis
>Reporter: Vinay varma
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Time Spent: 40m
>  Remaining Estimate: 0h


[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations

2018-06-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=110086=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-110086
 ]

ASF GitHub Bot logged work on BEAM-3446:


Author: ASF GitHub Bot
Created on: 08/Jun/18 12:04
Start Date: 08/Jun/18 12:04
Worklog Time Spent: 10m 
  Work Description: vvarma commented on issue #4656: [BEAM-3446] Fixes 
RedisIO non-prefix read operations 
URL: https://github.com/apache/beam/pull/4656#issuecomment-395740529
 
 
   The fix is in place. @jbonofre, please help review.


Issue Time Tracking
---

Worklog Id: (was: 110086)
Time Spent: 0.5h  (was: 20m)

> RedisIO non-prefix read operations
> --
>
> Key: BEAM-3446
> URL: https://issues.apache.org/jira/browse/BEAM-3446
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-redis
>Reporter: Vinay varma
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Time Spent: 0.5h
>  Remaining Estimate: 0h


[jira] [Work logged] (BEAM-3446) RedisIO non-prefix read operations

2018-06-06 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3446?focusedWorklogId=109620=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-109620
 ]

ASF GitHub Bot logged work on BEAM-3446:


Author: ASF GitHub Bot
Created on: 07/Jun/18 04:01
Start Date: 07/Jun/18 04:01
Worklog Time Spent: 10m 
  Work Description: stale[bot] commented on issue #4656: [BEAM-3446] Fixes 
RedisIO non-prefix read operations 
URL: https://github.com/apache/beam/pull/4656#issuecomment-395285681
 
 
   This pull request has been marked as stale due to 60 days of inactivity. It 
will be closed in 1 week if no further activity occurs. If you think that’s 
incorrect or this pull request requires a review, please simply write any 
comment. If closed, you can revive the PR at any time and @mention a reviewer 
or discuss it on the d...@beam.apache.org list. Thank you for your 
contributions.
   


Issue Time Tracking
---

Worklog Id: (was: 109620)
Time Spent: 20m  (was: 10m)

> RedisIO non-prefix read operations
> --
>
> Key: BEAM-3446
> URL: https://issues.apache.org/jira/browse/BEAM-3446
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-redis
>Reporter: Vinay varma
>Assignee: Jean-Baptiste Onofré
>Priority: Major
>  Time Spent: 20m
>  Remaining Estimate: 0h