[ 
https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=113466&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113466
 ]

ASF GitHub Bot logged work on BEAM-4194:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 20/Jun/18 00:19
            Start Date: 20/Jun/18 00:19
    Worklog Time Spent: 10m 
      Work Description: apilloud commented on a change in pull request #5682: 
[BEAM-4194] support unbounded limit
URL: https://github.com/apache/beam/pull/5682#discussion_r196617360
 
 

 ##########
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
 ##########
 @@ -133,14 +204,108 @@ private static PipelineResult run(
         options
             .getRunner()
             .getCanonicalName()
-            .equals("org.apache.beam.runners.direct.DirectRunner"));
+            .equals("org.apache.beam.runners.direct.DirectRunner"),
+        "SELECT without INSERT is only supported in DirectRunner in SQL 
Shell.");
+
     Collector.globalValues.put(id, values);
     run(options, node, new Collector());
     Collector.globalValues.remove(id);
 
     return Linq4j.asEnumerable(values);
   }
 
+  private static Enumerable<Object> limitCollect(PipelineOptions options, 
BeamRelNode node) {
+    long id = options.getOptionsId();
+    Queue<Object> values = new ConcurrentLinkedQueue<Object>();
+
+    checkArgument(
+        options
+            .getRunner()
+            .getCanonicalName()
+            .equals("org.apache.beam.runners.direct.DirectRunner"),
+        "SELECT without INSERT is only supported in DirectRunner in SQL 
Shell.");
+
+    LimitStateVar limitStateVar = new LimitStateVar();
+    int limitCount = getLimitCount(node);
+
+    LimitCanceller.globalLimitArguments.put(id, limitCount);
+    LimitCanceller.globalStates.put(id, limitStateVar);
+    LimitCollector.globalLimitArguments.put(id, limitCount);
+    LimitCollector.globalValues.put(id, values);
+    limitRun(options, node, new LimitCollector(), new LimitCanceller(), 
limitStateVar);
+    LimitCanceller.globalLimitArguments.remove(id);
+    LimitCanceller.globalStates.remove(id);
+    LimitCollector.globalLimitArguments.remove(id);
+    LimitCollector.globalValues.remove(id);
+
+    return Linq4j.asEnumerable(values);
+  }
+
+  private static class LimitCanceller extends DoFn<KV<String, Row>, Void> {
+    private static final Map<Long, Integer> globalLimitArguments =
+        new ConcurrentHashMap<Long, Integer>();
+    private static final Map<Long, LimitStateVar> globalStates =
+        new ConcurrentHashMap<Long, LimitStateVar>();
+
+    @Nullable private volatile Integer count;
+    @Nullable private volatile LimitStateVar limitStateVar;
+
+    @StateId("counter")
+    private final StateSpec<ValueState<Integer>> counter = 
StateSpecs.value(VarIntCoder.of());
+
+    @StartBundle
+    public void startBundle(StartBundleContext context) {
+      long id = context.getPipelineOptions().getOptionsId();
+      count = globalLimitArguments.get(id);
+      limitStateVar = globalStates.get(id);
+    }
+
+    @ProcessElement
+    public void processElement(
+        ProcessContext context, @StateId("counter") ValueState<Integer> 
counter) {
+      int current = (counter.read() != null ? counter.read() : 0);
+      current += 1;
+      if (current >= count && !limitStateVar.isReached()) {
+        // if current count reaches the limit count but limitStateVar has not 
been set, flip
+        // the var.
+        limitStateVar.setReached();
+      }
+
+      counter.write(current);
+    }
+  }
+
+  private static class LimitCollector extends DoFn<Row, KV<String, Row>> {
+    // This will only work on the direct runner.
+    private static final Map<Long, Queue<Object>> globalValues =
+        new ConcurrentHashMap<Long, Queue<Object>>();
+    private static final Map<Long, Integer> globalLimitArguments =
+        new ConcurrentHashMap<Long, Integer>();
+
+    @Nullable private volatile Queue<Object> values;
+    @Nullable private volatile int count;
+
+    @StartBundle
+    public void startBundle(StartBundleContext context) {
+      long id = context.getPipelineOptions().getOptionsId();
+      values = globalValues.get(id);
+      count = globalLimitArguments.get(id);
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext context) {
+      Object[] input = context.element().getValues().toArray();
+      if (values.size() < count) {
 
 Review comment:
   Now that I think about it, you have a race between checking this count and 
adding to the queue. The directRunner has a minimum of 3 worker threads so it 
is possible to hit. I still think the simple, safe, and portable implementation 
is to use `Collector`, check the size of the queue outside of the pipeline, and 
truncate after terminating the pipeline.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 113466)
    Time Spent: 8h 20m  (was: 8h 10m)

> [SQL] Support LIMIT on Unbounded Data
> -------------------------------------
>
>                 Key: BEAM-4194
>                 URL: https://issues.apache.org/jira/browse/BEAM-4194
>             Project: Beam
>          Issue Type: New Feature
>          Components: dsl-sql
>            Reporter: Anton Kedin
>            Assignee: Rui Wang
>            Priority: Major
>          Time Spent: 8h 20m
>  Remaining Estimate: 0h
>
> We need to support queries with "LIMIT xxx".
> Problem is that we don't know when aggregates will trigger, they can 
> potentially accumulate values in global window and never trigger.
> If we have some trigger syntax (BEAM-4193), then the use case becomes similar 
> to what we have at the moment, where the user defines the trigger upstream 
> for all inputs. In this case LIMIT probably can be implemented as 
> sample.any(5) with trigger at count.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to