[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-21 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=114535&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-114535
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 21/Jun/18 22:32
Start Date: 21/Jun/18 22:32
Worklog Time Spent: 10m 
  Work Description: kennknowles closed pull request #5682: [BEAM-4194] 
support unbounded limit
URL: https://github.com/apache/beam/pull/5682
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
index cbbafc72a93..eb7d9d2f109 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
@@ -80,6 +80,18 @@ public Calc copy(RelTraitSet traitSet, RelNode input, RexProgram program) {
     }
   }
 
+  public int getLimitCountOfSortRel() {
+    if (input instanceof BeamSortRel) {
+      return ((BeamSortRel) input).getCount();
+    }
+
+    throw new RuntimeException("Could not get the limit count from a non BeamSortRel input.");
+  }
+
+  public boolean isInputSortRelAndLimitOnly() {
+    return (input instanceof BeamSortRel) && ((BeamSortRel) input).isLimitOnly();
+  }
+
   /** {@code CalcFn} is the executor for a {@link BeamCalcRel} step. */
   public static class CalcFn extends DoFn {
     private BeamSqlExpressionExecutor executor;
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
index 2fbcb5b1e72..8e32a6aa2de 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
@@ -19,14 +19,19 @@
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import java.io.IOException;
+import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import javax.annotation.Nullable;
+import org.apache.beam.runners.direct.DirectOptions;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.PipelineResult.State;
+import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.MetricNameFilter;
 import org.apache.beam.sdk.metrics.MetricQueryResults;
@@ -35,8 +40,12 @@
 import org.apache.beam.sdk.options.ApplicationNameOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.ValueState;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.Row;
 import org.apache.calcite.adapter.enumerable.EnumerableRel;
 import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor;
@@ -56,9 +65,13 @@
 import org.apache.calcite.rel.convert.ConverterImpl;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rel.type.RelDataType;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** BeamRelNode to replace a {@code Enumerable} node. */
 public class BeamEnumerableConverter extends ConverterImpl implements EnumerableRel {
+  private static final Logger LOG = LoggerFactory.getLogger(BeamEnumerableConverter.class);
 
   public BeamEnumerableConverter(RelOptCluster cluster, RelTraitSet traits, RelNode input) {
     super(cluster, ConventionTraitDef.INSTANCE, traits, input);
@@ -108,13 +121,37 @@ public static PipelineOptions createPipelineOptions(Map map) {
       Thread.currentThread().setContextClassLoader(BeamEnumerableConverter.class.getClassLoader());
       if (node instanceof BeamIOSinkRel) {
         return count(options, node);
+      } else if (isLimitQuery(node)) {
+        return limitCollect(options, node);
       }
+
       return collect(options, node);
     } finally {
   

[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-21 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=114534&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-114534
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 21/Jun/18 22:32
Start Date: 21/Jun/18 22:32
Worklog Time Spent: 10m 
  Work Description: kennknowles commented on issue #5682: [BEAM-4194] 
support unbounded limit
URL: https://github.com/apache/beam/pull/5682#issuecomment-399263935
 
 
   The tests that are actually part of this PR passed. We have some other quota
issues, but only for Dataflow.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 114534)
Time Spent: 10h 10m  (was: 10h)

> [SQL] Support LIMIT on Unbounded Data
> -------------------------------------
>
>                 Key: BEAM-4194
>                 URL: https://issues.apache.org/jira/browse/BEAM-4194
>             Project: Beam
>          Issue Type: New Feature
>          Components: dsl-sql
>            Reporter: Anton Kedin
>            Assignee: Rui Wang
>            Priority: Major
>          Time Spent: 10h 10m
>  Remaining Estimate: 0h
>
> We need to support queries with "LIMIT xxx".
> The problem is that we don't know when aggregates will trigger; they can
> potentially accumulate values in the global window and never trigger.
> If we have some trigger syntax (BEAM-4193), then the use case becomes similar
> to what we have at the moment, where the user defines the trigger upstream
> for all inputs. In this case, LIMIT can probably be implemented as
> Sample.any(5) with a trigger at count.
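
The "trigger at count" idea in the description can be illustrated without Beam. The following is only a sketch under stated assumptions: the class `LimitByCountTrigger` and the helper `firstPane` are hypothetical names, and a plain `Iterator` stands in for an unbounded input. The buffer plays the role of the global window, and returning it as soon as it holds `limit` elements plays the role of a count-based trigger.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;

public class LimitByCountTrigger {
  // Hypothetical helper: buffers elements from a possibly endless source and
  // "fires" (returns the buffer) as soon as `limit` elements have arrived.
  public static <T> List<T> firstPane(Iterator<T> unbounded, int limit) {
    List<T> pane = new ArrayList<>();
    while (pane.size() < limit && unbounded.hasNext()) {
      pane.add(unbounded.next());
    }
    return pane;
  }

  public static void main(String[] args) {
    // An endless source of integers: 0, 1, 2, ...
    Iterator<Integer> source = Stream.iterate(0, n -> n + 1).iterator();
    System.out.println(firstPane(source, 5)); // prints [0, 1, 2, 3, 4]
  }
}
```

Without the count check the loop would never return on an endless source, which is the "accumulate and never trigger" problem the description raises.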



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-21 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=114477&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-114477
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 21/Jun/18 21:09
Start Date: 21/Jun/18 21:09
Worklog Time Spent: 10m 
  Work Description: amaliujia edited a comment on issue #5682: [BEAM-4194] 
support unbounded limit
URL: https://github.com/apache/beam/pull/5682#issuecomment-399241705
 
 
   `requesting more quota` failed a bunch of PostCommit tests.




Issue Time Tracking
---

Worklog Id: (was: 114477)
Time Spent: 10h  (was: 9h 50m)



[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-21 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=114467&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-114467
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 21/Jun/18 21:00
Start Date: 21/Jun/18 21:00
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on issue #5682: [BEAM-4194] support 
unbounded limit
URL: https://github.com/apache/beam/pull/5682#issuecomment-399241933
 
 
   run java postcommit




Issue Time Tracking
---

Worklog Id: (was: 114467)
Time Spent: 9h 50m  (was: 9h 40m)



[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-21 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=114466&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-114466
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 21/Jun/18 20:59
Start Date: 21/Jun/18 20:59
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on issue #5682: [BEAM-4194] support 
unbounded limit
URL: https://github.com/apache/beam/pull/5682#issuecomment-399241705
 
 
   "requesting more quota" failed a bunch of PostCommit tests.




Issue Time Tracking
---

Worklog Id: (was: 114466)
Time Spent: 9h 40m  (was: 9.5h)



[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-21 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=114448&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-114448
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 21/Jun/18 20:02
Start Date: 21/Jun/18 20:02
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on issue #5682: [BEAM-4194] support 
unbounded limit
URL: https://github.com/apache/beam/pull/5682#issuecomment-399225339
 
 
   run java postcommit




Issue Time Tracking
---

Worklog Id: (was: 114448)
Time Spent: 9.5h  (was: 9h 20m)



[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-21 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=114447&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-114447
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 21/Jun/18 20:02
Start Date: 21/Jun/18 20:02
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on issue #5682: [BEAM-4194] support 
unbounded limit
URL: https://github.com/apache/beam/pull/5682#issuecomment-399225318
 
 
   ElasticSearchIO 




Issue Time Tracking
---

Worklog Id: (was: 114447)
Time Spent: 9h 20m  (was: 9h 10m)



[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-21 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=114425&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-114425
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 21/Jun/18 18:40
Start Date: 21/Jun/18 18:40
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on issue #5682: [BEAM-4194] support 
unbounded limit
URL: https://github.com/apache/beam/pull/5682#issuecomment-399203343
 
 
   run java postcommit




Issue Time Tracking
---

Worklog Id: (was: 114425)
Time Spent: 9h 10m  (was: 9h)



[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-21 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=114387&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-114387
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 21/Jun/18 17:09
Start Date: 21/Jun/18 17:09
Worklog Time Spent: 10m 
  Work Description: kennknowles commented on issue #5682: [BEAM-4194] 
support unbounded limit
URL: https://github.com/apache/beam/pull/5682#issuecomment-399176626
 
 
   LGTM, just squash commits and I'll merge




Issue Time Tracking
---

Worklog Id: (was: 114387)
Time Spent: 9h  (was: 8h 50m)



[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-20 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113596&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113596
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 20/Jun/18 06:08
Start Date: 20/Jun/18 06:08
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on issue #5682: [BEAM-4194] support 
unbounded limit
URL: https://github.com/apache/beam/pull/5682#issuecomment-398635002
 
 
   run java postcommit




Issue Time Tracking
---

Worklog Id: (was: 113596)
Time Spent: 8h 50m  (was: 8h 40m)



[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113549&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113549
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 20/Jun/18 04:37
Start Date: 20/Jun/18 04:37
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on a change in pull request #5682: 
[BEAM-4194] support unbounded limit
URL: https://github.com/apache/beam/pull/5682#discussion_r196646092
 
 

 ##
 File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
 ##
 @@ -133,14 +204,108 @@ private static PipelineResult run(
         options
             .getRunner()
             .getCanonicalName()
-            .equals("org.apache.beam.runners.direct.DirectRunner"));
+            .equals("org.apache.beam.runners.direct.DirectRunner"),
+        "SELECT without INSERT is only supported in DirectRunner in SQL Shell.");
+
     Collector.globalValues.put(id, values);
     run(options, node, new Collector());
     Collector.globalValues.remove(id);
 
     return Linq4j.asEnumerable(values);
   }
 
+  private static Enumerable limitCollect(PipelineOptions options, BeamRelNode node) {
+    long id = options.getOptionsId();
+    Queue values = new ConcurrentLinkedQueue();
+
+    checkArgument(
+        options
+            .getRunner()
+            .getCanonicalName()
+            .equals("org.apache.beam.runners.direct.DirectRunner"),
+        "SELECT without INSERT is only supported in DirectRunner in SQL Shell.");
+
+    LimitStateVar limitStateVar = new LimitStateVar();
+    int limitCount = getLimitCount(node);
+
+    LimitCanceller.globalLimitArguments.put(id, limitCount);
+    LimitCanceller.globalStates.put(id, limitStateVar);
+    LimitCollector.globalLimitArguments.put(id, limitCount);
+    LimitCollector.globalValues.put(id, values);
+    limitRun(options, node, new LimitCollector(), new LimitCanceller(), limitStateVar);
+    LimitCanceller.globalLimitArguments.remove(id);
+    LimitCanceller.globalStates.remove(id);
+    LimitCollector.globalLimitArguments.remove(id);
+    LimitCollector.globalValues.remove(id);
+
+    return Linq4j.asEnumerable(values);
+  }
+
+  private static class LimitCanceller extends DoFn<KV<String, Row>, Void> {
+    private static final Map globalLimitArguments =
+        new ConcurrentHashMap();
+    private static final Map globalStates =
+        new ConcurrentHashMap();
+
+    @Nullable private volatile Integer count;
+    @Nullable private volatile LimitStateVar limitStateVar;
+
+    @StateId("counter")
+    private final StateSpec<ValueState<Integer>> counter = StateSpecs.value(VarIntCoder.of());
+
+    @StartBundle
+    public void startBundle(StartBundleContext context) {
+      long id = context.getPipelineOptions().getOptionsId();
+      count = globalLimitArguments.get(id);
+      limitStateVar = globalStates.get(id);
+    }
+
+    @ProcessElement
+    public void processElement(
+        ProcessContext context, @StateId("counter") ValueState counter) {
+      int current = (counter.read() != null ? counter.read() : 0);
+      current += 1;
+      if (current >= count && !limitStateVar.isReached()) {
+        // if current count reaches the limit count but limitStateVar has not been set, flip
+        // the var.
+        limitStateVar.setReached();
+      }
+
+      counter.write(current);
+    }
+  }
+
+  private static class LimitCollector extends DoFn<Row, KV<String, Row>> {
+    // This will only work on the direct runner.
+    private static final Map<Long, Queue<Object[]>> globalValues =
+        new ConcurrentHashMap<Long, Queue<Object[]>>();
+    private static final Map globalLimitArguments =
+        new ConcurrentHashMap();
+
+    @Nullable private volatile Queue values;
+    @Nullable private volatile int count;
+
+    @StartBundle
+    public void startBundle(StartBundleContext context) {
+      long id = context.getPipelineOptions().getOptionsId();
+      values = globalValues.get(id);
+      count = globalLimitArguments.get(id);
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext context) {
+      Object[] input = context.element().getValues().toArray();
+      if (values.size() < count) {
 
 Review comment:
   Because I still need to keep `LimitCanceller` (as a general "count and
cancel" solution for all runners), and `LimitCanceller` needs KV as its input
type (a requirement of stateful ParDo), I will have to keep this
`LimitCollector` to emit KV elements to `LimitCanceller`.
   
   I will take care of the race condition.  
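
   The "count and cancel" pattern described here can be sketched with plain JDK threads instead of a Beam pipeline. Everything below is an assumption made for illustration: `CountAndCancel`, `LimitFlag`, and `runUntilLimit` are hypothetical names, `LimitFlag` only imitates the `isReached()`/`setReached()` calls visible on `LimitStateVar` in the diff, and the polling driver is a guess, since `limitRun` is not shown in this excerpt.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

public class CountAndCancel {
  // Hypothetical stand-in for the shared flag the PR calls LimitStateVar.
  static final class LimitFlag {
    private final AtomicBoolean reached = new AtomicBoolean(false);

    boolean isReached() {
      return reached.get();
    }

    void setReached() {
      reached.set(true);
    }
  }

  // Runs an endless "source" thread, cancels it once `limit` elements have been
  // counted, and returns how many elements were seen before it stopped.
  public static long runUntilLimit(long limit) {
    LimitFlag flag = new LimitFlag();
    AtomicLong seen = new AtomicLong();

    Thread source = new Thread(() -> {
      while (!Thread.currentThread().isInterrupted()) {
        // Counting step: flip the flag once the limit is reached.
        if (seen.incrementAndGet() >= limit) {
          flag.setReached();
        }
      }
    });
    source.start();

    // Driver step (assumed): poll the flag, then cancel the source.
    while (!flag.isReached()) {
      Thread.yield();
    }
    source.interrupt();
    try {
      source.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return seen.get();
  }

  public static void main(String[] args) {
    // The source may overshoot before the cancel lands, so only a lower bound holds.
    System.out.println(runUntilLimit(5) >= 5); // prints true
  }
}
```

   Because cancellation is asynchronous, the counter can run past the limit; that is why the collected values still need trimming on the caller's side.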




Issue Time Tracking

[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113472&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113472
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 20/Jun/18 00:33
Start Date: 20/Jun/18 00:33
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on a change in pull request #5682: 
[BEAM-4194] support unbounded limit
URL: https://github.com/apache/beam/pull/5682#discussion_r196619143
 
 

 ##
 File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
 ##
 @@ -133,14 +204,108 @@ private static PipelineResult run(
         options
             .getRunner()
             .getCanonicalName()
-            .equals("org.apache.beam.runners.direct.DirectRunner"));
+            .equals("org.apache.beam.runners.direct.DirectRunner"),
+        "SELECT without INSERT is only supported in DirectRunner in SQL Shell.");
+
     Collector.globalValues.put(id, values);
     run(options, node, new Collector());
     Collector.globalValues.remove(id);
 
     return Linq4j.asEnumerable(values);
   }
 
+  private static Enumerable limitCollect(PipelineOptions options, BeamRelNode node) {
+    long id = options.getOptionsId();
+    Queue values = new ConcurrentLinkedQueue();
+
+    checkArgument(
+        options
+            .getRunner()
+            .getCanonicalName()
+            .equals("org.apache.beam.runners.direct.DirectRunner"),
+        "SELECT without INSERT is only supported in DirectRunner in SQL Shell.");
+
+    LimitStateVar limitStateVar = new LimitStateVar();
+    int limitCount = getLimitCount(node);
+
+    LimitCanceller.globalLimitArguments.put(id, limitCount);
+    LimitCanceller.globalStates.put(id, limitStateVar);
+    LimitCollector.globalLimitArguments.put(id, limitCount);
+    LimitCollector.globalValues.put(id, values);
+    limitRun(options, node, new LimitCollector(), new LimitCanceller(), limitStateVar);
+    LimitCanceller.globalLimitArguments.remove(id);
+    LimitCanceller.globalStates.remove(id);
+    LimitCollector.globalLimitArguments.remove(id);
+    LimitCollector.globalValues.remove(id);
+
+    return Linq4j.asEnumerable(values);
+  }
+
+  private static class LimitCanceller extends DoFn<KV<String, Row>, Void> {
+    private static final Map globalLimitArguments =
+        new ConcurrentHashMap();
+    private static final Map globalStates =
+        new ConcurrentHashMap();
+
+    @Nullable private volatile Integer count;
+    @Nullable private volatile LimitStateVar limitStateVar;
+
+    @StateId("counter")
+    private final StateSpec<ValueState<Integer>> counter = StateSpecs.value(VarIntCoder.of());
+
+    @StartBundle
+    public void startBundle(StartBundleContext context) {
+      long id = context.getPipelineOptions().getOptionsId();
+      count = globalLimitArguments.get(id);
+      limitStateVar = globalStates.get(id);
+    }
+
+    @ProcessElement
+    public void processElement(
+        ProcessContext context, @StateId("counter") ValueState counter) {
+      int current = (counter.read() != null ? counter.read() : 0);
+      current += 1;
+      if (current >= count && !limitStateVar.isReached()) {
+        // if current count reaches the limit count but limitStateVar has not been set, flip
+        // the var.
+        limitStateVar.setReached();
+      }
+
+      counter.write(current);
+    }
+  }
+
+  private static class LimitCollector extends DoFn<Row, KV<String, Row>> {
+    // This will only work on the direct runner.
+    private static final Map<Long, Queue<Object[]>> globalValues =
+        new ConcurrentHashMap<Long, Queue<Object[]>>();
+    private static final Map globalLimitArguments =
+        new ConcurrentHashMap();
+
+    @Nullable private volatile Queue values;
+    @Nullable private volatile int count;
+
+    @StartBundle
+    public void startBundle(StartBundleContext context) {
+      long id = context.getPipelineOptions().getOptionsId();
+      values = globalValues.get(id);
+      count = globalLimitArguments.get(id);
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext context) {
+      Object[] input = context.element().getValues().toArray();
+      if (values.size() < count) {
 
 Review comment:
   That's a great catch! Yes, I agree: use `Collector` here and drop the extra
values after the pipeline terminates.
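
   A minimal JDK-only sketch of the "collect everything, trim afterwards" approach agreed on here. The names `TrimAfterCollect` and `collectThenTrim` and the worker threads are hypothetical and only stand in for a runner's worker threads; this is not the PR's code. Workers append without checking the queue size, so there is no check-then-add step to race on, and the surplus is dropped once every worker has stopped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class TrimAfterCollect {
  // Hypothetical helper: `workers` threads each add `perWorker` rows without
  // looking at the queue size; the caller keeps at most `limit` rows afterwards.
  public static List<Object[]> collectThenTrim(int workers, int perWorker, int limit) {
    Queue<Object[]> values = new ConcurrentLinkedQueue<>();
    List<Thread> threads = new ArrayList<>();
    for (int w = 0; w < workers; w++) {
      final int worker = w;
      Thread t = new Thread(() -> {
        for (int i = 0; i < perWorker; i++) {
          values.add(new Object[] {worker, i});
        }
      });
      threads.add(t);
      t.start();
    }
    // Wait until every worker has finished (the "pipeline" has terminated).
    for (Thread t : threads) {
      try {
        t.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }
    // Trim step: keep the first `limit` rows and drop the rest.
    List<Object[]> result = new ArrayList<>();
    for (Object[] row : values) {
      if (result.size() == limit) {
        break;
      }
      result.add(row);
    }
    return result;
  }

  public static void main(String[] args) {
    System.out.println(collectThenTrim(3, 10, 5).size()); // prints 5
  }
}
```

   Which rows survive the trim depends on thread scheduling, which seems acceptable for a LIMIT without ORDER BY, where any `limit` rows are a valid answer.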
   




Issue Time Tracking
---

Worklog Id: (was: 113472)
Time Spent: 8.5h  (was: 8h 20m)


[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113466&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113466
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 20/Jun/18 00:19
Start Date: 20/Jun/18 00:19
Worklog Time Spent: 10m 
  Work Description: apilloud commented on a change in pull request #5682: 
[BEAM-4194] support unbounded limit
URL: https://github.com/apache/beam/pull/5682#discussion_r196617360
 
 

 ##
 File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
 ##
 @@ -133,14 +204,108 @@ private static PipelineResult run(
         options
             .getRunner()
             .getCanonicalName()
-            .equals("org.apache.beam.runners.direct.DirectRunner"));
+            .equals("org.apache.beam.runners.direct.DirectRunner"),
+        "SELECT without INSERT is only supported in DirectRunner in SQL Shell.");
+
     Collector.globalValues.put(id, values);
     run(options, node, new Collector());
     Collector.globalValues.remove(id);
 
     return Linq4j.asEnumerable(values);
   }
 
+  private static Enumerable limitCollect(PipelineOptions options, BeamRelNode node) {
+    long id = options.getOptionsId();
+    Queue values = new ConcurrentLinkedQueue();
+
+    checkArgument(
+        options
+            .getRunner()
+            .getCanonicalName()
+            .equals("org.apache.beam.runners.direct.DirectRunner"),
+        "SELECT without INSERT is only supported in DirectRunner in SQL Shell.");
+
+    LimitStateVar limitStateVar = new LimitStateVar();
+    int limitCount = getLimitCount(node);
+
+    LimitCanceller.globalLimitArguments.put(id, limitCount);
+    LimitCanceller.globalStates.put(id, limitStateVar);
+    LimitCollector.globalLimitArguments.put(id, limitCount);
+    LimitCollector.globalValues.put(id, values);
+    limitRun(options, node, new LimitCollector(), new LimitCanceller(), limitStateVar);
+    LimitCanceller.globalLimitArguments.remove(id);
+    LimitCanceller.globalStates.remove(id);
+    LimitCollector.globalLimitArguments.remove(id);
+    LimitCollector.globalValues.remove(id);
+
+    return Linq4j.asEnumerable(values);
+  }
+
+  private static class LimitCanceller extends DoFn<KV<String, Row>, Void> {
+    private static final Map globalLimitArguments =
+        new ConcurrentHashMap();
+    private static final Map globalStates =
+        new ConcurrentHashMap();
+
+    @Nullable private volatile Integer count;
+    @Nullable private volatile LimitStateVar limitStateVar;
+
+    @StateId("counter")
+    private final StateSpec<ValueState<Integer>> counter = StateSpecs.value(VarIntCoder.of());
+
+    @StartBundle
+    public void startBundle(StartBundleContext context) {
+      long id = context.getPipelineOptions().getOptionsId();
+      count = globalLimitArguments.get(id);
+      limitStateVar = globalStates.get(id);
+    }
+
+    @ProcessElement
+    public void processElement(
+        ProcessContext context, @StateId("counter") ValueState counter) {
+      int current = (counter.read() != null ? counter.read() : 0);
+      current += 1;
+      if (current >= count && !limitStateVar.isReached()) {
+        // if current count reaches the limit count but limitStateVar has not been set, flip
+        // the var.
+        limitStateVar.setReached();
+      }
+
+      counter.write(current);
+    }
+  }
+
+  private static class LimitCollector extends DoFn<Row, KV<String, Row>> {
+    // This will only work on the direct runner.
+    private static final Map<Long, Queue<Object[]>> globalValues =
+        new ConcurrentHashMap<Long, Queue<Object[]>>();
+    private static final Map globalLimitArguments =
+        new ConcurrentHashMap();
+
+    @Nullable private volatile Queue values;
+    @Nullable private volatile int count;
+
+    @StartBundle
+    public void startBundle(StartBundleContext context) {
+      long id = context.getPipelineOptions().getOptionsId();
+      values = globalValues.get(id);
+      count = globalLimitArguments.get(id);
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext context) {
+      Object[] input = context.element().getValues().toArray();
+      if (values.size() < count) {
 
 Review comment:
   Now that I think about it, you have a race between checking this count and 
adding to the queue. The DirectRunner has a minimum of 3 worker threads, so the 
race can actually be hit. I still think the simple, safe, and portable 
implementation is to use `Collector`, check the size of the queue outside of the 
pipeline, and truncate after terminating the pipeline.
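
   A minimal sketch of the alternative suggested above, using plain Java threads 
as a stand-in for DirectRunner workers. The names `TruncateAfterRun`, 
`collectAll`, and `truncate` are hypothetical and do not appear in the PR. The 
workers add to the queue unconditionally, so no size check inside them can race 
with `add()`; the limit is applied only after they have stopped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration; not code from the pull request.
final class TruncateAfterRun {

  // Stand-in for the pipeline: every worker adds without checking the queue size.
  static Queue<Integer> collectAll(int workers, int perWorker) {
    Queue<Integer> values = new ConcurrentLinkedQueue<>();
    ExecutorService pool = Executors.newFixedThreadPool(workers);
    for (int w = 0; w < workers; w++) {
      final int base = w * perWorker;
      pool.execute(() -> {
        for (int i = 0; i < perWorker; i++) {
          values.add(base + i);
        }
      });
    }
    pool.shutdown();
    try {
      if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
        throw new IllegalStateException("workers did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for workers", e);
    }
    return values;
  }

  // Applied by the caller after the workers have stopped: keep at most `limit` values.
  static List<Integer> truncate(Queue<Integer> values, int limit) {
    List<Integer> result = new ArrayList<>();
    for (Integer v : values) {
      if (result.size() >= limit) {
        break;
      }
      result.add(v);
    }
    return result;
  }
}
```

   Because the truncation runs on a single thread after termination, the result 
never exceeds the limit no matter how many workers wrote to the queue.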


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:

[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113448=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113448
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 19/Jun/18 23:39
Start Date: 19/Jun/18 23:39
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on a change in pull request #5682: 
[BEAM-4194] support unbounded limit
URL: https://github.com/apache/beam/pull/5682#discussion_r196610533
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
 ##
 @@ -95,13 +107,37 @@ public Result implement(EnumerableRelImplementor 
implementor, Prefer prefer) {
   
Thread.currentThread().setContextClassLoader(BeamEnumerableConverter.class.getClassLoader());
   if (node instanceof BeamIOSinkRel) {
 return count(options, node);
+  } else if (isLimitQuery(node)) {
+return limitCollect(options, node);
   }
+
   return collect(options, node);
 } finally {
   Thread.currentThread().setContextClassLoader(originalClassLoader);
 }
   }
 
+  enum LimitState {
 
 Review comment:
   Yep. Using an enum gives a more descriptive state than true/false.
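
   A small sketch of what an enum-backed state holder could look like. The type 
names `LimitState` and `LimitStateVar` and the methods `isReached()` and 
`setReached()` appear in the diff; the enum constants and the field layout here 
are assumptions made for illustration.

```java
import java.io.Serializable;

// Constant names are assumed; only the enum name is visible in the diff.
enum LimitState {
  NOT_REACHED,
  REACHED
}

// Holder shared between the counting step and the polling thread.
final class LimitStateVar implements Serializable {
  // volatile so a write from a worker thread is visible to the poller.
  private volatile LimitState state = LimitState.NOT_REACHED;

  boolean isReached() {
    return state == LimitState.REACHED;
  }

  void setReached() {
    state = LimitState.REACHED;
  }
}
```

   Compared with a bare boolean, the enum names both states explicitly and 
leaves room for further states later.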


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 113448)
Time Spent: 8h 10m  (was: 8h)

> [SQL] Support LIMIT on Unbounded Data
> -
>
> Key: BEAM-4194
> URL: https://issues.apache.org/jira/browse/BEAM-4194
> Project: Beam
>  Issue Type: New Feature
>  Components: dsl-sql
>Reporter: Anton Kedin
>Assignee: Rui Wang
>Priority: Major
>  Time Spent: 8h 10m
>  Remaining Estimate: 0h
>
> We need to support queries with "LIMIT xxx".
> Problem is that we don't know when aggregates will trigger, they can 
> potentially accumulate values in global window and never trigger.
> If we have some trigger syntax (BEAM-4193), then the use case becomes similar 
> to what we have at the moment, where the user defines the trigger upstream 
> for all inputs. In this case LIMIT probably can be implemented as 
> sample.any(5) with trigger at count.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113446=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113446
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 19/Jun/18 23:36
Start Date: 19/Jun/18 23:36
Worklog Time Spent: 10m 
  Work Description: apilloud commented on a change in pull request #5682: 
[BEAM-4194] support unbounded limit
URL: https://github.com/apache/beam/pull/5682#discussion_r196610932
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
 ##
 @@ -125,6 +162,55 @@ private static PipelineResult run(
 return result;
   }
 
+  private static PipelineResult limitRun(
+  PipelineOptions options,
+  BeamRelNode node,
+  DoFn> collectDoFn,
+  DoFn, Void> limitCounterDoFn,
+  LimitStateVar limitStateVar) {
+ExecutorService pool = Executors.newFixedThreadPool(1);
+
+options.as(DirectOptions.class).setBlockOnRun(false);
+Pipeline pipeline = Pipeline.create(options);
+PCollectionTuple.empty(pipeline)
+.apply(node.toPTransform())
+.apply(ParDo.of(collectDoFn))
+.apply(ParDo.of(limitCounterDoFn));
+
+PipelineResult result = pipeline.run();
+
+pool.execute(
+new Runnable() {
+  @Override
+  public void run() {
+// wait every one second to check state of PipelineResult. If 
returned value
+// is null, or return state is non termination state, it means 
pipeline is still
+// running and within loop limit counter state will be checked.
+State state;
+while (true) {
+  state = result.waitUntilFinish(Duration.standardSeconds(1));
+  if (state != null && state.isTerminal()) {
+break;
+  }
+  // If state is null, or state is not null and indicates pipeline 
is not terminated
+  // yet, check continue checking the limit var.
+  try {
+if (limitStateVar.isReached()) {
 
 Review comment:
   I think we have a very different model as to how this will work in other 
runners. The runners shouldn't need to be aware of the limit, as that would be 
applied client side.
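
   One way to read "applied client side" is sketched below in plain Java. The 
producer thread stands in for an unbounded pipeline and knows nothing about the 
limit; the caller polls the collected output, cancels the job once enough 
values have arrived, and truncates. `ClientSideLimit` and `firstN` are 
hypothetical names, not part of Beam or of this PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration; not code from the pull request.
final class ClientSideLimit {

  static List<Long> firstN(int limit) {
    Queue<Long> values = new ConcurrentLinkedQueue<>();
    ExecutorService pool = Executors.newSingleThreadExecutor();

    // Stand-in for an unbounded pipeline: emits until cancelled.
    Future<?> job = pool.submit(() -> {
      long next = 0;
      try {
        while (true) {
          values.add(next++);
          Thread.sleep(1);
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // cancelled by the client
      }
    });

    try {
      // The client, not the job, decides when enough output has arrived.
      while (values.size() < limit) {
        Thread.sleep(1);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      job.cancel(true);
      pool.shutdownNow();
    }

    // The job may have emitted extra values before it was cancelled; drop them.
    List<Long> result = new ArrayList<>();
    for (Long v : values) {
      if (result.size() >= limit) {
        break;
      }
      result.add(v);
    }
    return result;
  }
}
```

   With this split the job needs no limit-specific logic, which is the property 
the comment asks for in other runners.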


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 113446)
Time Spent: 8h  (was: 7h 50m)



[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113445=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113445
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 19/Jun/18 23:33
Start Date: 19/Jun/18 23:33
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on a change in pull request #5682: 
[BEAM-4194] support unbounded limit
URL: https://github.com/apache/beam/pull/5682#discussion_r196610533
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
 ##
 @@ -95,13 +107,37 @@ public Result implement(EnumerableRelImplementor 
implementor, Prefer prefer) {
   
Thread.currentThread().setContextClassLoader(BeamEnumerableConverter.class.getClassLoader());
   if (node instanceof BeamIOSinkRel) {
 return count(options, node);
+  } else if (isLimitQuery(node)) {
+return limitCollect(options, node);
   }
+
   return collect(options, node);
 } finally {
   Thread.currentThread().setContextClassLoader(originalClassLoader);
 }
   }
 
+  enum LimitState {
 
 Review comment:
   Yep. Using an enum to have a descriptive state.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 113445)
Time Spent: 7h 50m  (was: 7h 40m)



[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113441=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113441
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 19/Jun/18 23:32
Start Date: 19/Jun/18 23:32
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on a change in pull request #5682: 
[BEAM-4194] support unbounded limit
URL: https://github.com/apache/beam/pull/5682#discussion_r196608975
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
 ##
 @@ -125,6 +162,55 @@ private static PipelineResult run(
 return result;
   }
 
+  private static PipelineResult limitRun(
+  PipelineOptions options,
+  BeamRelNode node,
+  DoFn> collectDoFn,
+  DoFn, Void> limitCounterDoFn,
+  LimitStateVar limitStateVar) {
+ExecutorService pool = Executors.newFixedThreadPool(1);
+
+options.as(DirectOptions.class).setBlockOnRun(false);
+Pipeline pipeline = Pipeline.create(options);
+PCollectionTuple.empty(pipeline)
+.apply(node.toPTransform())
+.apply(ParDo.of(collectDoFn))
+.apply(ParDo.of(limitCounterDoFn));
+
+PipelineResult result = pipeline.run();
+
+pool.execute(
+new Runnable() {
+  @Override
+  public void run() {
+// wait every one second to check state of PipelineResult. If 
returned value
+// is null, or return state is non termination state, it means 
pipeline is still
+// running and within loop limit counter state will be checked.
+State state;
+while (true) {
+  state = result.waitUntilFinish(Duration.standardSeconds(1));
+  if (state != null && state.isTerminal()) {
+break;
+  }
+  // If state is null, or state is not null and indicates pipeline 
is not terminated
+  // yet, check continue checking the limit var.
+  try {
+if (limitStateVar.isReached()) {
 
 Review comment:
   I could replace `LimitCollector` with `Collector` at the cost of removing 
extra collected values after the pipeline finishes, or I could modify 
`Collector` and add something like `values.size() >= limitCount` to make 
`Collector` general. To make the code more readable, I wrote a dedicated 
`LimitCollector`.
   
   Also, I'd like to separate the `count and cancel` and `collect` steps to 
support unbounded limit. Right now `Collector` seems to work only in 
DirectRunner, but I can make `count and cancel` more general to fit other 
runners by using a stateful ParDo (that's what `LimitCanceller` does). So in the 
future, if we start to support unbounded limit in other runners, only the 
`collect` step needs modification.
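
   The separation described above can be sketched without Beam. In the diff the 
counting step keeps its count in Beam state (`@StateId("counter")` with a 
`ValueState`); the sketch below swaps that for an `AtomicInteger` so it runs as 
plain Java. `CountAndCancel` and `onElement` are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration; the PR uses a stateful DoFn instead of atomics.
final class CountAndCancel {
  private final int limit;
  private final AtomicInteger seen = new AtomicInteger();
  private final AtomicBoolean reached = new AtomicBoolean(false);

  CountAndCancel(int limit) {
    this.limit = limit;
  }

  // Called once per element. It never touches the collected values, so the
  // collect step can change without affecting the counting logic.
  void onElement() {
    if (seen.incrementAndGet() >= limit) {
      reached.set(true);
    }
  }

  // Polled by whoever is responsible for cancelling the job.
  boolean isReached() {
    return reached.get();
  }
}
```

   Only the flag is shared with the cancelling side, so the counting step makes 
no assumption about where the results are stored.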
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 113441)
Time Spent: 7h 10m  (was: 7h)



[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113443=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113443
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 19/Jun/18 23:32
Start Date: 19/Jun/18 23:32
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on a change in pull request #5682: 
[BEAM-4194] support unbounded limit
URL: https://github.com/apache/beam/pull/5682#discussion_r196610384
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
 ##
 @@ -133,14 +219,108 @@ private static PipelineResult run(
 options
 .getRunner()
 .getCanonicalName()
-.equals("org.apache.beam.runners.direct.DirectRunner"));
+.equals("org.apache.beam.runners.direct.DirectRunner"),
+"Only DirectRunner is supported in SQL Shell. " + "Please check your 
Runner setting.");
 
 Review comment:
   Fixed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 113443)
Time Spent: 7.5h  (was: 7h 20m)



[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113442=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113442
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 19/Jun/18 23:32
Start Date: 19/Jun/18 23:32
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on a change in pull request #5682: 
[BEAM-4194] support unbounded limit
URL: https://github.com/apache/beam/pull/5682#discussion_r196608975
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
 ##
 @@ -125,6 +162,55 @@ private static PipelineResult run(
 return result;
   }
 
+  private static PipelineResult limitRun(
+  PipelineOptions options,
+  BeamRelNode node,
+  DoFn> collectDoFn,
+  DoFn, Void> limitCounterDoFn,
+  LimitStateVar limitStateVar) {
+ExecutorService pool = Executors.newFixedThreadPool(1);
+
+options.as(DirectOptions.class).setBlockOnRun(false);
+Pipeline pipeline = Pipeline.create(options);
+PCollectionTuple.empty(pipeline)
+.apply(node.toPTransform())
+.apply(ParDo.of(collectDoFn))
+.apply(ParDo.of(limitCounterDoFn));
+
+PipelineResult result = pipeline.run();
+
+pool.execute(
+new Runnable() {
+  @Override
+  public void run() {
+// wait every one second to check state of PipelineResult. If 
returned value
+// is null, or return state is non termination state, it means 
pipeline is still
+// running and within loop limit counter state will be checked.
+State state;
+while (true) {
+  state = result.waitUntilFinish(Duration.standardSeconds(1));
+  if (state != null && state.isTerminal()) {
+break;
+  }
+  // If state is null, or state is not null and indicates pipeline 
is not terminated
+  // yet, check continue checking the limit var.
+  try {
+if (limitStateVar.isReached()) {
 
 Review comment:
   I could replace `LimitCollector` with `Collector` at the cost of removing 
extra collected values after the pipeline finishes, or I could modify 
`Collector` and add something like `values.size() >= limitCount` to make 
`Collector` general. To make the code more readable, I wrote a dedicated 
`LimitCollector`.
   
   Also, I'd like to separate the `count and cancel` and `collect` steps to 
support unbounded limit. Right now `Collector` seems to work only in 
DirectRunner, but I can make `count and cancel` more general to fit other 
runners by using a stateful ParDo (that's what `LimitCanceller` does). So in the 
future, if we start to support unbounded limit in other runners, only the 
`collect` step needs modification.
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 113442)
Time Spent: 7h 20m  (was: 7h 10m)



[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113444=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113444
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 19/Jun/18 23:32
Start Date: 19/Jun/18 23:32
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on a change in pull request #5682: 
[BEAM-4194] support unbounded limit
URL: https://github.com/apache/beam/pull/5682#discussion_r196610388
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
 ##
 @@ -111,6 +147,55 @@ private static PipelineResult run(
 return result;
   }
 
+  private static PipelineResult limitRun(
+  PipelineOptions options,
+  BeamRelNode node,
+  DoFn> collectDoFn,
+  DoFn, Void> limitCounterDoFn,
+  LimitStateVar limitStateVar) {
+ExecutorService pool = Executors.newFixedThreadPool(1);
+
+options.as(DirectOptions.class).setBlockOnRun(false);
+Pipeline pipeline = Pipeline.create(options);
+PCollectionTuple.empty(pipeline)
+.apply(node.toPTransform())
+.apply(ParDo.of(collectDoFn))
+.apply(ParDo.of(limitCounterDoFn));
+
+PipelineResult result = pipeline.run();
+
+pool.execute(
 
 Review comment:
   Fixed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 113444)
Time Spent: 7h 40m  (was: 7.5h)



[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113437=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113437
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 19/Jun/18 23:29
Start Date: 19/Jun/18 23:29
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on a change in pull request #5682: 
[BEAM-4194] support unbounded limit
URL: https://github.com/apache/beam/pull/5682#discussion_r196608975
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
 ##
 @@ -125,6 +162,55 @@ private static PipelineResult run(
 return result;
   }
 
+  private static PipelineResult limitRun(
+  PipelineOptions options,
+  BeamRelNode node,
+  DoFn> collectDoFn,
+  DoFn, Void> limitCounterDoFn,
+  LimitStateVar limitStateVar) {
+ExecutorService pool = Executors.newFixedThreadPool(1);
+
+options.as(DirectOptions.class).setBlockOnRun(false);
+Pipeline pipeline = Pipeline.create(options);
+PCollectionTuple.empty(pipeline)
+.apply(node.toPTransform())
+.apply(ParDo.of(collectDoFn))
+.apply(ParDo.of(limitCounterDoFn));
+
+PipelineResult result = pipeline.run();
+
+pool.execute(
+new Runnable() {
+  @Override
+  public void run() {
+// wait every one second to check state of PipelineResult. If 
returned value
+// is null, or return state is non termination state, it means 
pipeline is still
+// running and within loop limit counter state will be checked.
+State state;
+while (true) {
+  state = result.waitUntilFinish(Duration.standardSeconds(1));
+  if (state != null && state.isTerminal()) {
+break;
+  }
+  // If state is null, or state is not null and indicates pipeline 
is not terminated
+  // yet, check continue checking the limit var.
+  try {
+if (limitStateVar.isReached()) {
 
 Review comment:
   I could replace `LimitCollector` with `Collector` at the cost of removing 
extra collected values after the pipeline finishes, or I could modify 
`Collector` and add something like `values.size() >= limitCount` to make 
`Collector` general. To make the code more readable, I wrote a dedicated 
`LimitCollector`.
   
   Also, I'd like to separate the `count and cancel` and `collect` steps to 
support unbounded limit. Right now `Collector` seems to work only in 
DirectRunner, but I can make `count and cancel` more general to fit other 
runners by using a stateful ParDo. So in the future, if we start to support 
other runners, only the `collect` step needs modification.
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 113437)
Time Spent: 6h 50m  (was: 6h 40m)



[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113438=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113438
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 19/Jun/18 23:29
Start Date: 19/Jun/18 23:29
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on a change in pull request #5682: 
[BEAM-4194] support unbounded limit
URL: https://github.com/apache/beam/pull/5682#discussion_r196608975
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
 ##
 @@ -125,6 +162,55 @@ private static PipelineResult run(
 return result;
   }
 
+  private static PipelineResult limitRun(
+  PipelineOptions options,
+  BeamRelNode node,
+  DoFn> collectDoFn,
+  DoFn, Void> limitCounterDoFn,
+  LimitStateVar limitStateVar) {
+ExecutorService pool = Executors.newFixedThreadPool(1);
+
+options.as(DirectOptions.class).setBlockOnRun(false);
+Pipeline pipeline = Pipeline.create(options);
+PCollectionTuple.empty(pipeline)
+.apply(node.toPTransform())
+.apply(ParDo.of(collectDoFn))
+.apply(ParDo.of(limitCounterDoFn));
+
+PipelineResult result = pipeline.run();
+
+pool.execute(
+new Runnable() {
+  @Override
+  public void run() {
+// wait every one second to check state of PipelineResult. If 
returned value
+// is null, or return state is non termination state, it means 
pipeline is still
+// running and within loop limit counter state will be checked.
+State state;
+while (true) {
+  state = result.waitUntilFinish(Duration.standardSeconds(1));
+  if (state != null && state.isTerminal()) {
+break;
+  }
+  // If state is null, or state is not null and indicates pipeline 
is not terminated
+  // yet, check continue checking the limit var.
+  try {
+if (limitStateVar.isReached()) {
 
 Review comment:
   I could replace `LimitCollector` with `Collector` at the cost of removing 
extra collected values after the pipeline finishes, or I could modify 
`Collector` and add something like `values.size() >= limitCount` to make 
`Collector` general. To make the code more readable, I wrote a dedicated 
`LimitCollector`.
   
   Also, I'd like to separate the `count and cancel` and `collect` steps to 
support unbounded limit. Right now `Collector` seems to work only in 
DirectRunner, but I can make `count and cancel` more general to fit other 
runners by using a stateful ParDo. So in the future, if we start to support 
unbounded limit in other runners, only the `collect` step needs modification.
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 113438)
Time Spent: 7h  (was: 6h 50m)



[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113435=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113435
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 19/Jun/18 23:28
Start Date: 19/Jun/18 23:28
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on a change in pull request #5682: 
[BEAM-4194] support unbounded limit
URL: https://github.com/apache/beam/pull/5682#discussion_r196609438
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
 ##
 @@ -111,6 +147,55 @@ private static PipelineResult run(
 return result;
   }
 
+  private static PipelineResult limitRun(
 
 Review comment:
   I want to use this function to encapsulate related logic together.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 113435)
Time Spent: 6.5h  (was: 6h 20m)

> [SQL] Support LIMIT on Unbounded Data
> -
>
> Key: BEAM-4194
> URL: https://issues.apache.org/jira/browse/BEAM-4194
> Project: Beam
>  Issue Type: New Feature
>  Components: dsl-sql
>Reporter: Anton Kedin
>Assignee: Rui Wang
>Priority: Major
>  Time Spent: 6.5h
>  Remaining Estimate: 0h
>
> We need to support queries with "LIMIT xxx".
> Problem is that we don't know when aggregates will trigger, they can 
> potentially accumulate values in global window and never trigger.
> If we have some trigger syntax (BEAM-4193), then the use case becomes similar 
> to what we have at the moment, where the user defines the trigger upstream 
> for all inputs. In this case LIMIT probably can be implemented as 
> sample.any(5) with trigger at count.





[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113436&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113436
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 19/Jun/18 23:28
Start Date: 19/Jun/18 23:28
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on a change in pull request #5682: 
[BEAM-4194] support unbounded limit
URL: https://github.com/apache/beam/pull/5682#discussion_r196609486
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
 ##
 @@ -111,6 +147,55 @@ private static PipelineResult run(
 return result;
   }
 
+  private static PipelineResult limitRun(
 
 Review comment:
   To avoid a long `limitCollect` function.




Issue Time Tracking
---

Worklog Id: (was: 113436)
Time Spent: 6h 40m  (was: 6.5h)



[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113434&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113434
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 19/Jun/18 23:25
Start Date: 19/Jun/18 23:25
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on a change in pull request #5682: 
[BEAM-4194] support unbounded limit
URL: https://github.com/apache/beam/pull/5682#discussion_r196608975
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
 ##
 @@ -125,6 +162,55 @@ private static PipelineResult run(
 return result;
   }
 
+  private static PipelineResult limitRun(
+  PipelineOptions options,
+  BeamRelNode node,
+  DoFn> collectDoFn,
+  DoFn, Void> limitCounterDoFn,
+  LimitStateVar limitStateVar) {
+ExecutorService pool = Executors.newFixedThreadPool(1);
+
+options.as(DirectOptions.class).setBlockOnRun(false);
+Pipeline pipeline = Pipeline.create(options);
+PCollectionTuple.empty(pipeline)
+.apply(node.toPTransform())
+.apply(ParDo.of(collectDoFn))
+.apply(ParDo.of(limitCounterDoFn));
+
+PipelineResult result = pipeline.run();
+
+pool.execute(
+new Runnable() {
+  @Override
+  public void run() {
+// wait every one second to check state of PipelineResult. If returned value
+// is null, or return state is non termination state, it means pipeline is still
+// running and within loop limit counter state will be checked.
+State state;
+while (true) {
+  state = result.waitUntilFinish(Duration.standardSeconds(1));
+  if (state != null && state.isTerminal()) {
+break;
+  }
+  // If state is null, or state is not null and indicates pipeline is not terminated
+  // yet, check continue checking the limit var.
+  try {
+if (limitStateVar.isReached()) {
 
 Review comment:
   I could replace `LimitCollector` with `Collector`, at the cost of removing 
extra collected values after the pipeline finishes, or I could modify `Collector` 
to add a `values.size() >= limitCount` check and make it general. To keep the 
code more readable, I wrote a dedicated `LimitCollector`. 
   
   Also, I'd like to separate the `count and cancel` and `collect` steps to 
support unbounded limit. Right now `Collector` seems to work only in 
DirectRunner, but I can make `count and cancel` more general, so that it fits 
other runners, by using a stateful ParDo. Then, if we start to support other 
runners in the future, only the `collect` step will need modification.
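
   A minimal sketch of the separation described in this comment, in plain Java 
rather than Beam. The names `CountAndCancelSketch`, `LimitCounter` and `run` are 
hypothetical, and the loop `break` only stands in for cancelling the pipeline; 
this is not the code from the pull request.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical illustration: "count and cancel" kept apart from "collect". */
final class CountAndCancelSketch {

  /** Counting step: knows the limit, but not where rows are stored. */
  static final class LimitCounter {
    private final int limitCount;
    private final AtomicInteger seen = new AtomicInteger();

    LimitCounter(int limitCount) {
      this.limitCount = limitCount;
    }

    /** Records one row and reports whether the limit has now been reached. */
    boolean countAndCheck() {
      return seen.incrementAndGet() >= limitCount;
    }
  }

  /** Collect step: the only part tied to how results are handed back. */
  static final class LimitCollector<T> {
    private final int limitCount;
    private final List<T> values = new ArrayList<>();

    LimitCollector(int limitCount) {
      this.limitCount = limitCount;
    }

    /** Stores the row unless the limit is already met. */
    synchronized void collect(T row) {
      if (values.size() < limitCount) {
        values.add(row);
      }
    }

    synchronized List<T> values() {
      return new ArrayList<>(values);
    }
  }

  /** Feeds rows through both steps and stops as soon as the counter says so. */
  static <T> List<T> run(Iterable<T> source, int limitCount) {
    LimitCollector<T> collector = new LimitCollector<>(limitCount);
    LimitCounter counter = new LimitCounter(limitCount);
    for (T row : source) {
      collector.collect(row);
      if (counter.countAndCheck()) {
        break; // stands in for cancelling the running pipeline
      }
    }
    return collector.values();
  }
}
```

   With this split, swapping the storage strategy touches only `LimitCollector`, 
which is the property the comment is after.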
   
   




Issue Time Tracking
---

Worklog Id: (was: 113434)
Time Spent: 6h 20m  (was: 6h 10m)



[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113418&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113418
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 19/Jun/18 22:28
Start Date: 19/Jun/18 22:28
Worklog Time Spent: 10m 
  Work Description: apilloud commented on a change in pull request #5682: 
[BEAM-4194] support unbounded limit
URL: https://github.com/apache/beam/pull/5682#discussion_r196598266
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
 ##
 @@ -125,6 +162,55 @@ private static PipelineResult run(
 return result;
   }
 
+  private static PipelineResult limitRun(
+  PipelineOptions options,
+  BeamRelNode node,
+  DoFn> collectDoFn,
+  DoFn, Void> limitCounterDoFn,
+  LimitStateVar limitStateVar) {
+ExecutorService pool = Executors.newFixedThreadPool(1);
+
+options.as(DirectOptions.class).setBlockOnRun(false);
+Pipeline pipeline = Pipeline.create(options);
+PCollectionTuple.empty(pipeline)
+.apply(node.toPTransform())
+.apply(ParDo.of(collectDoFn))
+.apply(ParDo.of(limitCounterDoFn));
+
+PipelineResult result = pipeline.run();
+
+pool.execute(
+new Runnable() {
+  @Override
+  public void run() {
+// wait every one second to check state of PipelineResult. If returned value
+// is null, or return state is non termination state, it means pipeline is still
+// running and within loop limit counter state will be checked.
+State state;
+while (true) {
+  state = result.waitUntilFinish(Duration.standardSeconds(1));
+  if (state != null && state.isTerminal()) {
+break;
+  }
+  // If state is null, or state is not null and indicates pipeline is not terminated
+  // yet, check continue checking the limit var.
+  try {
+if (limitStateVar.isReached()) {
 
 Review comment:
   If you did something like `values.size() >= limitCount` here you could 
replace `LimitCanceller` and `LimitCollector` with `Collector`.
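
   A rough sketch of the alternative suggested here, in plain Java rather than 
Beam: a single general collector, a size check in place of a separate limit 
flag, and a trimming step for rows that arrive before cancellation takes 
effect. `SizeCheckSketch`, `limitReached` and `firstN` are hypothetical names 
chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical illustration: one general collector plus a size check. */
final class SizeCheckSketch {

  /** General collector: keeps every row it is given. */
  static final class Collector<T> {
    final Queue<T> values = new ConcurrentLinkedQueue<>();

    void process(T row) {
      values.add(row);
    }
  }

  /** The check from the review comment: values.size() >= limitCount. */
  static boolean limitReached(Collector<?> collector, int limitCount) {
    return collector.values.size() >= limitCount;
  }

  /** Drops rows collected beyond the limit, keeping arrival order. */
  static <T> List<T> firstN(Collector<T> collector, int limitCount) {
    List<T> out = new ArrayList<>();
    for (T row : collector.values) {
      if (out.size() >= limitCount) {
        break;
      }
      out.add(row);
    }
    return out;
  }
}
```

   The cost is the extra `firstN` pass after the pipeline stops, since the 
collector may hold more rows than the query asked for.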




Issue Time Tracking
---

Worklog Id: (was: 113418)
Time Spent: 6h 10m  (was: 6h)



[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113417&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113417
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 19/Jun/18 22:28
Start Date: 19/Jun/18 22:28
Worklog Time Spent: 10m 
  Work Description: apilloud commented on a change in pull request #5682: 
[BEAM-4194] support unbounded limit
URL: https://github.com/apache/beam/pull/5682#discussion_r196595217
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
 ##
 @@ -111,6 +147,55 @@ private static PipelineResult run(
 return result;
   }
 
+  private static PipelineResult limitRun(
+  PipelineOptions options,
+  BeamRelNode node,
+  DoFn> collectDoFn,
+  DoFn, Void> limitCounterDoFn,
+  LimitStateVar limitStateVar) {
+ExecutorService pool = Executors.newFixedThreadPool(1);
+
+options.as(DirectOptions.class).setBlockOnRun(false);
+Pipeline pipeline = Pipeline.create(options);
+PCollectionTuple.empty(pipeline)
+.apply(node.toPTransform())
+.apply(ParDo.of(collectDoFn))
+.apply(ParDo.of(limitCounterDoFn));
+
+PipelineResult result = pipeline.run();
+
+pool.execute(
 
 Review comment:
   The extra thread seems unnecessary. You can do this in the main thread.
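
   A sketch of what polling on the calling thread could look like, with no 
`ExecutorService`. `Job` and `State` are hypothetical stand-ins modelled 
loosely on a pipeline result handle (a timed wait that may return null, plus 
cancel); they are assumptions for illustration, not the Beam API.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical illustration: poll for the limit on the calling thread. */
final class MainThreadPollSketch {

  /** Stand-in for a pipeline state; only RUNNING is non-terminal. */
  enum State {
    RUNNING,
    DONE,
    CANCELLED;

    boolean isTerminal() {
      return this != RUNNING;
    }
  }

  /** Stand-in for a handle to a pipeline started in non-blocking mode. */
  interface Job {
    /** Waits up to the given time; may return null if still running. */
    State waitUntilFinish(long millis);

    void cancel();
  }

  /** Blocks the caller until the job ends or the limit check fires. */
  static State pollUntilLimit(Job job, BooleanSupplier limitReached, long pollMillis) {
    while (true) {
      State state = job.waitUntilFinish(pollMillis);
      if (state != null && state.isTerminal()) {
        return state; // the job finished without outside help
      }
      if (limitReached.getAsBoolean()) {
        job.cancel(); // enough rows collected, stop the job
        return State.CANCELLED;
      }
    }
  }
}
```

   Because the caller has to wait for the result anyway, running this loop 
inline avoids creating and shutting down a single-thread pool.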




Issue Time Tracking
---

Worklog Id: (was: 113417)
Time Spent: 6h  (was: 5h 50m)



[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113415&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113415
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 19/Jun/18 22:28
Start Date: 19/Jun/18 22:28
Worklog Time Spent: 10m 
  Work Description: apilloud commented on a change in pull request #5682: 
[BEAM-4194] support unbounded limit
URL: https://github.com/apache/beam/pull/5682#discussion_r196594705
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
 ##
 @@ -111,6 +147,55 @@ private static PipelineResult run(
 return result;
   }
 
+  private static PipelineResult limitRun(
 
 Review comment:
   The point of `run` was to have exactly one for everything. Seeing as that 
isn't going to work, you should probably just inline this.




Issue Time Tracking
---

Worklog Id: (was: 113415)
Time Spent: 5h 40m  (was: 5.5h)



[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113414&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113414
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 19/Jun/18 22:28
Start Date: 19/Jun/18 22:28
Worklog Time Spent: 10m 
  Work Description: apilloud commented on a change in pull request #5682: 
[BEAM-4194] support unbounded limit
URL: https://github.com/apache/beam/pull/5682#discussion_r196594393
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
 ##
 @@ -95,13 +107,37 @@ public Result implement(EnumerableRelImplementor implementor, Prefer prefer) {
   Thread.currentThread().setContextClassLoader(BeamEnumerableConverter.class.getClassLoader());
   if (node instanceof BeamIOSinkRel) {
 return count(options, node);
+  } else if (isLimitQuery(node)) {
+return limitCollect(options, node);
   }
+
   return collect(options, node);
 } finally {
   Thread.currentThread().setContextClassLoader(originalClassLoader);
 }
   }
 
+  enum LimitState {
 
 Review comment:
   nit: Isn't this just a Boolean?
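
   A small sketch of the Boolean alternative, assuming the enum only 
distinguishes "reached" from "not reached" (its constants are not shown in the 
hunk above). `LimitFlagSketch` is a hypothetical name; `AtomicBoolean` is used 
so that a flag set inside a DoFn is visible to a polling thread.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical illustration: a two-valued limit state held as a boolean flag. */
final class LimitFlagSketch {
  private final AtomicBoolean reached = new AtomicBoolean(false);

  /** Called by the counting step once enough rows have been seen. */
  void markReached() {
    reached.set(true);
  }

  /** Called by the polling loop to decide whether to cancel. */
  boolean isReached() {
    return reached.get();
  }
}
```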




Issue Time Tracking
---

Worklog Id: (was: 113414)
Time Spent: 5.5h  (was: 5h 20m)



[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113416&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113416
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 19/Jun/18 22:28
Start Date: 19/Jun/18 22:28
Worklog Time Spent: 10m 
  Work Description: apilloud commented on a change in pull request #5682: 
[BEAM-4194] support unbounded limit
URL: https://github.com/apache/beam/pull/5682#discussion_r196595720
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
 ##
 @@ -133,14 +219,108 @@ private static PipelineResult run(
 options
 .getRunner()
 .getCanonicalName()
-.equals("org.apache.beam.runners.direct.DirectRunner"));
+.equals("org.apache.beam.runners.direct.DirectRunner"),
+"Only DirectRunner is supported in SQL Shell. " + "Please check your Runner setting.");
 
 Review comment:
   This isn't exactly true. Should be something like "SELECT without INSERT is 
only supported in DirectRunner in SQL Shell."




Issue Time Tracking
---

Worklog Id: (was: 113416)
Time Spent: 5h 50m  (was: 5h 40m)



[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113412&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113412
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 19/Jun/18 22:25
Start Date: 19/Jun/18 22:25
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on issue #5682: [BEAM-4194] support 
unbounded limit
URL: https://github.com/apache/beam/pull/5682#issuecomment-398565693
 
 
   run java postcommit




Issue Time Tracking
---

Worklog Id: (was: 113412)
Time Spent: 5h 20m  (was: 5h 10m)



[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113389&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113389
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 19/Jun/18 21:30
Start Date: 19/Jun/18 21:30
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on issue #5682: [BEAM-4194] support 
unbounded limit
URL: https://github.com/apache/beam/pull/5682#issuecomment-398552880
 
 
   run java postcommit




Issue Time Tracking
---

Worklog Id: (was: 113389)
Time Spent: 5h 10m  (was: 5h)



[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113388&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113388
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 19/Jun/18 21:30
Start Date: 19/Jun/18 21:30
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on issue #5682: [BEAM-4194] support 
unbounded limit
URL: https://github.com/apache/beam/pull/5682#issuecomment-398552843
 
 
   run java precommit




Issue Time Tracking
---

Worklog Id: (was: 113388)
Time Spent: 5h  (was: 4h 50m)



[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113375&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113375
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 19/Jun/18 20:57
Start Date: 19/Jun/18 20:57
Worklog Time Spent: 10m 
  Work Description: amaliujia removed a comment on issue #5682: [BEAM-4194] 
support unbounded limit
URL: https://github.com/apache/beam/pull/5682#issuecomment-398542010
 
 
   run java precommit




Issue Time Tracking
---

Worklog Id: (was: 113375)
Time Spent: 4h 50m  (was: 4h 40m)



[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113374&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113374
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 19/Jun/18 20:56
Start Date: 19/Jun/18 20:56
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on issue #5682: [BEAM-4194] support 
unbounded limit
URL: https://github.com/apache/beam/pull/5682#issuecomment-398543026
 
 
   run java precommit




Issue Time Tracking
---

Worklog Id: (was: 113374)
Time Spent: 4h 40m  (was: 4.5h)



[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113369&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113369
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 19/Jun/18 20:53
Start Date: 19/Jun/18 20:53
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on issue #5682: [BEAM-4194] support 
unbounded limit
URL: https://github.com/apache/beam/pull/5682#issuecomment-398542010
 
 
   run java precommit




Issue Time Tracking
---

Worklog Id: (was: 113369)
Time Spent: 4.5h  (was: 4h 20m)



[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113368&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113368
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 19/Jun/18 20:53
Start Date: 19/Jun/18 20:53
Worklog Time Spent: 10m 
  Work Description: amaliujia removed a comment on issue #5682: [BEAM-4194] 
support unbounded limit
URL: https://github.com/apache/beam/pull/5682#issuecomment-398538364
 
 
   run java precommit




Issue Time Tracking
---

Worklog Id: (was: 113368)
Time Spent: 4h 20m  (was: 4h 10m)



[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113364&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113364
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 19/Jun/18 20:41
Start Date: 19/Jun/18 20:41
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on issue #5682: [BEAM-4194] support 
unbounded limit
URL: https://github.com/apache/beam/pull/5682#issuecomment-398538364
 
 
   run java precommit


Issue Time Tracking
---

Worklog Id: (was: 113364)
Time Spent: 4h 10m  (was: 4h)


[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113361&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113361
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 19/Jun/18 20:41
Start Date: 19/Jun/18 20:41
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on a change in pull request #5682: 
[BEAM-4194] support unbounded limit
URL: https://github.com/apache/beam/pull/5682#discussion_r196570798
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
 ##
 @@ -127,6 +194,66 @@ private static PipelineResult run(
 return Linq4j.asEnumerable(values);
   }
 
+  private static Enumerable limitCollect(PipelineOptions options, BeamRelNode node) {
+long id = options.getOptionsId();
+Queue values = new ConcurrentLinkedQueue();
+
+checkArgument(
+options
+.getRunner()
+.getCanonicalName()
+.equals("org.apache.beam.runners.direct.DirectRunner"));
+
+LimitStateWrapper stateWrapper = new LimitStateWrapper();
+LimitCounter.globalValues.put(id, values);
+LimitCounter.globalLimitArguments.put(id, getLimitCount(node));
+LimitCounter.globalStates.put(id, stateWrapper);
+limitRun(options, node, new LimitCounter(), stateWrapper);
+LimitCounter.globalValues.remove(id);
+LimitCounter.globalLimitArguments.remove(id);
+LimitCounter.globalStates.remove(id);
+
+return Linq4j.asEnumerable(values);
+  }
+
+  private static class LimitCounter extends DoFn {
+private static final Map globalLimitArguments =
 
 Review comment:
   This now uses a stateful ParDo to implement the counting and cancelling.


Issue Time Tracking
---

Worklog Id: (was: 113361)
Time Spent: 3h 40m  (was: 3.5h)


[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113362&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113362
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 19/Jun/18 20:41
Start Date: 19/Jun/18 20:41
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on a change in pull request #5682: 
[BEAM-4194] support unbounded limit
URL: https://github.com/apache/beam/pull/5682#discussion_r196570806
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
 ##
 @@ -127,6 +194,66 @@ private static PipelineResult run(
 return Linq4j.asEnumerable(values);
   }
 
+  private static Enumerable limitCollect(PipelineOptions options, BeamRelNode node) {
+long id = options.getOptionsId();
+Queue values = new ConcurrentLinkedQueue();
+
+checkArgument(
+options
+.getRunner()
+.getCanonicalName()
+.equals("org.apache.beam.runners.direct.DirectRunner"));
 
 Review comment:
   Added a message saying that only DirectRunner is supported and asking the user
to check the option settings.


Issue Time Tracking
---

Worklog Id: (was: 113362)
Time Spent: 3h 50m  (was: 3h 40m)


[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113363&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113363
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 19/Jun/18 20:41
Start Date: 19/Jun/18 20:41
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on a change in pull request #5682: 
[BEAM-4194] support unbounded limit
URL: https://github.com/apache/beam/pull/5682#discussion_r196570820
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
 ##
 @@ -95,13 +101,37 @@ public Result implement(EnumerableRelImplementor implementor, Prefer prefer) {
   Thread.currentThread().setContextClassLoader(BeamEnumerableConverter.class.getClassLoader());
   if (node instanceof BeamIOSinkRel) {
 return count(options, node);
+  } else if (isLimitQuery(node)) {
+return limitCollect(options, node);
   }
+
   return collect(options, node);
 } finally {
   Thread.currentThread().setContextClassLoader(originalClassLoader);
 }
   }
 
+  enum LimitState {
+REACHED,
+NOT_REACHED
+  }
+
+  private static class LimitStateWrapper implements Serializable {
 
 Review comment:
   Rename it to LimitStateVar


Issue Time Tracking
---

Worklog Id: (was: 113363)
Time Spent: 4h  (was: 3h 50m)


[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113308&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113308
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 19/Jun/18 18:31
Start Date: 19/Jun/18 18:31
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on a change in pull request #5682: 
[BEAM-4194] support unbounded limit
URL: https://github.com/apache/beam/pull/5682#discussion_r196533522
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
 ##
 @@ -111,6 +141,43 @@ private static PipelineResult run(
 return result;
   }
 
+  private static PipelineResult limitRun(
+  PipelineOptions options,
+  BeamRelNode node,
+  DoFn doFn,
+  LimitStateWrapper stateWrapper) {
+ExecutorService pool = Executors.newFixedThreadPool(1);
+
+options.as(DirectOptions.class).setBlockOnRun(false);
+Pipeline pipeline = Pipeline.create(options);
+PCollectionTuple.empty(pipeline).apply(node.toPTransform()).apply(ParDo.of(doFn));
+
+PipelineResult result = pipeline.run();
+
+pool.execute(
+new Runnable() {
+  @Override
+  public void run() {
+while (!result.getState().isTerminal()) {
+  try {
+Thread.sleep(DaemonThreadSleepIntervalMillis);
 
 Review comment:
   Great suggestion! 


Issue Time Tracking
---

Worklog Id: (was: 113308)
Time Spent: 3.5h  (was: 3h 20m)


[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113270&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113270
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 19/Jun/18 17:28
Start Date: 19/Jun/18 17:28
Worklog Time Spent: 10m 
  Work Description: kennknowles commented on a change in pull request 
#5682: [BEAM-4194] support unbounded limit
URL: https://github.com/apache/beam/pull/5682#discussion_r196512057
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
 ##
 @@ -127,6 +194,66 @@ private static PipelineResult run(
 return Linq4j.asEnumerable(values);
   }
 
+  private static Enumerable limitCollect(PipelineOptions options, BeamRelNode node) {
+long id = options.getOptionsId();
+Queue values = new ConcurrentLinkedQueue();
+
+checkArgument(
+options
+.getRunner()
+.getCanonicalName()
+.equals("org.apache.beam.runners.direct.DirectRunner"));
+
+LimitStateWrapper stateWrapper = new LimitStateWrapper();
+LimitCounter.globalValues.put(id, values);
+LimitCounter.globalLimitArguments.put(id, getLimitCount(node));
+LimitCounter.globalStates.put(id, stateWrapper);
+limitRun(options, node, new LimitCounter(), stateWrapper);
+LimitCounter.globalValues.remove(id);
+LimitCounter.globalLimitArguments.remove(id);
+LimitCounter.globalStates.remove(id);
+
+return Linq4j.asEnumerable(values);
+  }
+
+  private static class LimitCounter extends DoFn {
+private static final Map globalLimitArguments =
 
 Review comment:
   In the `DirectRunner` this is OK. I think you could pretty easily make it a 
bit more general by using stateful ParDo(DoFn) for counting. Then the only 
thing left to make it work on a distributed runner is a communication path back 
to the shell. If you leave it like this, I think the `DoFn` also is not ready, 
because in a distributed run there will be lots of copies and separate JVMs on 
different workers, not gathered to one key, etc.
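
A minimal plain-Java sketch of the counting idea in this comment, under stated assumptions: every element is routed to a single key, and the count lives in per-key state instead of static maps. `KeyedLimitCounter`, its `HashMap`, and the key name used in the usage note are hypothetical stand-ins for runner-managed state; this is not Beam API and not the code from the pull request.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of "count in per-key state, emit until the limit". Not Beam API. */
final class KeyedLimitCounter<T> {
  private final int limit;
  // Stand-in for runner-managed per-key state: key -> number of elements emitted so far.
  private final Map<String, Integer> countByKey = new HashMap<>();
  private final List<T> output = new ArrayList<>();

  KeyedLimitCounter(int limit) {
    this.limit = limit;
  }

  /**
   * Processes one element for the given key. The element is emitted only while the
   * key's count is below the limit. Returns true once the limit has been reached.
   */
  boolean process(String key, T element) {
    int seen = countByKey.getOrDefault(key, 0);
    if (seen < limit) {
      output.add(element);
      seen++;
      countByKey.put(key, seen);
    }
    return seen >= limit;
  }

  /** Elements emitted so far, in arrival order. */
  List<T> output() {
    return output;
  }
}
```

Feeding ten elements under one key (say `"all"`) into a counter built with a limit of 3 emits only the first three; the `true` return value is the point at which a caller could request cancellation. The communication path back to the shell that the comment mentions is outside what this sketch models.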




Issue Time Tracking
---

Worklog Id: (was: 113270)

> [SQL] Support LIMIT on Unbounded Data
> -
>
> Key: BEAM-4194
> URL: https://issues.apache.org/jira/browse/BEAM-4194
> Project: Beam
>  Issue Type: New Feature
>  Components: dsl-sql
>Reporter: Anton Kedin
>Assignee: Rui Wang
>Priority: Major
>  Time Spent: 3h 10m
>  Remaining Estimate: 0h


[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113272&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113272
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 19/Jun/18 17:28
Start Date: 19/Jun/18 17:28
Worklog Time Spent: 10m 
  Work Description: kennknowles commented on a change in pull request 
#5682: [BEAM-4194] support unbounded limit
URL: https://github.com/apache/beam/pull/5682#discussion_r196510662
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
 ##
 @@ -127,6 +194,66 @@ private static PipelineResult run(
 return Linq4j.asEnumerable(values);
   }
 
+  private static Enumerable limitCollect(PipelineOptions options, BeamRelNode node) {
+long id = options.getOptionsId();
+Queue values = new ConcurrentLinkedQueue();
+
+checkArgument(
+options
+.getRunner()
+.getCanonicalName()
+.equals("org.apache.beam.runners.direct.DirectRunner"));
 
 Review comment:
   Can you add a message to this check? That way when it fails it will explain 
what is going on.
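
A small sketch of what a check with a message could look like. The local `checkArgument` helper is a stand-in with a Guava-like shape so the example has no dependencies; `RunnerCheck`, `requireDirectRunner`, and the message wording are assumptions for illustration, not the text that ended up in the pull request.

```java
/** Illustrative precondition with an explanatory message. Names and wording are assumptions. */
final class RunnerCheck {
  static final String DIRECT_RUNNER = "org.apache.beam.runners.direct.DirectRunner";

  // Local stand-in for a precondition helper that formats a message on failure.
  static void checkArgument(boolean condition, String template, Object... args) {
    if (!condition) {
      throw new IllegalArgumentException(String.format(template, args));
    }
  }

  /** Fails with a descriptive message when the configured runner is not the direct runner. */
  static void requireDirectRunner(String runnerClassName) {
    checkArgument(
        DIRECT_RUNNER.equals(runnerClassName),
        "LIMIT on unbounded data is only supported on DirectRunner, but the configured "
            + "runner is %s. Please check the pipeline options.",
        runnerClassName);
  }
}
```

With the message in place, a failure names the offending runner class instead of surfacing a bare `IllegalArgumentException`.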


Issue Time Tracking
---

Worklog Id: (was: 113272)
Time Spent: 3h 20m  (was: 3h 10m)


[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113271&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113271
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 19/Jun/18 17:28
Start Date: 19/Jun/18 17:28
Worklog Time Spent: 10m 
  Work Description: kennknowles commented on a change in pull request 
#5682: [BEAM-4194] support unbounded limit
URL: https://github.com/apache/beam/pull/5682#discussion_r196510366
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
 ##
 @@ -95,13 +101,37 @@ public Result implement(EnumerableRelImplementor implementor, Prefer prefer) {
   Thread.currentThread().setContextClassLoader(BeamEnumerableConverter.class.getClassLoader());
   if (node instanceof BeamIOSinkRel) {
 return count(options, node);
+  } else if (isLimitQuery(node)) {
+return limitCollect(options, node);
   }
+
   return collect(options, node);
 } finally {
   Thread.currentThread().setContextClassLoader(originalClassLoader);
 }
   }
 
+  enum LimitState {
+REACHED,
+NOT_REACHED
+  }
+
+  private static class LimitStateWrapper implements Serializable {
 
 Review comment:
   Why is there a wrapper? Is this to make it clearly something like an `LVar`?
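
One possible reading of the `LVar` comparison, sketched in plain Java: a holder whose value can only move in one direction, from `NOT_REACHED` to `REACHED`. The `AtomicReference` field and the method names `markReached` and `isReached` are assumptions made for this sketch; the class in the pull request may be shaped differently.

```java
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicReference;

enum LimitState {
  NOT_REACHED,
  REACHED
}

/** Monotonic state holder: once REACHED, it never goes back. Hypothetical shape. */
final class LimitStateVar implements Serializable {
  private static final long serialVersionUID = 1L;

  private final AtomicReference<LimitState> state =
      new AtomicReference<>(LimitState.NOT_REACHED);

  /** Moves to REACHED; returns true only for the call that performs the transition. */
  boolean markReached() {
    return state.compareAndSet(LimitState.NOT_REACHED, LimitState.REACHED);
  }

  boolean isReached() {
    return state.get() == LimitState.REACHED;
  }
}
```

Because there is no method that resets the value, readers on other threads can treat an observed `REACHED` as final.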


Issue Time Tracking
---

Worklog Id: (was: 113271)
Time Spent: 3h 20m  (was: 3h 10m)


[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113269&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113269
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 19/Jun/18 17:28
Start Date: 19/Jun/18 17:28
Worklog Time Spent: 10m 
  Work Description: kennknowles commented on a change in pull request 
#5682: [BEAM-4194] support unbounded limit
URL: https://github.com/apache/beam/pull/5682#discussion_r196509427
 
 

 ##
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
 ##
 @@ -111,6 +141,43 @@ private static PipelineResult run(
 return result;
   }
 
+  private static PipelineResult limitRun(
+  PipelineOptions options,
+  BeamRelNode node,
+  DoFn doFn,
+  LimitStateWrapper stateWrapper) {
+ExecutorService pool = Executors.newFixedThreadPool(1);
+
+options.as(DirectOptions.class).setBlockOnRun(false);
+Pipeline pipeline = Pipeline.create(options);
+PCollectionTuple.empty(pipeline).apply(node.toPTransform()).apply(ParDo.of(doFn));
+
+PipelineResult result = pipeline.run();
+
+pool.execute(
+new Runnable() {
+  @Override
+  public void run() {
+while (!result.getState().isTerminal()) {
+  try {
+Thread.sleep(DaemonThreadSleepIntervalMillis);
 
 Review comment:
   I think result.waitUntilFinish(duration) is good here: 
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java#L58
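
A dependency-free sketch of the pattern suggested here: replace the sleep-and-poll loop with a bounded wait on the result, and check the limit flag between waits. `ResultLike`, `LatchResult`, and `LimitWatcher` are hypothetical stand-ins written for this example, and `java.time.Duration` is substituted for whatever duration type the real `waitUntilFinish` accepts.

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for the part of a pipeline result used below. Not the Beam interface. */
interface ResultLike {
  /** Blocks for at most the given time; returns true if the run is in a terminal state. */
  boolean waitUntilFinish(Duration timeout);

  void cancel();
}

/** Test double backed by a latch, so the sketch runs without a pipeline. */
final class LatchResult implements ResultLike {
  private final CountDownLatch done = new CountDownLatch(1);
  private volatile boolean cancelled;

  @Override
  public boolean waitUntilFinish(Duration timeout) {
    try {
      return done.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      // Restore the flag and report "finished" so the caller's loop exits.
      Thread.currentThread().interrupt();
      return true;
    }
  }

  @Override
  public void cancel() {
    cancelled = true;
    done.countDown();
  }

  void finish() {
    done.countDown();
  }

  boolean isCancelled() {
    return cancelled;
  }
}

final class LimitWatcher {
  /**
   * Waits in bounded slices. Cancels the run as soon as the limit flag is set.
   * Returns true if the run was cancelled because the limit was reached.
   */
  static boolean watch(ResultLike result, AtomicBoolean limitReached, Duration slice) {
    while (!result.waitUntilFinish(slice)) {
      if (limitReached.get()) {
        result.cancel();
        return true;
      }
    }
    return false;
  }
}
```

Compared with `Thread.sleep`, the bounded wait returns as soon as the run finishes, so the watcher never sleeps through a completed pipeline.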


Issue Time Tracking
---

Worklog Id: (was: 113269)
Time Spent: 3h 10m  (was: 3h)


[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113245&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113245
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 19/Jun/18 16:55
Start Date: 19/Jun/18 16:55
Worklog Time Spent: 10m 
  Work Description: amaliujia removed a comment on issue #5682: [BEAM-4194] 
support unbounded limit
URL: https://github.com/apache/beam/pull/5682#issuecomment-398265245
 
 
   run java precommit


Issue Time Tracking
---

Worklog Id: (was: 113245)
Time Spent: 2h 20m  (was: 2h 10m)


[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113247&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113247
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 19/Jun/18 16:55
Start Date: 19/Jun/18 16:55
Worklog Time Spent: 10m 
  Work Description: amaliujia removed a comment on issue #5682: [BEAM-4194] 
support unbounded limit
URL: https://github.com/apache/beam/pull/5682#issuecomment-398291401
 
 
   run java postcommit


Issue Time Tracking
---

Worklog Id: (was: 113247)
Time Spent: 2h 40m  (was: 2.5h)


[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113249&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113249
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 19/Jun/18 16:56
Start Date: 19/Jun/18 16:56
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on issue #5682: [BEAM-4194] support 
unbounded limit
URL: https://github.com/apache/beam/pull/5682#issuecomment-398470292
 
 
   R: @kennknowles 
   cc: @apilloud @akedin 


Issue Time Tracking
---

Worklog Id: (was: 113249)
Time Spent: 3h  (was: 2h 50m)


[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113246&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113246
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 19/Jun/18 16:55
Start Date: 19/Jun/18 16:55
Worklog Time Spent: 10m 
  Work Description: amaliujia removed a comment on issue #5682: [BEAM-4194] 
support unbounded limit
URL: https://github.com/apache/beam/pull/5682#issuecomment-398289136
 
 
   run java precommit
   


Issue Time Tracking
---

Worklog Id: (was: 113246)
Time Spent: 2.5h  (was: 2h 20m)


[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113248&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113248
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 19/Jun/18 16:55
Start Date: 19/Jun/18 16:55
Worklog Time Spent: 10m 
  Work Description: amaliujia removed a comment on issue #5682: [BEAM-4194] 
support unbounded limit
URL: https://github.com/apache/beam/pull/5682#issuecomment-398309597
 
 
   run java postcommit


Issue Time Tracking
---

Worklog Id: (was: 113248)
Time Spent: 2h 50m  (was: 2h 40m)


[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113244&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113244
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 19/Jun/18 16:55
Start Date: 19/Jun/18 16:55
Worklog Time Spent: 10m 
  Work Description: amaliujia removed a comment on issue #5682: [BEAM-4194] 
support unbounded limit
URL: https://github.com/apache/beam/pull/5682#issuecomment-398263801
 
 
   run java precommit




Issue Time Tracking
---

Worklog Id: (was: 113244)
Time Spent: 2h 10m  (was: 2h)

> [SQL] Support LIMIT on Unbounded Data
> -
>
> Key: BEAM-4194
> URL: https://issues.apache.org/jira/browse/BEAM-4194
> Project: Beam
>  Issue Type: New Feature
>  Components: dsl-sql
>Reporter: Anton Kedin
>Assignee: Rui Wang
>Priority: Major
>  Time Spent: 2h 10m
>  Remaining Estimate: 0h


[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113243&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113243
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 19/Jun/18 16:55
Start Date: 19/Jun/18 16:55
Worklog Time Spent: 10m 
  Work Description: amaliujia removed a comment on issue #5682: [BEAM-4194] 
support unbounded limit
URL: https://github.com/apache/beam/pull/5682#issuecomment-398263327
 
 
   run java postcommit




Issue Time Tracking
---

Worklog Id: (was: 113243)
Time Spent: 2h  (was: 1h 50m)

> [SQL] Support LIMIT on Unbounded Data
> -
>
> Key: BEAM-4194
> URL: https://issues.apache.org/jira/browse/BEAM-4194
> Project: Beam
>  Issue Type: New Feature
>  Components: dsl-sql
>Reporter: Anton Kedin
>Assignee: Rui Wang
>Priority: Major
>  Time Spent: 2h
>  Remaining Estimate: 0h


[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113242&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113242
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 19/Jun/18 16:55
Start Date: 19/Jun/18 16:55
Worklog Time Spent: 10m 
  Work Description: amaliujia removed a comment on issue #5682: [BEAM-4194] 
support unbounded limit
URL: https://github.com/apache/beam/pull/5682#issuecomment-398263214
 
 
   run java precommit




Issue Time Tracking
---

Worklog Id: (was: 113242)
Time Spent: 1h 50m  (was: 1h 40m)

> [SQL] Support LIMIT on Unbounded Data
> -
>
> Key: BEAM-4194
> URL: https://issues.apache.org/jira/browse/BEAM-4194
> Project: Beam
>  Issue Type: New Feature
>  Components: dsl-sql
>Reporter: Anton Kedin
>Assignee: Rui Wang
>Priority: Major
>  Time Spent: 1h 50m
>  Remaining Estimate: 0h


[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113086&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113086
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 19/Jun/18 07:59
Start Date: 19/Jun/18 07:59
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on issue #5682: [BEAM-4194] support 
unbounded limit
URL: https://github.com/apache/beam/pull/5682#issuecomment-398310217
 
 
   run java postcommit




Issue Time Tracking
---

Worklog Id: (was: 113086)
Time Spent: 1h 40m  (was: 1.5h)

> [SQL] Support LIMIT on Unbounded Data
> -
>
> Key: BEAM-4194
> URL: https://issues.apache.org/jira/browse/BEAM-4194
> Project: Beam
>  Issue Type: New Feature
>  Components: dsl-sql
>Reporter: Anton Kedin
>Assignee: Rui Wang
>Priority: Major
>  Time Spent: 1h 40m
>  Remaining Estimate: 0h


[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113084&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113084
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 19/Jun/18 07:57
Start Date: 19/Jun/18 07:57
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on issue #5682: [BEAM-4194] support 
unbounded limit
URL: https://github.com/apache/beam/pull/5682#issuecomment-398309597
 
 
   run java postcommit




Issue Time Tracking
---

Worklog Id: (was: 113084)
Time Spent: 1.5h  (was: 1h 20m)

> [SQL] Support LIMIT on Unbounded Data
> -
>
> Key: BEAM-4194
> URL: https://issues.apache.org/jira/browse/BEAM-4194
> Project: Beam
>  Issue Type: New Feature
>  Components: dsl-sql
>Reporter: Anton Kedin
>Assignee: Rui Wang
>Priority: Major
>  Time Spent: 1.5h
>  Remaining Estimate: 0h


[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113043&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113043
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 19/Jun/18 06:46
Start Date: 19/Jun/18 06:46
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on issue #5682: [BEAM-4194] support 
unbounded limit
URL: https://github.com/apache/beam/pull/5682#issuecomment-398292372
 
 
   run java precommit




Issue Time Tracking
---

Worklog Id: (was: 113043)
Time Spent: 1h 20m  (was: 1h 10m)

> [SQL] Support LIMIT on Unbounded Data
> -
>
> Key: BEAM-4194
> URL: https://issues.apache.org/jira/browse/BEAM-4194
> Project: Beam
>  Issue Type: New Feature
>  Components: dsl-sql
>Reporter: Anton Kedin
>Assignee: Rui Wang
>Priority: Major
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h


[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113038&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113038
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 19/Jun/18 06:41
Start Date: 19/Jun/18 06:41
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on issue #5682: [BEAM-4194] support 
unbounded limit
URL: https://github.com/apache/beam/pull/5682#issuecomment-398291401
 
 
   run java postcommit




Issue Time Tracking
---

Worklog Id: (was: 113038)
Time Spent: 1h 10m  (was: 1h)

> [SQL] Support LIMIT on Unbounded Data
> -
>
> Key: BEAM-4194
> URL: https://issues.apache.org/jira/browse/BEAM-4194
> Project: Beam
>  Issue Type: New Feature
>  Components: dsl-sql
>Reporter: Anton Kedin
>Assignee: Rui Wang
>Priority: Major
>  Time Spent: 1h 10m
>  Remaining Estimate: 0h


[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113031&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113031
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 19/Jun/18 06:30
Start Date: 19/Jun/18 06:30
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on issue #5682: [BEAM-4194] support 
unbounded limit
URL: https://github.com/apache/beam/pull/5682#issuecomment-398289136
 
 
   run java precommit
   




Issue Time Tracking
---

Worklog Id: (was: 113031)
Time Spent: 1h  (was: 50m)

> [SQL] Support LIMIT on Unbounded Data
> -
>
> Key: BEAM-4194
> URL: https://issues.apache.org/jira/browse/BEAM-4194
> Project: Beam
>  Issue Type: New Feature
>  Components: dsl-sql
>Reporter: Anton Kedin
>Assignee: Rui Wang
>Priority: Major
>  Time Spent: 1h
>  Remaining Estimate: 0h


[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-18 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=112992&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-112992
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 19/Jun/18 03:45
Start Date: 19/Jun/18 03:45
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on issue #5682: [BEAM-4194] support 
unbounded limit
URL: https://github.com/apache/beam/pull/5682#issuecomment-398265245
 
 
   run java precommit




Issue Time Tracking
---

Worklog Id: (was: 112992)
Time Spent: 50m  (was: 40m)

> [SQL] Support LIMIT on Unbounded Data
> -
>
> Key: BEAM-4194
> URL: https://issues.apache.org/jira/browse/BEAM-4194
> Project: Beam
>  Issue Type: New Feature
>  Components: dsl-sql
>Reporter: Anton Kedin
>Assignee: Rui Wang
>Priority: Major
>  Time Spent: 50m
>  Remaining Estimate: 0h


[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-18 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=112989&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-112989
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 19/Jun/18 03:34
Start Date: 19/Jun/18 03:34
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on issue #5682: [BEAM-4194] support 
unbounded limit
URL: https://github.com/apache/beam/pull/5682#issuecomment-398263801
 
 
   run java precommit




Issue Time Tracking
---

Worklog Id: (was: 112989)
Time Spent: 40m  (was: 0.5h)

> [SQL] Support LIMIT on Unbounded Data
> -
>
> Key: BEAM-4194
> URL: https://issues.apache.org/jira/browse/BEAM-4194
> Project: Beam
>  Issue Type: New Feature
>  Components: dsl-sql
>Reporter: Anton Kedin
>Assignee: Rui Wang
>Priority: Major
>  Time Spent: 40m
>  Remaining Estimate: 0h


[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-18 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=112988&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-112988
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 19/Jun/18 03:30
Start Date: 19/Jun/18 03:30
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on issue #5682: [BEAM-4194] support 
unbounded limit
URL: https://github.com/apache/beam/pull/5682#issuecomment-398263327
 
 
   run java postcommit




Issue Time Tracking
---

Worklog Id: (was: 112988)
Time Spent: 0.5h  (was: 20m)

> [SQL] Support LIMIT on Unbounded Data
> -
>
> Key: BEAM-4194
> URL: https://issues.apache.org/jira/browse/BEAM-4194
> Project: Beam
>  Issue Type: New Feature
>  Components: dsl-sql
>Reporter: Anton Kedin
>Assignee: Rui Wang
>Priority: Major
>  Time Spent: 0.5h
>  Remaining Estimate: 0h


[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-18 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=112987&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-112987
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 19/Jun/18 03:30
Start Date: 19/Jun/18 03:30
Worklog Time Spent: 10m 
  Work Description: amaliujia commented on issue #5682: [BEAM-4194] support 
unbounded limit
URL: https://github.com/apache/beam/pull/5682#issuecomment-398263214
 
 
   run java precommit




Issue Time Tracking
---

Worklog Id: (was: 112987)
Time Spent: 20m  (was: 10m)

> [SQL] Support LIMIT on Unbounded Data
> -
>
> Key: BEAM-4194
> URL: https://issues.apache.org/jira/browse/BEAM-4194
> Project: Beam
>  Issue Type: New Feature
>  Components: dsl-sql
>Reporter: Anton Kedin
>Assignee: Rui Wang
>Priority: Major
>  Time Spent: 20m
>  Remaining Estimate: 0h


[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data

2018-06-18 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=112986&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-112986
 ]

ASF GitHub Bot logged work on BEAM-4194:


Author: ASF GitHub Bot
Created on: 19/Jun/18 03:29
Start Date: 19/Jun/18 03:29
Worklog Time Spent: 10m 
  Work Description: amaliujia opened a new pull request #5682: [BEAM-4194] 
support unbounded limit
URL: https://github.com/apache/beam/pull/5682
 
 
   # What?
   Support LIMIT on unbounded data in the Beam SQL shell. Previously, because 
of the default global window and default trigger, a query such as "SELECT 
col_name FROM unbounded_table LIMIT 1" would never return in the SQL shell.
   
   This PR supports unbounded LIMIT by starting a daemon thread in 
BeamEnumerableConverter that monitors the collection of returned values and 
stops the pipeline once the number of collected values reaches the limit 
count. A more detailed description can be found here: 
https://docs.google.com/document/d/13zeTewHH9nfwhSlcE4x77WQwr1U2Z4sTiNRjOXUj2aw/edit?usp=sharing
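
   The monitoring idea described above can be sketched in plain Java. This is 
only an illustration under stated assumptions, not the code in this PR: it uses 
no Beam APIs, the class `UnboundedLimitSketch` and the method 
`collectWithLimit` are hypothetical names, and the "pipeline" is a stand-in 
thread that emits integers until it is told to stop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: a daemon monitor thread stops a never-ending producer
// once enough values have been collected. No Beam classes are involved.
public class UnboundedLimitSketch {

  /** Collects the first {@code limit} values of an endless stream, then stops it. */
  static List<Integer> collectWithLimit(int limit) throws InterruptedException {
    Queue<Integer> collected = new ConcurrentLinkedQueue<>();
    AtomicBoolean cancelled = new AtomicBoolean(false);

    // Stand-in for the running pipeline: emits 0, 1, 2, ... until cancelled.
    Thread pipeline = new Thread(() -> {
      int next = 0;
      while (!cancelled.get()) {
        collected.add(next++);
        try {
          Thread.sleep(1);
        } catch (InterruptedException e) {
          return;
        }
      }
    });
    pipeline.setDaemon(true);

    // Daemon monitor: polls the collection and requests a stop at the limit.
    Thread monitor = new Thread(() -> {
      while (collected.size() < limit) {
        try {
          Thread.sleep(1);
        } catch (InterruptedException e) {
          return;
        }
      }
      cancelled.set(true);
    });
    monitor.setDaemon(true);

    pipeline.start();
    monitor.start();
    monitor.join();
    pipeline.join();

    // The producer may overshoot before it sees the stop flag, so truncate.
    List<Integer> result = new ArrayList<>();
    for (Integer value : collected) {
      if (result.size() == limit) {
        break;
      }
      result.add(value);
    }
    return result;
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(collectWithLimit(3)); // prints [0, 1, 2]
  }
}
```

   Because the producer can emit a few extra values before it observes the stop 
flag, the sketch truncates the result to the limit; a real implementation would 
need a similar guard.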
   
   # Testing
- [x] Unit test.

   Added two unit tests that mock bounded and unbounded input tables to test 
LIMIT functionality.
   
- [x] Integration test.
   
   Adding one e2e integration test, which utilizes Google Cloud Pub/Sub to test 
the unbounded limit on auto-generated Pub/Sub messages.
   
- [x] Other test.
   
   Manually tested unbounded limit in Beam SQL shell on a Google Cloud Pub/Sub 
table. See the screenshot:
   https://user-images.githubusercontent.com/1938382/41574436-106ca752-7336-11e8-91a7-33200513d4b8.png
   
   
   
   
   Follow this checklist to help us incorporate your contribution quickly and 
easily:
   
- [x] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   It will help us expedite review of your Pull Request if you tag someone 
(e.g. `@username`) to look at it.
   




Issue Time Tracking
---

Worklog Id: (was: 112986)
Time Spent: 10m
Remaining Estimate: 0h

> [SQL] Support LIMIT on Unbounded Data
> -
>
> Key: BEAM-4194
> URL: https://issues.apache.org/jira/browse/BEAM-4194
> Project: Beam
>  Issue Type: New Feature
>  Components: dsl-sql
>Reporter: Anton Kedin
>Assignee: Rui Wang
>Priority: Major
>  Time Spent: 10m
>  Remaining Estimate: 0h