[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=114535=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-114535 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 21/Jun/18 22:32 Start Date: 21/Jun/18 22:32 Worklog Time Spent: 10m Work Description: kennknowles closed pull request #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java index cbbafc72a93..eb7d9d2f109 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java @@ -80,6 +80,18 @@ public Calc copy(RelTraitSet traitSet, RelNode input, RexProgram program) { } } + public int getLimitCountOfSortRel() { +if (input instanceof BeamSortRel) { + return ((BeamSortRel) input).getCount(); +} + +throw new RuntimeException("Could not get the limit count from a non BeamSortRel input."); + } + + public boolean isInputSortRelAndLimitOnly() { +return (input instanceof BeamSortRel) && ((BeamSortRel) input).isLimitOnly(); + } + /** {@code CalcFn} is the executor for a {@link BeamCalcRel} step. 
*/ public static class CalcFn extends DoFn { private BeamSqlExpressionExecutor executor; diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java index 2fbcb5b1e72..8e32a6aa2de 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java @@ -19,14 +19,19 @@ import static com.google.common.base.Preconditions.checkArgument; +import java.io.IOException; +import java.io.Serializable; import java.util.List; import java.util.Map; import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import javax.annotation.Nullable; +import org.apache.beam.runners.direct.DirectOptions; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.PipelineResult.State; +import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.MetricNameFilter; import org.apache.beam.sdk.metrics.MetricQueryResults; @@ -35,8 +40,12 @@ import org.apache.beam.sdk.options.ApplicationNameOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.state.StateSpec; +import org.apache.beam.sdk.state.StateSpecs; +import org.apache.beam.sdk.state.ValueState; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.Row; import org.apache.calcite.adapter.enumerable.EnumerableRel; import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor; @@ -56,9 +65,13 @@ import 
org.apache.calcite.rel.convert.ConverterImpl; import org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.calcite.rel.type.RelDataType; +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** BeamRelNode to replace a {@code Enumerable} node. */ public class BeamEnumerableConverter extends ConverterImpl implements EnumerableRel { + private static final Logger LOG = LoggerFactory.getLogger(BeamEnumerableConverter.class); public BeamEnumerableConverter(RelOptCluster cluster, RelTraitSet traits, RelNode input) { super(cluster, ConventionTraitDef.INSTANCE, traits, input); @@ -108,13 +121,37 @@ public static PipelineOptions createPipelineOptions(Map map) { Thread.currentThread().setContextClassLoader(BeamEnumerableConverter.class.getClassLoader()); if (node instanceof BeamIOSinkRel) { return count(options, node); + } else if (isLimitQuery(node)) { +return limitCollect(options, node); } + return collect(options, node); } finally {
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=114534=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-114534 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 21/Jun/18 22:32 Start Date: 21/Jun/18 22:32 Worklog Time Spent: 10m Work Description: kennknowles commented on issue #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#issuecomment-399263935 The tests that are actually part of this PR passed. We have some other quota issues, but only for Dataflow. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 114534) Time Spent: 10h 10m (was: 10h) > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 10h 10m > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". > Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. > If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case LIMIT probably can be implemented as > sample.any(5) with trigger at count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=114477=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-114477 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 21/Jun/18 21:09 Start Date: 21/Jun/18 21:09 Worklog Time Spent: 10m Work Description: amaliujia edited a comment on issue #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#issuecomment-399241705 A `requesting more quota` error failed a bunch of PostCommit tests. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 114477) Time Spent: 10h (was: 9h 50m) > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 10h > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". > Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. > If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case LIMIT probably can be implemented as > sample.any(5) with trigger at count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=114467=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-114467 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 21/Jun/18 21:00 Start Date: 21/Jun/18 21:00 Worklog Time Spent: 10m Work Description: amaliujia commented on issue #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#issuecomment-399241933 run java postcommit This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 114467) Time Spent: 9h 50m (was: 9h 40m) > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 9h 50m > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". > Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. > If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case LIMIT probably can be implemented as > sample.any(5) with trigger at count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=114466=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-114466 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 21/Jun/18 20:59 Start Date: 21/Jun/18 20:59 Worklog Time Spent: 10m Work Description: amaliujia commented on issue #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#issuecomment-399241705 A "requesting more quota" error failed a bunch of PostCommit tests. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 114466) Time Spent: 9h 40m (was: 9.5h) > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 9h 40m > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". > Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. > If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case LIMIT probably can be implemented as > sample.any(5) with trigger at count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=114448=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-114448 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 21/Jun/18 20:02 Start Date: 21/Jun/18 20:02 Worklog Time Spent: 10m Work Description: amaliujia commented on issue #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#issuecomment-399225339 run java postcommit This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 114448) Time Spent: 9.5h (was: 9h 20m) > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 9.5h > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". > Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. > If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case LIMIT probably can be implemented as > sample.any(5) with trigger at count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=114447=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-114447 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 21/Jun/18 20:02 Start Date: 21/Jun/18 20:02 Worklog Time Spent: 10m Work Description: amaliujia commented on issue #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#issuecomment-399225318 ElasticSearchIO This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 114447) Time Spent: 9h 20m (was: 9h 10m) > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 9h 20m > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". > Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. > If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case LIMIT probably can be implemented as > sample.any(5) with trigger at count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=114425=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-114425 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 21/Jun/18 18:40 Start Date: 21/Jun/18 18:40 Worklog Time Spent: 10m Work Description: amaliujia commented on issue #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#issuecomment-399203343 run java postcommit This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 114425) Time Spent: 9h 10m (was: 9h) > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 9h 10m > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". > Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. > If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case LIMIT probably can be implemented as > sample.any(5) with trigger at count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=114387=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-114387 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 21/Jun/18 17:09 Start Date: 21/Jun/18 17:09 Worklog Time Spent: 10m Work Description: kennknowles commented on issue #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#issuecomment-399176626 LGTM, just squash commits and I'll merge This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 114387) Time Spent: 9h (was: 8h 50m) > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 9h > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". > Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. > If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case LIMIT probably can be implemented as > sample.any(5) with trigger at count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113596=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113596 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 20/Jun/18 06:08 Start Date: 20/Jun/18 06:08 Worklog Time Spent: 10m Work Description: amaliujia commented on issue #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#issuecomment-398635002 run java postcommit This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 113596) Time Spent: 8h 50m (was: 8h 40m) > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 8h 50m > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". > Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. > If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case LIMIT probably can be implemented as > sample.any(5) with trigger at count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113549=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113549 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 20/Jun/18 04:37 Start Date: 20/Jun/18 04:37 Worklog Time Spent: 10m Work Description: amaliujia commented on a change in pull request #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#discussion_r196646092 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java ## @@ -133,14 +204,108 @@ private static PipelineResult run( options .getRunner() .getCanonicalName() -.equals("org.apache.beam.runners.direct.DirectRunner")); +.equals("org.apache.beam.runners.direct.DirectRunner"), +"SELECT without INSERT is only supported in DirectRunner in SQL Shell."); + Collector.globalValues.put(id, values); run(options, node, new Collector()); Collector.globalValues.remove(id); return Linq4j.asEnumerable(values); } + private static Enumerable limitCollect(PipelineOptions options, BeamRelNode node) { +long id = options.getOptionsId(); +Queue values = new ConcurrentLinkedQueue(); + +checkArgument( +options +.getRunner() +.getCanonicalName() +.equals("org.apache.beam.runners.direct.DirectRunner"), +"SELECT without INSERT is only supported in DirectRunner in SQL Shell."); + +LimitStateVar limitStateVar = new LimitStateVar(); +int limitCount = getLimitCount(node); + +LimitCanceller.globalLimitArguments.put(id, limitCount); +LimitCanceller.globalStates.put(id, limitStateVar); +LimitCollector.globalLimitArguments.put(id, limitCount); +LimitCollector.globalValues.put(id, values); +limitRun(options, node, new LimitCollector(), new LimitCanceller(), limitStateVar); +LimitCanceller.globalLimitArguments.remove(id); +LimitCanceller.globalStates.remove(id); +LimitCollector.globalLimitArguments.remove(id); +LimitCollector.globalValues.remove(id); + +return 
Linq4j.asEnumerable(values); + } + + private static class LimitCanceller extends DoFn, Void> { +private static final Map globalLimitArguments = +new ConcurrentHashMap(); +private static final Map globalStates = +new ConcurrentHashMap(); + +@Nullable private volatile Integer count; +@Nullable private volatile LimitStateVar limitStateVar; + +@StateId("counter") +private final StateSpec> counter = StateSpecs.value(VarIntCoder.of()); + +@StartBundle +public void startBundle(StartBundleContext context) { + long id = context.getPipelineOptions().getOptionsId(); + count = globalLimitArguments.get(id); + limitStateVar = globalStates.get(id); +} + +@ProcessElement +public void processElement( +ProcessContext context, @StateId("counter") ValueState counter) { + int current = (counter.read() != null ? counter.read() : 0); + current += 1; + if (current >= count && !limitStateVar.isReached()) { +// if current count reaches the limit count but limitStateVar has not been set, flip +// the var. +limitStateVar.setReached(); + } + + counter.write(current); +} + } + + private static class LimitCollector extends DoFn> { +// This will only work on the direct runner. 
+private static final Map> globalValues = +new ConcurrentHashMap>(); +private static final Map globalLimitArguments = +new ConcurrentHashMap(); + +@Nullable private volatile Queue values; +@Nullable private volatile int count; + +@StartBundle +public void startBundle(StartBundleContext context) { + long id = context.getPipelineOptions().getOptionsId(); + values = globalValues.get(id); + count = globalLimitArguments.get(id); +} + +@ProcessElement +public void processElement(ProcessContext context) { + Object[] input = context.element().getValues().toArray(); + if (values.size() < count) { Review comment: Because I still need to keep `LimitCanceller` (as a general "count and cancel" solution for all runners), and `LimitCanceller` needs KV as its input type (a requirement of stateful ParDo), I will have to keep this `LimitCollector` to emit KV elements to `LimitCanceller`. I will take care of the race condition. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113472=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113472 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 20/Jun/18 00:33 Start Date: 20/Jun/18 00:33 Worklog Time Spent: 10m Work Description: amaliujia commented on a change in pull request #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#discussion_r196619143 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java ## @@ -133,14 +204,108 @@ private static PipelineResult run( options .getRunner() .getCanonicalName() -.equals("org.apache.beam.runners.direct.DirectRunner")); +.equals("org.apache.beam.runners.direct.DirectRunner"), +"SELECT without INSERT is only supported in DirectRunner in SQL Shell."); + Collector.globalValues.put(id, values); run(options, node, new Collector()); Collector.globalValues.remove(id); return Linq4j.asEnumerable(values); } + private static Enumerable limitCollect(PipelineOptions options, BeamRelNode node) { +long id = options.getOptionsId(); +Queue values = new ConcurrentLinkedQueue(); + +checkArgument( +options +.getRunner() +.getCanonicalName() +.equals("org.apache.beam.runners.direct.DirectRunner"), +"SELECT without INSERT is only supported in DirectRunner in SQL Shell."); + +LimitStateVar limitStateVar = new LimitStateVar(); +int limitCount = getLimitCount(node); + +LimitCanceller.globalLimitArguments.put(id, limitCount); +LimitCanceller.globalStates.put(id, limitStateVar); +LimitCollector.globalLimitArguments.put(id, limitCount); +LimitCollector.globalValues.put(id, values); +limitRun(options, node, new LimitCollector(), new LimitCanceller(), limitStateVar); +LimitCanceller.globalLimitArguments.remove(id); +LimitCanceller.globalStates.remove(id); +LimitCollector.globalLimitArguments.remove(id); +LimitCollector.globalValues.remove(id); + +return 
Linq4j.asEnumerable(values); + } + + private static class LimitCanceller extends DoFn, Void> { +private static final Map globalLimitArguments = +new ConcurrentHashMap(); +private static final Map globalStates = +new ConcurrentHashMap(); + +@Nullable private volatile Integer count; +@Nullable private volatile LimitStateVar limitStateVar; + +@StateId("counter") +private final StateSpec> counter = StateSpecs.value(VarIntCoder.of()); + +@StartBundle +public void startBundle(StartBundleContext context) { + long id = context.getPipelineOptions().getOptionsId(); + count = globalLimitArguments.get(id); + limitStateVar = globalStates.get(id); +} + +@ProcessElement +public void processElement( +ProcessContext context, @StateId("counter") ValueState counter) { + int current = (counter.read() != null ? counter.read() : 0); + current += 1; + if (current >= count && !limitStateVar.isReached()) { +// if current count reaches the limit count but limitStateVar has not been set, flip +// the var. +limitStateVar.setReached(); + } + + counter.write(current); +} + } + + private static class LimitCollector extends DoFn> { +// This will only work on the direct runner. +private static final Map> globalValues = +new ConcurrentHashMap>(); +private static final Map globalLimitArguments = +new ConcurrentHashMap(); + +@Nullable private volatile Queue values; +@Nullable private volatile int count; + +@StartBundle +public void startBundle(StartBundleContext context) { + long id = context.getPipelineOptions().getOptionsId(); + values = globalValues.get(id); + count = globalLimitArguments.get(id); +} + +@ProcessElement +public void processElement(ProcessContext context) { + Object[] input = context.element().getValues().toArray(); + if (values.size() < count) { Review comment: That's a great catch! Yes I agree to use `Collector` here and drop extra values after terminating the pipeline. This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 113472) Time Spent: 8.5h (was: 8h 20m) > [SQL] Support LIMIT on Unbounded Data > - > > Key:
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113466=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113466 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 20/Jun/18 00:19 Start Date: 20/Jun/18 00:19 Worklog Time Spent: 10m Work Description: apilloud commented on a change in pull request #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#discussion_r196617360 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java ## @@ -133,14 +204,108 @@ private static PipelineResult run( options .getRunner() .getCanonicalName() -.equals("org.apache.beam.runners.direct.DirectRunner")); +.equals("org.apache.beam.runners.direct.DirectRunner"), +"SELECT without INSERT is only supported in DirectRunner in SQL Shell."); + Collector.globalValues.put(id, values); run(options, node, new Collector()); Collector.globalValues.remove(id); return Linq4j.asEnumerable(values); } + private static Enumerable limitCollect(PipelineOptions options, BeamRelNode node) { +long id = options.getOptionsId(); +Queue values = new ConcurrentLinkedQueue(); + +checkArgument( +options +.getRunner() +.getCanonicalName() +.equals("org.apache.beam.runners.direct.DirectRunner"), +"SELECT without INSERT is only supported in DirectRunner in SQL Shell."); + +LimitStateVar limitStateVar = new LimitStateVar(); +int limitCount = getLimitCount(node); + +LimitCanceller.globalLimitArguments.put(id, limitCount); +LimitCanceller.globalStates.put(id, limitStateVar); +LimitCollector.globalLimitArguments.put(id, limitCount); +LimitCollector.globalValues.put(id, values); +limitRun(options, node, new LimitCollector(), new LimitCanceller(), limitStateVar); +LimitCanceller.globalLimitArguments.remove(id); +LimitCanceller.globalStates.remove(id); +LimitCollector.globalLimitArguments.remove(id); +LimitCollector.globalValues.remove(id); + +return 
Linq4j.asEnumerable(values); + } + + private static class LimitCanceller extends DoFn, Void> { +private static final Map globalLimitArguments = +new ConcurrentHashMap(); +private static final Map globalStates = +new ConcurrentHashMap(); + +@Nullable private volatile Integer count; +@Nullable private volatile LimitStateVar limitStateVar; + +@StateId("counter") +private final StateSpec> counter = StateSpecs.value(VarIntCoder.of()); + +@StartBundle +public void startBundle(StartBundleContext context) { + long id = context.getPipelineOptions().getOptionsId(); + count = globalLimitArguments.get(id); + limitStateVar = globalStates.get(id); +} + +@ProcessElement +public void processElement( +ProcessContext context, @StateId("counter") ValueState counter) { + int current = (counter.read() != null ? counter.read() : 0); + current += 1; + if (current >= count && !limitStateVar.isReached()) { +// if current count reaches the limit count but limitStateVar has not been set, flip +// the var. +limitStateVar.setReached(); + } + + counter.write(current); +} + } + + private static class LimitCollector extends DoFn> { +// This will only work on the direct runner. +private static final Map> globalValues = +new ConcurrentHashMap>(); +private static final Map globalLimitArguments = +new ConcurrentHashMap(); + +@Nullable private volatile Queue values; +@Nullable private volatile int count; + +@StartBundle +public void startBundle(StartBundleContext context) { + long id = context.getPipelineOptions().getOptionsId(); + values = globalValues.get(id); + count = globalLimitArguments.get(id); +} + +@ProcessElement +public void processElement(ProcessContext context) { + Object[] input = context.element().getValues().toArray(); + if (values.size() < count) { Review comment: Now that I think about it, you have a race between checking this count and adding to the queue. The directRunner has a minimum of 3 worker threads so it is possible to hit. 
I still think the simple, safe, and portable implementation is to use `Collector`, check the size of the queue outside of the pipeline, and truncate after terminating the pipeline. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at:
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113448=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113448 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 19/Jun/18 23:39 Start Date: 19/Jun/18 23:39 Worklog Time Spent: 10m Work Description: amaliujia commented on a change in pull request #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#discussion_r196610533 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java ## @@ -95,13 +107,37 @@ public Result implement(EnumerableRelImplementor implementor, Prefer prefer) { Thread.currentThread().setContextClassLoader(BeamEnumerableConverter.class.getClassLoader()); if (node instanceof BeamIOSinkRel) { return count(options, node); + } else if (isLimitQuery(node)) { +return limitCollect(options, node); } + return collect(options, node); } finally { Thread.currentThread().setContextClassLoader(originalClassLoader); } } + enum LimitState { Review comment: Yep. Using an enum gives a more descriptive state than true/false. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 113448) Time Spent: 8h 10m (was: 8h) > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 8h 10m > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". > Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. 
> If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case LIMIT probably can be implemented as > sample.any(5) with trigger at count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113446=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113446 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 19/Jun/18 23:36 Start Date: 19/Jun/18 23:36 Worklog Time Spent: 10m Work Description: apilloud commented on a change in pull request #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#discussion_r196610932 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java ## @@ -125,6 +162,55 @@ private static PipelineResult run( return result; } + private static PipelineResult limitRun( + PipelineOptions options, + BeamRelNode node, + DoFn> collectDoFn, + DoFn, Void> limitCounterDoFn, + LimitStateVar limitStateVar) { +ExecutorService pool = Executors.newFixedThreadPool(1); + +options.as(DirectOptions.class).setBlockOnRun(false); +Pipeline pipeline = Pipeline.create(options); +PCollectionTuple.empty(pipeline) +.apply(node.toPTransform()) +.apply(ParDo.of(collectDoFn)) +.apply(ParDo.of(limitCounterDoFn)); + +PipelineResult result = pipeline.run(); + +pool.execute( +new Runnable() { + @Override + public void run() { +// wait every one second to check state of PipelineResult. If returned value +// is null, or return state is non termination state, it means pipeline is still +// running and within loop limit counter state will be checked. +State state; +while (true) { + state = result.waitUntilFinish(Duration.standardSeconds(1)); + if (state != null && state.isTerminal()) { +break; + } + // If state is null, or state is not null and indicates pipeline is not terminated + // yet, check continue checking the limit var. + try { +if (limitStateVar.isReached()) { Review comment: I think we have a very different model as to how this will work in other runners. The runners shouldn't need to be aware of the limit, as that would be applied client side. 
This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 113446) Time Spent: 8h (was: 7h 50m) > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 8h > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". > Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. > If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case LIMIT probably can be implemented as > sample.any(5) with trigger at count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113445=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113445 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 19/Jun/18 23:33 Start Date: 19/Jun/18 23:33 Worklog Time Spent: 10m Work Description: amaliujia commented on a change in pull request #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#discussion_r196610533 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java ## @@ -95,13 +107,37 @@ public Result implement(EnumerableRelImplementor implementor, Prefer prefer) { Thread.currentThread().setContextClassLoader(BeamEnumerableConverter.class.getClassLoader()); if (node instanceof BeamIOSinkRel) { return count(options, node); + } else if (isLimitQuery(node)) { +return limitCollect(options, node); } + return collect(options, node); } finally { Thread.currentThread().setContextClassLoader(originalClassLoader); } } + enum LimitState { Review comment: Yep. Using enum to have descriptive state. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 113445) Time Spent: 7h 50m (was: 7h 40m) > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 7h 50m > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". > Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. 
> If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case LIMIT probably can be implemented as > sample.any(5) with trigger at count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113441=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113441 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 19/Jun/18 23:32 Start Date: 19/Jun/18 23:32 Worklog Time Spent: 10m Work Description: amaliujia commented on a change in pull request #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#discussion_r196608975 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java ## @@ -125,6 +162,55 @@ private static PipelineResult run( return result; } + private static PipelineResult limitRun( + PipelineOptions options, + BeamRelNode node, + DoFn> collectDoFn, + DoFn, Void> limitCounterDoFn, + LimitStateVar limitStateVar) { +ExecutorService pool = Executors.newFixedThreadPool(1); + +options.as(DirectOptions.class).setBlockOnRun(false); +Pipeline pipeline = Pipeline.create(options); +PCollectionTuple.empty(pipeline) +.apply(node.toPTransform()) +.apply(ParDo.of(collectDoFn)) +.apply(ParDo.of(limitCounterDoFn)); + +PipelineResult result = pipeline.run(); + +pool.execute( +new Runnable() { + @Override + public void run() { +// wait every one second to check state of PipelineResult. If returned value +// is null, or return state is non termination state, it means pipeline is still +// running and within loop limit counter state will be checked. +State state; +while (true) { + state = result.waitUntilFinish(Duration.standardSeconds(1)); + if (state != null && state.isTerminal()) { +break; + } + // If state is null, or state is not null and indicates pipeline is not terminated + // yet, check continue checking the limit var. 
+ try { +if (limitStateVar.isReached()) { Review comment: I could replace `LimitCollector` with `Collector` with the cost of removing extra collected values after the pipeline finishes, or I modify `Collector` and add something like `values.size() >= limitCount` to make `Collector` general. To make code more readable, I wrote a dedicated `LimitCollector`. Also, I'd like to separate `count and cancel` and `collect` steps to support unbounded limit. Right now `Collector` seems like only work in DirectRunner but I can make `count and cancel` more general to fit into other runners by using stateful ParDo (that's what LimitCanceller does). So in the future if we start to support unbounded limit in other runners, only `collect` step needs modification. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 113441) Time Spent: 7h 10m (was: 7h) > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 7h 10m > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". > Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. > If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case LIMIT probably can be implemented as > sample.any(5) with trigger at count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113443=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113443 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 19/Jun/18 23:32 Start Date: 19/Jun/18 23:32 Worklog Time Spent: 10m Work Description: amaliujia commented on a change in pull request #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#discussion_r196610384 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java ## @@ -133,14 +219,108 @@ private static PipelineResult run( options .getRunner() .getCanonicalName() -.equals("org.apache.beam.runners.direct.DirectRunner")); +.equals("org.apache.beam.runners.direct.DirectRunner"), +"Only DirectRunner is supported in SQL Shell. " + "Please check your Runner setting."); Review comment: Fixed. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 113443) Time Spent: 7.5h (was: 7h 20m) > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 7.5h > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". > Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. > If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case LIMIT probably can be implemented as > sample.any(5) with trigger at count. 
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113442=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113442 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 19/Jun/18 23:32 Start Date: 19/Jun/18 23:32 Worklog Time Spent: 10m Work Description: amaliujia commented on a change in pull request #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#discussion_r196608975 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java ## @@ -125,6 +162,55 @@ private static PipelineResult run( return result; } + private static PipelineResult limitRun( + PipelineOptions options, + BeamRelNode node, + DoFn> collectDoFn, + DoFn, Void> limitCounterDoFn, + LimitStateVar limitStateVar) { +ExecutorService pool = Executors.newFixedThreadPool(1); + +options.as(DirectOptions.class).setBlockOnRun(false); +Pipeline pipeline = Pipeline.create(options); +PCollectionTuple.empty(pipeline) +.apply(node.toPTransform()) +.apply(ParDo.of(collectDoFn)) +.apply(ParDo.of(limitCounterDoFn)); + +PipelineResult result = pipeline.run(); + +pool.execute( +new Runnable() { + @Override + public void run() { +// wait every one second to check state of PipelineResult. If returned value +// is null, or return state is non termination state, it means pipeline is still +// running and within loop limit counter state will be checked. +State state; +while (true) { + state = result.waitUntilFinish(Duration.standardSeconds(1)); + if (state != null && state.isTerminal()) { +break; + } + // If state is null, or state is not null and indicates pipeline is not terminated + // yet, check continue checking the limit var. 
+ try { +if (limitStateVar.isReached()) { Review comment: I could replace `LimitCollector` with `Collector` with the cost of removing extra collected values after the pipeline finishes, or I modify `Collector` and add something like `values.size() >= limitCount` to make `Collector` general. To make code more readable, I wrote a dedicated `LimitCollector`. Also, I'd like to separate `count and cancel` and `collect` steps to support unbounded limit. Right now `Collector` seems like only work in DirectRunner but I can make `count and cancel` more general to fit into other runners by using stateful ParDo (that's what `LimitCanceller` does). So in the future if we start to support unbounded limit in other runners, only `collect` step needs modification. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 113442) Time Spent: 7h 20m (was: 7h 10m) > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 7h 20m > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". > Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. > If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case LIMIT probably can be implemented as > sample.any(5) with trigger at count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113444=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113444 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 19/Jun/18 23:32 Start Date: 19/Jun/18 23:32 Worklog Time Spent: 10m Work Description: amaliujia commented on a change in pull request #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#discussion_r196610388 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java ## @@ -111,6 +147,55 @@ private static PipelineResult run( return result; } + private static PipelineResult limitRun( + PipelineOptions options, + BeamRelNode node, + DoFn> collectDoFn, + DoFn, Void> limitCounterDoFn, + LimitStateVar limitStateVar) { +ExecutorService pool = Executors.newFixedThreadPool(1); + +options.as(DirectOptions.class).setBlockOnRun(false); +Pipeline pipeline = Pipeline.create(options); +PCollectionTuple.empty(pipeline) +.apply(node.toPTransform()) +.apply(ParDo.of(collectDoFn)) +.apply(ParDo.of(limitCounterDoFn)); + +PipelineResult result = pipeline.run(); + +pool.execute( Review comment: fixed. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 113444) Time Spent: 7h 40m (was: 7.5h) > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 7h 40m > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". 
> Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. > If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case LIMIT probably can be implemented as > sample.any(5) with trigger at count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113437=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113437 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 19/Jun/18 23:29 Start Date: 19/Jun/18 23:29 Worklog Time Spent: 10m Work Description: amaliujia commented on a change in pull request #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#discussion_r196608975 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java ## @@ -125,6 +162,55 @@ private static PipelineResult run( return result; } + private static PipelineResult limitRun( + PipelineOptions options, + BeamRelNode node, + DoFn> collectDoFn, + DoFn, Void> limitCounterDoFn, + LimitStateVar limitStateVar) { +ExecutorService pool = Executors.newFixedThreadPool(1); + +options.as(DirectOptions.class).setBlockOnRun(false); +Pipeline pipeline = Pipeline.create(options); +PCollectionTuple.empty(pipeline) +.apply(node.toPTransform()) +.apply(ParDo.of(collectDoFn)) +.apply(ParDo.of(limitCounterDoFn)); + +PipelineResult result = pipeline.run(); + +pool.execute( +new Runnable() { + @Override + public void run() { +// wait every one second to check state of PipelineResult. If returned value +// is null, or return state is non termination state, it means pipeline is still +// running and within loop limit counter state will be checked. +State state; +while (true) { + state = result.waitUntilFinish(Duration.standardSeconds(1)); + if (state != null && state.isTerminal()) { +break; + } + // If state is null, or state is not null and indicates pipeline is not terminated + // yet, check continue checking the limit var. 
+ try { +if (limitStateVar.isReached()) { Review comment: I could replace `LimitCollector` with `Collector` with the cost of removing extra collected values after the pipeline finishes, or I modify `Collector` and add something like `values.size() >= limitCount` to make `Collector` general. To make code more readable, I wrote a dedicated `LimitCollector`. Also, I'd like to separate `count and cancel` and `collect` steps to support unbounded limit. Right now `Collector` seems like only work in DirectRunner but I can make `count and cancel` more general to fit into other runners by using stateful ParDo. So in the future if we start to support other runners, only `collect` step needs modification. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 113437) Time Spent: 6h 50m (was: 6h 40m) > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 6h 50m > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". > Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. > If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case LIMIT probably can be implemented as > sample.any(5) with trigger at count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113438=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113438 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 19/Jun/18 23:29 Start Date: 19/Jun/18 23:29 Worklog Time Spent: 10m Work Description: amaliujia commented on a change in pull request #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#discussion_r196608975 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java ## @@ -125,6 +162,55 @@ private static PipelineResult run( return result; } + private static PipelineResult limitRun( + PipelineOptions options, + BeamRelNode node, + DoFn> collectDoFn, + DoFn, Void> limitCounterDoFn, + LimitStateVar limitStateVar) { +ExecutorService pool = Executors.newFixedThreadPool(1); + +options.as(DirectOptions.class).setBlockOnRun(false); +Pipeline pipeline = Pipeline.create(options); +PCollectionTuple.empty(pipeline) +.apply(node.toPTransform()) +.apply(ParDo.of(collectDoFn)) +.apply(ParDo.of(limitCounterDoFn)); + +PipelineResult result = pipeline.run(); + +pool.execute( +new Runnable() { + @Override + public void run() { +// wait every one second to check state of PipelineResult. If returned value +// is null, or return state is non termination state, it means pipeline is still +// running and within loop limit counter state will be checked. +State state; +while (true) { + state = result.waitUntilFinish(Duration.standardSeconds(1)); + if (state != null && state.isTerminal()) { +break; + } + // If state is null, or state is not null and indicates pipeline is not terminated + // yet, check continue checking the limit var. 
+ try { +if (limitStateVar.isReached()) { Review comment: I could replace `LimitCollector` with `Collector` with the cost of removing extra collected values after the pipeline finishes, or I modify `Collector` and add something like `values.size() >= limitCount` to make `Collector` general. To make code more readable, I wrote a dedicated `LimitCollector`. Also, I'd like to separate `count and cancel` and `collect` steps to support unbounded limit. Right now `Collector` seems like only work in DirectRunner but I can make `count and cancel` more general to fit into other runners by using stateful ParDo. So in the future if we start to support unbounded limit in other runners, only `collect` step needs modification. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 113438) Time Spent: 7h (was: 6h 50m) > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 7h > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". > Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. > If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case LIMIT probably can be implemented as > sample.any(5) with trigger at count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113435=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113435 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 19/Jun/18 23:28 Start Date: 19/Jun/18 23:28 Worklog Time Spent: 10m Work Description: amaliujia commented on a change in pull request #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#discussion_r196609438 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java ## @@ -111,6 +147,55 @@ private static PipelineResult run( return result; } + private static PipelineResult limitRun( Review comment: I want to use this function to encapsulate related logic together. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 113435) Time Spent: 6.5h (was: 6h 20m) > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 6.5h > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". > Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. > If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case LIMIT probably can be implemented as > sample.any(5) with trigger at count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113436=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113436 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 19/Jun/18 23:28 Start Date: 19/Jun/18 23:28 Worklog Time Spent: 10m Work Description: amaliujia commented on a change in pull request #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#discussion_r196609486 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java ## @@ -111,6 +147,55 @@ private static PipelineResult run( return result; } + private static PipelineResult limitRun( Review comment: To avoid a long `limitCollect` function. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 113436) Time Spent: 6h 40m (was: 6.5h) > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 6h 40m > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". > Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. > If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case LIMIT probably can be implemented as > sample.any(5) with trigger at count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113434=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113434 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 19/Jun/18 23:25 Start Date: 19/Jun/18 23:25 Worklog Time Spent: 10m Work Description: amaliujia commented on a change in pull request #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#discussion_r196608975 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java ## @@ -125,6 +162,55 @@ private static PipelineResult run( return result; } + private static PipelineResult limitRun( + PipelineOptions options, + BeamRelNode node, + DoFn> collectDoFn, + DoFn, Void> limitCounterDoFn, + LimitStateVar limitStateVar) { +ExecutorService pool = Executors.newFixedThreadPool(1); + +options.as(DirectOptions.class).setBlockOnRun(false); +Pipeline pipeline = Pipeline.create(options); +PCollectionTuple.empty(pipeline) +.apply(node.toPTransform()) +.apply(ParDo.of(collectDoFn)) +.apply(ParDo.of(limitCounterDoFn)); + +PipelineResult result = pipeline.run(); + +pool.execute( +new Runnable() { + @Override + public void run() { +// wait every one second to check state of PipelineResult. If returned value +// is null, or return state is non termination state, it means pipeline is still +// running and within loop limit counter state will be checked. +State state; +while (true) { + state = result.waitUntilFinish(Duration.standardSeconds(1)); + if (state != null && state.isTerminal()) { +break; + } + // If state is null, or state is not null and indicates pipeline is not terminated + // yet, check continue checking the limit var. 
+ try { +if (limitStateVar.isReached()) { Review comment: I could replace `LimitCollector` with `Collector` at the cost of removing extra collected values after the pipeline finishes, or I could modify `Collector` and add `values.size() >= limitCount` to make `Collector` general. To make the code more readable, I wrote a dedicated `LimitCollector`. Also, I'd like to separate the `count and cancel` and `collect` steps to support unbounded limit. Right now `Collector` seems to work only in DirectRunner, but I can make `count and cancel` more general to fit into other runners by using stateful ParDo. So in the future, if we start to support other runners, only the `collect` step needs modification. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 113434) Time Spent: 6h 20m (was: 6h 10m) > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 6h 20m > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". > Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. > If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case LIMIT probably can be implemented as > sample.any(5) with trigger at count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113418=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113418 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 19/Jun/18 22:28 Start Date: 19/Jun/18 22:28 Worklog Time Spent: 10m Work Description: apilloud commented on a change in pull request #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#discussion_r196598266 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java ## @@ -125,6 +162,55 @@ private static PipelineResult run( return result; } + private static PipelineResult limitRun( + PipelineOptions options, + BeamRelNode node, + DoFn> collectDoFn, + DoFn, Void> limitCounterDoFn, + LimitStateVar limitStateVar) { +ExecutorService pool = Executors.newFixedThreadPool(1); + +options.as(DirectOptions.class).setBlockOnRun(false); +Pipeline pipeline = Pipeline.create(options); +PCollectionTuple.empty(pipeline) +.apply(node.toPTransform()) +.apply(ParDo.of(collectDoFn)) +.apply(ParDo.of(limitCounterDoFn)); + +PipelineResult result = pipeline.run(); + +pool.execute( +new Runnable() { + @Override + public void run() { +// wait every one second to check state of PipelineResult. If returned value +// is null, or return state is non termination state, it means pipeline is still +// running and within loop limit counter state will be checked. +State state; +while (true) { + state = result.waitUntilFinish(Duration.standardSeconds(1)); + if (state != null && state.isTerminal()) { +break; + } + // If state is null, or state is not null and indicates pipeline is not terminated + // yet, check continue checking the limit var. + try { +if (limitStateVar.isReached()) { Review comment: If you did something like `values.size() >= limitCount` here you could replace `LimitCanceller` and `LimitCollector` with `Collector`. 
This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 113418) Time Spent: 6h 10m (was: 6h) > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 6h 10m > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". > Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. > If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case LIMIT probably can be implemented as > sample.any(5) with trigger at count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113417=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113417 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 19/Jun/18 22:28 Start Date: 19/Jun/18 22:28 Worklog Time Spent: 10m Work Description: apilloud commented on a change in pull request #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#discussion_r196595217 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java ## @@ -111,6 +147,55 @@ private static PipelineResult run( return result; } + private static PipelineResult limitRun( + PipelineOptions options, + BeamRelNode node, + DoFn> collectDoFn, + DoFn, Void> limitCounterDoFn, + LimitStateVar limitStateVar) { +ExecutorService pool = Executors.newFixedThreadPool(1); + +options.as(DirectOptions.class).setBlockOnRun(false); +Pipeline pipeline = Pipeline.create(options); +PCollectionTuple.empty(pipeline) +.apply(node.toPTransform()) +.apply(ParDo.of(collectDoFn)) +.apply(ParDo.of(limitCounterDoFn)); + +PipelineResult result = pipeline.run(); + +pool.execute( Review comment: The extra thread seems unnecessary. You can do this in the main thread. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 113417) Time Spent: 6h (was: 5h 50m) > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 6h > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". 
> Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. > If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case LIMIT probably can be implemented as > sample.any(5) with trigger at count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113415=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113415 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 19/Jun/18 22:28 Start Date: 19/Jun/18 22:28 Worklog Time Spent: 10m Work Description: apilloud commented on a change in pull request #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#discussion_r196594705 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java ## @@ -111,6 +147,55 @@ private static PipelineResult run( return result; } + private static PipelineResult limitRun( Review comment: The point of `run` was to have exactly one for everything. Seeing as that isn't going to work, you should probably just inline this. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 113415) Time Spent: 5h 40m (was: 5.5h) > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 5h 40m > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". > Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. > If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case LIMIT probably can be implemented as > sample.any(5) with trigger at count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113414=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113414 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 19/Jun/18 22:28 Start Date: 19/Jun/18 22:28 Worklog Time Spent: 10m Work Description: apilloud commented on a change in pull request #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#discussion_r196594393 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java ## @@ -95,13 +107,37 @@ public Result implement(EnumerableRelImplementor implementor, Prefer prefer) { Thread.currentThread().setContextClassLoader(BeamEnumerableConverter.class.getClassLoader()); if (node instanceof BeamIOSinkRel) { return count(options, node); + } else if (isLimitQuery(node)) { +return limitCollect(options, node); } + return collect(options, node); } finally { Thread.currentThread().setContextClassLoader(originalClassLoader); } } + enum LimitState { Review comment: nit: Isn't this just a Boolean? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 113414) Time Spent: 5.5h (was: 5h 20m) > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 5.5h > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". > Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. 
> If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case LIMIT probably can be implemented as > sample.any(5) with trigger at count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113416=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113416 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 19/Jun/18 22:28 Start Date: 19/Jun/18 22:28 Worklog Time Spent: 10m Work Description: apilloud commented on a change in pull request #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#discussion_r196595720 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java ## @@ -133,14 +219,108 @@ private static PipelineResult run( options .getRunner() .getCanonicalName() -.equals("org.apache.beam.runners.direct.DirectRunner")); +.equals("org.apache.beam.runners.direct.DirectRunner"), +"Only DirectRunner is supported in SQL Shell. " + "Please check your Runner setting."); Review comment: This isn't exactly true. Should be something like "SELECT without INSERT is only supported in DirectRunner in SQL Shell." This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 113416) Time Spent: 5h 50m (was: 5h 40m) > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 5h 50m > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". > Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. 
> If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case LIMIT probably can be implemented as > sample.any(5) with trigger at count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113412=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113412 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 19/Jun/18 22:25 Start Date: 19/Jun/18 22:25 Worklog Time Spent: 10m Work Description: amaliujia commented on issue #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#issuecomment-398565693 run java postcommit This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 113412) Time Spent: 5h 20m (was: 5h 10m) > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 5h 20m > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". > Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. > If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case LIMIT probably can be implemented as > sample.any(5) with trigger at count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113389=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113389 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 19/Jun/18 21:30 Start Date: 19/Jun/18 21:30 Worklog Time Spent: 10m Work Description: amaliujia commented on issue #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#issuecomment-398552880 run java postcommit This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 113389) Time Spent: 5h 10m (was: 5h) > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 5h 10m > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". > Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. > If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case LIMIT probably can be implemented as > sample.any(5) with trigger at count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113388=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113388 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 19/Jun/18 21:30 Start Date: 19/Jun/18 21:30 Worklog Time Spent: 10m Work Description: amaliujia commented on issue #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#issuecomment-398552843 run java precommit This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 113388) Time Spent: 5h (was: 4h 50m) > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 5h > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". > Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. > If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case LIMIT probably can be implemented as > sample.any(5) with trigger at count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113375=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113375 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 19/Jun/18 20:57 Start Date: 19/Jun/18 20:57 Worklog Time Spent: 10m Work Description: amaliujia removed a comment on issue #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#issuecomment-398542010 run java precommit This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 113375) Time Spent: 4h 50m (was: 4h 40m) > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 4h 50m > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". > Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. > If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case LIMIT probably can be implemented as > sample.any(5) with trigger at count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113374=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113374 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 19/Jun/18 20:56 Start Date: 19/Jun/18 20:56 Worklog Time Spent: 10m Work Description: amaliujia commented on issue #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#issuecomment-398543026 run java precommit This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 113374) Time Spent: 4h 40m (was: 4.5h) > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 4h 40m > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". > Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. > If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case LIMIT probably can be implemented as > sample.any(5) with trigger at count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113369=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113369 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 19/Jun/18 20:53 Start Date: 19/Jun/18 20:53 Worklog Time Spent: 10m Work Description: amaliujia commented on issue #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#issuecomment-398542010 run java precommit This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 113369) Time Spent: 4.5h (was: 4h 20m) > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 4.5h > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". > Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. > If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case LIMIT probably can be implemented as > sample.any(5) with trigger at count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113368=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113368 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 19/Jun/18 20:53 Start Date: 19/Jun/18 20:53 Worklog Time Spent: 10m Work Description: amaliujia removed a comment on issue #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#issuecomment-398538364 run java precommit This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 113368) Time Spent: 4h 20m (was: 4h 10m) > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 4h 20m > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". > Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. > If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case LIMIT probably can be implemented as > sample.any(5) with trigger at count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113364=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113364 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 19/Jun/18 20:41 Start Date: 19/Jun/18 20:41 Worklog Time Spent: 10m Work Description: amaliujia commented on issue #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#issuecomment-398538364 run java precommit This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 113364) Time Spent: 4h 10m (was: 4h) > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 4h 10m > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". > Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. > If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case LIMIT probably can be implemented as > sample.any(5) with trigger at count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113361=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113361 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 19/Jun/18 20:41 Start Date: 19/Jun/18 20:41 Worklog Time Spent: 10m Work Description: amaliujia commented on a change in pull request #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#discussion_r196570798 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java ## @@ -127,6 +194,66 @@ private static PipelineResult run( return Linq4j.asEnumerable(values); } + private static Enumerable limitCollect(PipelineOptions options, BeamRelNode node) { +long id = options.getOptionsId(); +Queue values = new ConcurrentLinkedQueue(); + +checkArgument( +options +.getRunner() +.getCanonicalName() +.equals("org.apache.beam.runners.direct.DirectRunner")); + +LimitStateWrapper stateWrapper = new LimitStateWrapper(); +LimitCounter.globalValues.put(id, values); +LimitCounter.globalLimitArguments.put(id, getLimitCount(node)); +LimitCounter.globalStates.put(id, stateWrapper); +limitRun(options, node, new LimitCounter(), stateWrapper); +LimitCounter.globalValues.remove(id); +LimitCounter.globalLimitArguments.remove(id); +LimitCounter.globalStates.remove(id); + +return Linq4j.asEnumerable(values); + } + + private static class LimitCounter extends DoFn { +private static final Map globalLimitArguments = Review comment: Use stateful ParDo to implement counter/cancelling now. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 113361) Time Spent: 3h 40m (was: 3.5h) > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 3h 40m > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". > Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. > If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case LIMIT probably can be implemented as > sample.any(5) with trigger at count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113362=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113362 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 19/Jun/18 20:41 Start Date: 19/Jun/18 20:41 Worklog Time Spent: 10m Work Description: amaliujia commented on a change in pull request #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#discussion_r196570806 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java ## @@ -127,6 +194,66 @@ private static PipelineResult run( return Linq4j.asEnumerable(values); } + private static Enumerable limitCollect(PipelineOptions options, BeamRelNode node) { +long id = options.getOptionsId(); +Queue values = new ConcurrentLinkedQueue(); + +checkArgument( +options +.getRunner() +.getCanonicalName() +.equals("org.apache.beam.runners.direct.DirectRunner")); Review comment: Added something to say only DirectRunner is supported and please check option setting. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 113362) Time Spent: 3h 50m (was: 3h 40m) > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 3h 50m > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". > Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. 
> If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case, LIMIT can probably be implemented as > Sample.any(5) with a trigger at count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113363=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113363 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 19/Jun/18 20:41 Start Date: 19/Jun/18 20:41 Worklog Time Spent: 10m Work Description: amaliujia commented on a change in pull request #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#discussion_r196570820 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java ## @@ -95,13 +101,37 @@ public Result implement(EnumerableRelImplementor implementor, Prefer prefer) { Thread.currentThread().setContextClassLoader(BeamEnumerableConverter.class.getClassLoader()); if (node instanceof BeamIOSinkRel) { return count(options, node); + } else if (isLimitQuery(node)) { +return limitCollect(options, node); } + return collect(options, node); } finally { Thread.currentThread().setContextClassLoader(originalClassLoader); } } + enum LimitState { +REACHED, +NOT_REACHED + } + + private static class LimitStateWrapper implements Serializable { Review comment: Rename it to LimitStateVar This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 113363) Time Spent: 4h (was: 3h 50m) > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 4h > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". 
> Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. > If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case, LIMIT can probably be implemented as > Sample.any(5) with a trigger at count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113308=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113308 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 19/Jun/18 18:31 Start Date: 19/Jun/18 18:31 Worklog Time Spent: 10m Work Description: amaliujia commented on a change in pull request #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#discussion_r196533522 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java ## @@ -111,6 +141,43 @@ private static PipelineResult run( return result; } + private static PipelineResult limitRun( + PipelineOptions options, + BeamRelNode node, + DoFn doFn, + LimitStateWrapper stateWrapper) { +ExecutorService pool = Executors.newFixedThreadPool(1); + +options.as(DirectOptions.class).setBlockOnRun(false); +Pipeline pipeline = Pipeline.create(options); + PCollectionTuple.empty(pipeline).apply(node.toPTransform()).apply(ParDo.of(doFn)); + +PipelineResult result = pipeline.run(); + +pool.execute( +new Runnable() { + @Override + public void run() { +while (!result.getState().isTerminal()) { + try { +Thread.sleep(DaemonThreadSleepIntervalMillis); Review comment: Great suggestion! This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 113308) Time Spent: 3.5h (was: 3h 20m) > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 3.5h > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". 
> Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. > If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case, LIMIT can probably be implemented as > Sample.any(5) with a trigger at count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113270=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113270 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 19/Jun/18 17:28 Start Date: 19/Jun/18 17:28 Worklog Time Spent: 10m Work Description: kennknowles commented on a change in pull request #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#discussion_r196512057 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java ## @@ -127,6 +194,66 @@ private static PipelineResult run( return Linq4j.asEnumerable(values); } + private static Enumerable limitCollect(PipelineOptions options, BeamRelNode node) { +long id = options.getOptionsId(); +Queue values = new ConcurrentLinkedQueue(); + +checkArgument( +options +.getRunner() +.getCanonicalName() +.equals("org.apache.beam.runners.direct.DirectRunner")); + +LimitStateWrapper stateWrapper = new LimitStateWrapper(); +LimitCounter.globalValues.put(id, values); +LimitCounter.globalLimitArguments.put(id, getLimitCount(node)); +LimitCounter.globalStates.put(id, stateWrapper); +limitRun(options, node, new LimitCounter(), stateWrapper); +LimitCounter.globalValues.remove(id); +LimitCounter.globalLimitArguments.remove(id); +LimitCounter.globalStates.remove(id); + +return Linq4j.asEnumerable(values); + } + + private static class LimitCounter extends DoFn { +private static final Map globalLimitArguments = Review comment: In the `DirectRunner` this is OK. I think you could pretty easily make it a bit more general by using stateful ParDo(DoFn) for counting. Then the only thing left to make it work on a distributed runner is a communication path back to the shell. If you leave it like this, I think the `DoFn` also is not ready, because in a distributed run there will be lots of copies and separate JVMs on different workers, not gathered to one key, etc. 
This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 113270) > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 3h 10m > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". > Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. > If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case LIMIT probably can be implemented as > sample.any(5) with trigger at count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113272=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113272 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 19/Jun/18 17:28 Start Date: 19/Jun/18 17:28 Worklog Time Spent: 10m Work Description: kennknowles commented on a change in pull request #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#discussion_r196510662 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java ## @@ -127,6 +194,66 @@ private static PipelineResult run( return Linq4j.asEnumerable(values); } + private static Enumerable limitCollect(PipelineOptions options, BeamRelNode node) { +long id = options.getOptionsId(); +Queue values = new ConcurrentLinkedQueue(); + +checkArgument( +options +.getRunner() +.getCanonicalName() +.equals("org.apache.beam.runners.direct.DirectRunner")); Review comment: Can you add a message to this check? That way when it fails it will explain what is going on. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 113272) Time Spent: 3h 20m (was: 3h 10m) > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 3h 20m > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". > Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. 
> If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case, LIMIT can probably be implemented as > Sample.any(5) with a trigger at count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113271=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113271 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 19/Jun/18 17:28 Start Date: 19/Jun/18 17:28 Worklog Time Spent: 10m Work Description: kennknowles commented on a change in pull request #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#discussion_r196510366 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java ## @@ -95,13 +101,37 @@ public Result implement(EnumerableRelImplementor implementor, Prefer prefer) { Thread.currentThread().setContextClassLoader(BeamEnumerableConverter.class.getClassLoader()); if (node instanceof BeamIOSinkRel) { return count(options, node); + } else if (isLimitQuery(node)) { +return limitCollect(options, node); } + return collect(options, node); } finally { Thread.currentThread().setContextClassLoader(originalClassLoader); } } + enum LimitState { +REACHED, +NOT_REACHED + } + + private static class LimitStateWrapper implements Serializable { Review comment: Why is there a wrapper? Is this to make it clearly something like an `LVar`? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 113271) Time Spent: 3h 20m (was: 3h 10m) > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 3h 20m > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". 
> Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. > If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case, LIMIT can probably be implemented as > Sample.any(5) with a trigger at count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113269=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113269 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 19/Jun/18 17:28 Start Date: 19/Jun/18 17:28 Worklog Time Spent: 10m Work Description: kennknowles commented on a change in pull request #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#discussion_r196509427 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java ## @@ -111,6 +141,43 @@ private static PipelineResult run( return result; } + private static PipelineResult limitRun( + PipelineOptions options, + BeamRelNode node, + DoFn doFn, + LimitStateWrapper stateWrapper) { +ExecutorService pool = Executors.newFixedThreadPool(1); + +options.as(DirectOptions.class).setBlockOnRun(false); +Pipeline pipeline = Pipeline.create(options); + PCollectionTuple.empty(pipeline).apply(node.toPTransform()).apply(ParDo.of(doFn)); + +PipelineResult result = pipeline.run(); + +pool.execute( +new Runnable() { + @Override + public void run() { +while (!result.getState().isTerminal()) { + try { +Thread.sleep(DaemonThreadSleepIntervalMillis); Review comment: I think result.waitUntilFinish(duration) is good here: https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java#L58 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 113269) Time Spent: 3h 10m (was: 3h) > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 3h 10m > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". > Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. > If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case LIMIT probably can be implemented as > sample.any(5) with trigger at count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113245&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113245 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 19/Jun/18 16:55 Start Date: 19/Jun/18 16:55 Worklog Time Spent: 10m Work Description: amaliujia removed a comment on issue #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#issuecomment-398265245 run java precommit This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 113245) Time Spent: 2h 20m (was: 2h 10m) > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 2h 20m > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". > Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. > If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case LIMIT probably can be implemented as > sample.any(5) with trigger at count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113247&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113247 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 19/Jun/18 16:55 Start Date: 19/Jun/18 16:55 Worklog Time Spent: 10m Work Description: amaliujia removed a comment on issue #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#issuecomment-398291401 run java postcommit This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 113247) Time Spent: 2h 40m (was: 2.5h) > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 2h 40m > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". > Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. > If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case LIMIT probably can be implemented as > sample.any(5) with trigger at count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113249&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113249 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 19/Jun/18 16:56 Start Date: 19/Jun/18 16:56 Worklog Time Spent: 10m Work Description: amaliujia commented on issue #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#issuecomment-398470292 R: @kennknowles cc: @apilloud @akedin This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 113249) Time Spent: 3h (was: 2h 50m) > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 3h > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". > Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. > If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case LIMIT probably can be implemented as > sample.any(5) with trigger at count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113246&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113246 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 19/Jun/18 16:55 Start Date: 19/Jun/18 16:55 Worklog Time Spent: 10m Work Description: amaliujia removed a comment on issue #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#issuecomment-398289136 run java precommit This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 113246) Time Spent: 2.5h (was: 2h 20m) > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 2.5h > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". > Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. > If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case LIMIT probably can be implemented as > sample.any(5) with trigger at count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113248&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113248 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 19/Jun/18 16:55 Start Date: 19/Jun/18 16:55 Worklog Time Spent: 10m Work Description: amaliujia removed a comment on issue #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#issuecomment-398309597 run java postcommit This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 113248) Time Spent: 2h 50m (was: 2h 40m) > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 2h 50m > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". > Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. > If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case LIMIT probably can be implemented as > sample.any(5) with trigger at count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113244&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113244 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 19/Jun/18 16:55 Start Date: 19/Jun/18 16:55 Worklog Time Spent: 10m Work Description: amaliujia removed a comment on issue #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#issuecomment-398263801 run java precommit This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 113244) Time Spent: 2h 10m (was: 2h) > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 2h 10m > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". > Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. > If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case LIMIT probably can be implemented as > sample.any(5) with trigger at count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113243&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113243 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 19/Jun/18 16:55 Start Date: 19/Jun/18 16:55 Worklog Time Spent: 10m Work Description: amaliujia removed a comment on issue #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#issuecomment-398263327 run java postcommit This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 113243) Time Spent: 2h (was: 1h 50m) > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 2h > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". > Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. > If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case LIMIT probably can be implemented as > sample.any(5) with trigger at count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113242&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113242 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 19/Jun/18 16:55 Start Date: 19/Jun/18 16:55 Worklog Time Spent: 10m Work Description: amaliujia removed a comment on issue #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#issuecomment-398263214 run java precommit This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 113242) Time Spent: 1h 50m (was: 1h 40m) > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 1h 50m > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". > Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. > If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case LIMIT probably can be implemented as > sample.any(5) with trigger at count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113086&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113086 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 19/Jun/18 07:59 Start Date: 19/Jun/18 07:59 Worklog Time Spent: 10m Work Description: amaliujia commented on issue #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#issuecomment-398310217 run java postcommit This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 113086) Time Spent: 1h 40m (was: 1.5h) > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 1h 40m > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". > Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. > If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case LIMIT probably can be implemented as > sample.any(5) with trigger at count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113084&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113084 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 19/Jun/18 07:57 Start Date: 19/Jun/18 07:57 Worklog Time Spent: 10m Work Description: amaliujia commented on issue #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#issuecomment-398309597 run java postcommit This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 113084) Time Spent: 1.5h (was: 1h 20m) > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 1.5h > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". > Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. > If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case LIMIT probably can be implemented as > sample.any(5) with trigger at count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113043=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113043 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 19/Jun/18 06:46 Start Date: 19/Jun/18 06:46 Worklog Time Spent: 10m Work Description: amaliujia commented on issue #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#issuecomment-398292372 run java precommit This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 113043) Time Spent: 1h 20m (was: 1h 10m) > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 1h 20m > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". > Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. > If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case LIMIT probably can be implemented as > sample.any(5) with trigger at count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113038=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113038 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 19/Jun/18 06:41 Start Date: 19/Jun/18 06:41 Worklog Time Spent: 10m Work Description: amaliujia commented on issue #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#issuecomment-398291401 run java postcommit This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 113038) Time Spent: 1h 10m (was: 1h) > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 1h 10m > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". > Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. > If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case LIMIT probably can be implemented as > sample.any(5) with trigger at count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113031=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113031 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 19/Jun/18 06:30 Start Date: 19/Jun/18 06:30 Worklog Time Spent: 10m Work Description: amaliujia commented on issue #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#issuecomment-398289136 run java precommit This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 113031) Time Spent: 1h (was: 50m) > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 1h > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". > Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. > If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case LIMIT probably can be implemented as > sample.any(5) with trigger at count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=112992=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-112992 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 19/Jun/18 03:45 Start Date: 19/Jun/18 03:45 Worklog Time Spent: 10m Work Description: amaliujia commented on issue #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#issuecomment-398265245 run java precommit This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 112992) Time Spent: 50m (was: 40m) > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 50m > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". > Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. > If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case LIMIT probably can be implemented as > sample.any(5) with trigger at count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=112989=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-112989 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 19/Jun/18 03:34 Start Date: 19/Jun/18 03:34 Worklog Time Spent: 10m Work Description: amaliujia commented on issue #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#issuecomment-398263801 run java precommit This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 112989) Time Spent: 40m (was: 0.5h) > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 40m > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". > Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. > If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case LIMIT probably can be implemented as > sample.any(5) with trigger at count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=112988=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-112988 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 19/Jun/18 03:30 Start Date: 19/Jun/18 03:30 Worklog Time Spent: 10m Work Description: amaliujia commented on issue #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#issuecomment-398263327 run java postcommit This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 112988) Time Spent: 0.5h (was: 20m) > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 0.5h > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". > Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. > If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case LIMIT probably can be implemented as > sample.any(5) with trigger at count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=112987=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-112987 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 19/Jun/18 03:30 Start Date: 19/Jun/18 03:30 Worklog Time Spent: 10m Work Description: amaliujia commented on issue #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682#issuecomment-398263214 run java precommit This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 112987) Time Spent: 20m (was: 10m) > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 20m > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". > Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. > If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case LIMIT probably can be implemented as > sample.any(5) with trigger at count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4194) [SQL] Support LIMIT on Unbounded Data
[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=112986=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-112986 ] ASF GitHub Bot logged work on BEAM-4194: Author: ASF GitHub Bot Created on: 19/Jun/18 03:29 Start Date: 19/Jun/18 03:29 Worklog Time Spent: 10m Work Description: amaliujia opened a new pull request #5682: [BEAM-4194] support unbounded limit URL: https://github.com/apache/beam/pull/5682 # What? Support unbounded limit in Beam SQL shell. In the past, due to the default global window and default trigger, queries like "SELECT col_name FROM unbounded_table LIMIT 1" would not return in SQL Shell. This PR tries to support unbounded limit by starting a daemon thread to monitor the return value collection in BeamEnumerableConverter and stopping the pipeline when the collected values reach the limit count. More detailed description can be found here: https://docs.google.com/document/d/13zeTewHH9nfwhSlcE4x77WQwr1U2Z4sTiNRjOXUj2aw/edit?usp=sharing. # Testing - [x] Unit test. Adding two unit tests to mock bounded and unbounded input tables to test LIMIT functionality. - [x] Integration test. Adding one e2e integration test, which utilizes Google Cloud Pub/Sub to test the unbounded limit on auto-generated Pub/Sub messages. - [x] Other test. Manually tested unbounded limit in Beam SQL shell on a Google Cloud Pub/Sub table. See the screenshot: https://user-images.githubusercontent.com/1938382/41574436-106ca752-7336-11e8-91a7-33200513d4b8.png Follow this checklist to help us incorporate your contribution quickly and easily: - [x] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). 
It will help us expedite review of your Pull Request if you tag someone (e.g. `@username`) to look at it. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 112986) Time Spent: 10m Remaining Estimate: 0h > [SQL] Support LIMIT on Unbounded Data > - > > Key: BEAM-4194 > URL: https://issues.apache.org/jira/browse/BEAM-4194 > Project: Beam > Issue Type: New Feature > Components: dsl-sql >Reporter: Anton Kedin >Assignee: Rui Wang >Priority: Major > Time Spent: 10m > Remaining Estimate: 0h > > We need to support queries with "LIMIT xxx". > Problem is that we don't know when aggregates will trigger, they can > potentially accumulate values in global window and never trigger. > If we have some trigger syntax (BEAM-4193), then the use case becomes similar > to what we have at the moment, where the user defines the trigger upstream > for all inputs. In this case LIMIT probably can be implemented as > sample.any(5) with trigger at count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)