[ https://issues.apache.org/jira/browse/BEAM-4194?focusedWorklogId=114535&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-114535 ]

ASF GitHub Bot logged work on BEAM-4194:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 21/Jun/18 22:32
            Start Date: 21/Jun/18 22:32
    Worklog Time Spent: 10m 
      Work Description: kennknowles closed pull request #5682: [BEAM-4194] support unbounded limit
URL: https://github.com/apache/beam/pull/5682
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
index cbbafc72a93..eb7d9d2f109 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
@@ -80,6 +80,18 @@ public Calc copy(RelTraitSet traitSet, RelNode input, RexProgram program) {
     }
   }
 
+  public int getLimitCountOfSortRel() {
+    if (input instanceof BeamSortRel) {
+      return ((BeamSortRel) input).getCount();
+    }
+
+    throw new RuntimeException("Could not get the limit count from a non-BeamSortRel input.");
+  }
+
+  public boolean isInputSortRelAndLimitOnly() {
+    return (input instanceof BeamSortRel) && ((BeamSortRel) input).isLimitOnly();
+  }
+
   /** {@code CalcFn} is the executor for a {@link BeamCalcRel} step. */
   public static class CalcFn extends DoFn<Row, Row> {
     private BeamSqlExpressionExecutor executor;
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
index 2fbcb5b1e72..8e32a6aa2de 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
@@ -19,14 +19,19 @@
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import java.io.IOException;
+import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import javax.annotation.Nullable;
+import org.apache.beam.runners.direct.DirectOptions;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.PipelineResult.State;
+import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.MetricNameFilter;
 import org.apache.beam.sdk.metrics.MetricQueryResults;
@@ -35,8 +40,12 @@
 import org.apache.beam.sdk.options.ApplicationNameOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.ValueState;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.Row;
 import org.apache.calcite.adapter.enumerable.EnumerableRel;
 import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor;
@@ -56,9 +65,13 @@
 import org.apache.calcite.rel.convert.ConverterImpl;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rel.type.RelDataType;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** BeamRelNode to replace a {@code Enumerable} node. */
 public class BeamEnumerableConverter extends ConverterImpl implements EnumerableRel {
+  private static final Logger LOG = LoggerFactory.getLogger(BeamEnumerableConverter.class);
 
   public BeamEnumerableConverter(RelOptCluster cluster, RelTraitSet traits, RelNode input) {
     super(cluster, ConventionTraitDef.INSTANCE, traits, input);
@@ -108,13 +121,37 @@ public static PipelineOptions createPipelineOptions(Map<String, String> map) {
       Thread.currentThread().setContextClassLoader(BeamEnumerableConverter.class.getClassLoader());
       if (node instanceof BeamIOSinkRel) {
         return count(options, node);
+      } else if (isLimitQuery(node)) {
+        return limitCollect(options, node);
       }
+
       return collect(options, node);
     } finally {
       Thread.currentThread().setContextClassLoader(originalClassLoader);
     }
   }
 
+  enum LimitState {
+    REACHED,
+    NOT_REACHED
+  }
+
+  private static class LimitStateVar implements Serializable {
+    private LimitState state;
+
+    public LimitStateVar() {
+      state = LimitState.NOT_REACHED;
+    }
+
+    public void setReached() {
+      state = LimitState.REACHED;
+    }
+
+    public boolean isReached() {
+      return state == LimitState.REACHED;
+    }
+  }
+
   private static PipelineResult run(
       PipelineOptions options, BeamRelNode node, DoFn<Row, Void> doFn) {
     Pipeline pipeline = Pipeline.create(options);
@@ -124,6 +161,42 @@ private static PipelineResult run(
     return result;
   }
 
+  private static PipelineResult limitRun(
+      PipelineOptions options,
+      BeamRelNode node,
+      DoFn<Row, KV<String, Row>> collectDoFn,
+      DoFn<KV<String, Row>, Void> limitCounterDoFn,
+      LimitStateVar limitStateVar) {
+    options.as(DirectOptions.class).setBlockOnRun(false);
+    Pipeline pipeline = Pipeline.create(options);
+    BeamSqlRelUtils.toPCollection(pipeline, node)
+        .apply(ParDo.of(collectDoFn))
+        .apply(ParDo.of(limitCounterDoFn));
+
+    PipelineResult result = pipeline.run();
+
+    State state;
+    while (true) {
+      // Check the pipeline state every second.
+      state = result.waitUntilFinish(Duration.standardSeconds(1));
+      if (state != null && state.isTerminal()) {
+        break;
+      }
+      // If state is null, or the state indicates that the pipeline has not terminated
+      // yet, continue checking the limit var.
+      try {
+        if (limitStateVar.isReached()) {
+          result.cancel();
+          break;
+        }
+      } catch (IOException e) {
+        LOG.warn(e.toString());
+        break;
+      }
+    }
+    return result;
+  }
+
   private static Enumerable<Object> collect(PipelineOptions options, BeamRelNode node) {
     long id = options.getOptionsId();
     Queue<Object> values = new ConcurrentLinkedQueue<Object>();
@@ -132,7 +205,9 @@ private static PipelineResult run(
         options
             .getRunner()
             .getCanonicalName()
-            .equals("org.apache.beam.runners.direct.DirectRunner"));
+            .equals("org.apache.beam.runners.direct.DirectRunner"),
+        "SELECT without INSERT is only supported in DirectRunner in SQL 
Shell.");
+
     Collector.globalValues.put(id, values);
     run(options, node, new Collector());
     Collector.globalValues.remove(id);
@@ -140,6 +215,90 @@ private static PipelineResult run(
     return Linq4j.asEnumerable(values);
   }
 
+  private static Enumerable<Object> limitCollect(PipelineOptions options, BeamRelNode node) {
+    long id = options.getOptionsId();
+    Queue<Object> values = new ConcurrentLinkedQueue<Object>();
+
+    checkArgument(
+        options
+            .getRunner()
+            .getCanonicalName()
+            .equals("org.apache.beam.runners.direct.DirectRunner"),
+        "SELECT without INSERT is only supported in DirectRunner in SQL 
Shell.");
+
+    LimitStateVar limitStateVar = new LimitStateVar();
+    int limitCount = getLimitCount(node);
+
+    LimitCanceller.globalLimitArguments.put(id, limitCount);
+    LimitCanceller.globalStates.put(id, limitStateVar);
+    LimitCollector.globalValues.put(id, values);
+    limitRun(options, node, new LimitCollector(), new LimitCanceller(), limitStateVar);
+    LimitCanceller.globalLimitArguments.remove(id);
+    LimitCanceller.globalStates.remove(id);
+    LimitCollector.globalValues.remove(id);
+
+    return Linq4j.asEnumerable(values);
+  }
+
+  private static class LimitCanceller extends DoFn<KV<String, Row>, Void> {
+    private static final Map<Long, Integer> globalLimitArguments =
+        new ConcurrentHashMap<Long, Integer>();
+    private static final Map<Long, LimitStateVar> globalStates =
+        new ConcurrentHashMap<Long, LimitStateVar>();
+
+    @Nullable private volatile Integer count;
+    @Nullable private volatile LimitStateVar limitStateVar;
+
+    @StateId("counter")
+    private final StateSpec<ValueState<Integer>> counter = StateSpecs.value(VarIntCoder.of());
+
+    @StartBundle
+    public void startBundle(StartBundleContext context) {
+      long id = context.getPipelineOptions().getOptionsId();
+      count = globalLimitArguments.get(id);
+      limitStateVar = globalStates.get(id);
+    }
+
+    @ProcessElement
+    public void processElement(
+        ProcessContext context, @StateId("counter") ValueState<Integer> counter) {
+      int current = (counter.read() != null ? counter.read() : 0);
+      current += 1;
+      if (current >= count && !limitStateVar.isReached()) {
+        // If the current count reaches the limit count but limitStateVar has not been
+        // set, flip the var.
+        limitStateVar.setReached();
+      }
+
+      counter.write(current);
+    }
+  }
+
+  private static class LimitCollector extends DoFn<Row, KV<String, Row>> {
+    // This will only work on the direct runner.
+    private static final Map<Long, Queue<Object>> globalValues =
+        new ConcurrentHashMap<Long, Queue<Object>>();
+
+    @Nullable private volatile Queue<Object> values;
+
+    @StartBundle
+    public void startBundle(StartBundleContext context) {
+      long id = context.getPipelineOptions().getOptionsId();
+      values = globalValues.get(id);
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext context) {
+      Object[] input = context.element().getValues().toArray();
+      if (input.length == 1) {
+        values.add(input[0]);
+      } else {
+        values.add(input);
+      }
+      context.output(KV.of("DummyKey", context.element()));
+    }
+  }
+
   private static class Collector extends DoFn<Row, Void> {
     // This will only work on the direct runner.
     private static final Map<Long, Queue<Object>> globalValues =
@@ -185,4 +344,20 @@ public void processElement(ProcessContext context) {
       rows.inc();
     }
   }
+
+  private static boolean isLimitQuery(BeamRelNode node) {
+    return (node instanceof BeamSortRel && ((BeamSortRel) node).isLimitOnly())
+        || (node instanceof BeamCalcRel && ((BeamCalcRel) node).isInputSortRelAndLimitOnly());
+  }
+
+  private static int getLimitCount(BeamRelNode node) {
+    if (node instanceof BeamSortRel) {
+      return ((BeamSortRel) node).getCount();
+    } else if (node instanceof BeamCalcRel) {
+      return ((BeamCalcRel) node).getLimitCountOfSortRel();
+    }
+
+    throw new RuntimeException(
+        "Cannot get limit count from RelNode tree with root " + 
node.getRelTypeName());
+  }
 }
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
index 23618beae11..eda0b8a018b 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
@@ -26,15 +26,24 @@
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.ValueState;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Top;
+import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.Row;
@@ -122,6 +131,14 @@ public BeamSortRel(
     }
   }
 
+  public boolean isLimitOnly() {
+    return fieldIndices.size() == 0;
+  }
+
+  public int getCount() {
+    return count;
+  }
+
   @Override
   public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
     return new Transform();
@@ -146,27 +163,70 @@ public BeamSortRel(
 
       BeamSqlRowComparator comparator =
           new BeamSqlRowComparator(fieldIndices, orientation, nullsFirst);
-      // first find the top (offset + count)
-      PCollection<List<Row>> rawStream =
-          upstream
-              .apply(
-                  "extractTopOffsetAndFetch",
-                  Top.of(startIndex + count, comparator).withoutDefaults())
-              .setCoder(ListCoder.of(upstream.getCoder()));
-
-      // strip the `leading offset`
-      if (startIndex > 0) {
-        rawStream =
-            rawStream
+
+      PCollection<Row> retStream;
+
+      // ORDER BY LIMIT and plain LIMIT need separate handling, because GroupByKey is not allowed
+      // on unbounded data in the global window (the Top transform uses GroupByKey internally).
+      // If it is ORDER BY LIMIT:
+      if (fieldIndices.size() > 0) {
+        // first find the top (offset + count)
+        PCollection<List<Row>> rawStream =
+            upstream
                 .apply(
-                    "stripLeadingOffset", ParDo.of(new SubListFn<>(startIndex, 
startIndex + count)))
+                    "extractTopOffsetAndFetch",
+                    Top.of(startIndex + count, comparator).withoutDefaults())
                 .setCoder(ListCoder.of(upstream.getCoder()));
+
+        // strip the `leading offset`
+        if (startIndex > 0) {
+          rawStream =
+              rawStream
+                  .apply(
+                      "stripLeadingOffset",
+                      ParDo.of(new SubListFn<>(startIndex, startIndex + count)))
+                  .setCoder(ListCoder.of(upstream.getCoder()));
+        }
+
+        retStream = rawStream.apply("flatten", Flatten.iterables());
+      } else { // If it is LIMIT only
+        retStream = upstream.apply(new BeamSqlLimitTransforms<Row>());
       }
 
-      PCollection<Row> orderedStream = rawStream.apply("flatten", Flatten.iterables());
-      orderedStream.setCoder(CalciteUtils.toBeamSchema(getRowType()).getRowCoder());
+      retStream.setCoder(CalciteUtils.toBeamSchema(getRowType()).getRowCoder());
+      return retStream;
+    }
+  }
+
+  private class BeamSqlLimitTransforms<T> extends PTransform<PCollection<T>, PCollection<T>> {
+    @Override
+    public PCollection<T> expand(PCollection<T> input) {
+      Coder<T> coder = input.getCoder();
+      PCollection<KV<String, T>> keyedRow =
+          input.apply(WithKeys.of("DummyKey")).setCoder(KvCoder.of(StringUtf8Coder.of(), coder));
+
+      return keyedRow.apply(ParDo.of(new LimitFn<T>(getCount())));
+    }
+  }
+
+  private static class LimitFn<T> extends DoFn<KV<String, T>, T> {
+    private final Integer limitCount;
 
-      return orderedStream;
+    public LimitFn(int c) {
+      limitCount = c;
+    }
+
+    @StateId("counter")
+    private final StateSpec<ValueState<Integer>> counterState = StateSpecs.value(VarIntCoder.of());
+
+    @ProcessElement
+    public void processElement(
+        ProcessContext context, @StateId("counter") ValueState<Integer> counterState) {
+      int current = (counterState.read() != null ? counterState.read() : 0);
+      if (current < limitCount) {
+        context.output(context.element().getValue());
+        counterState.write(current + 1);
+      }
     }
   }
 
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java
index 8f458d8e49f..508f1a11c14 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java
@@ -37,9 +37,12 @@
 import org.apache.beam.sdk.extensions.sql.impl.parser.TestTableProvider;
 import org.apache.beam.sdk.extensions.sql.meta.provider.ReadOnlyTableProvider;
 import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
+import org.apache.beam.sdk.extensions.sql.mock.MockedUnboundedTable;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.values.Row;
 import org.apache.calcite.jdbc.CalciteConnection;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -47,6 +50,7 @@
 
 /** Test for {@link JdbcDriver}. */
 public class JdbcDriverTest {
+  public static final DateTime FIRST_DATE = new DateTime(1);
 
   private static final Schema BASIC_SCHEMA =
       Schema.builder()
@@ -222,6 +226,73 @@ public void testInternalConnect_boundedTable() throws Exception {
     assertFalse(resultSet.next());
   }
 
+  @Test
+  public void testInternalConnect_bounded_limit() throws Exception {
+    ReadOnlyTableProvider tableProvider =
+        new ReadOnlyTableProvider(
+            "test",
+            ImmutableMap.of(
+                "test",
+                MockedBoundedTable.of(
+                        Schema.FieldType.INT32, "id",
+                        Schema.FieldType.STRING, "name")
+                    .addRows(1, "first")
+                    .addRows(1, "second first")
+                    .addRows(2, "second")));
+
+    CalciteConnection connection = JdbcDriver.connect(tableProvider);
+    Statement statement = connection.createStatement();
+    ResultSet resultSet1 = statement.executeQuery("SELECT * FROM test LIMIT 5");
+    assertTrue(resultSet1.next());
+    assertTrue(resultSet1.next());
+    assertTrue(resultSet1.next());
+    assertFalse(resultSet1.next());
+    assertFalse(resultSet1.next());
+
+    ResultSet resultSet2 = statement.executeQuery("SELECT * FROM test LIMIT 1");
+    assertTrue(resultSet2.next());
+    assertFalse(resultSet2.next());
+
+    ResultSet resultSet3 = statement.executeQuery("SELECT * FROM test LIMIT 2");
+    assertTrue(resultSet3.next());
+    assertTrue(resultSet3.next());
+    assertFalse(resultSet3.next());
+
+    ResultSet resultSet4 = statement.executeQuery("SELECT * FROM test LIMIT 3");
+    assertTrue(resultSet4.next());
+    assertTrue(resultSet4.next());
+    assertTrue(resultSet4.next());
+    assertFalse(resultSet4.next());
+  }
+
+  @Test
+  public void testInternalConnect_unbounded_limit() throws Exception {
+    ReadOnlyTableProvider tableProvider =
+        new ReadOnlyTableProvider(
+            "test",
+            ImmutableMap.of(
+                "test",
+                MockedUnboundedTable.of(
+                        Schema.FieldType.INT32, "order_id",
+                        Schema.FieldType.INT32, "site_id",
+                        Schema.FieldType.INT32, "price",
+                        Schema.FieldType.DATETIME, "order_time")
+                    .timestampColumnIndex(3)
+                    .addRows(Duration.ZERO, 1, 1, 1, FIRST_DATE, 1, 2, 6, FIRST_DATE)));
+
+    CalciteConnection connection = JdbcDriver.connect(tableProvider);
+    Statement statement = connection.createStatement();
+
+    ResultSet resultSet1 = statement.executeQuery("SELECT * FROM test LIMIT 1");
+    assertTrue(resultSet1.next());
+    assertFalse(resultSet1.next());
+
+    ResultSet resultSet2 = statement.executeQuery("SELECT * FROM test LIMIT 2");
+    assertTrue(resultSet2.next());
+    assertTrue(resultSet2.next());
+    assertFalse(resultSet2.next());
+  }
+
   private List<List<Object>> readResultSet(ResultSet result) throws Exception {
     List<List<Object>> results = new ArrayList<>();
 
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java
index e9dbff8bcca..b028b9c4fdf 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java
@@ -18,16 +18,29 @@
 package org.apache.beam.sdk.extensions.sql.meta.provider.pubsub;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import java.io.IOException;
 import java.io.Serializable;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.beam.sdk.extensions.sql.impl.BeamCalciteSchema;
 import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.JdbcDriver;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
+import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
+import org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
@@ -37,16 +50,20 @@
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.Row;
+import org.apache.calcite.jdbc.CalciteConnection;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** Integration tests for querying Pubsub JSON messages with SQL. */
 @RunWith(JUnit4.class)
 public class PubsubJsonIT implements Serializable {
+  private static final Logger LOG = LoggerFactory.getLogger(PubsubJsonIT.class);
 
   private static final Schema PAYLOAD_SCHEMA =
       Schema.builder()
@@ -54,6 +71,11 @@
           .addNullableField("name", Schema.FieldType.STRING)
           .build();
 
+  private static final String CONNECT_STRING_PREFIX = "jdbc:beam:";
+  private static final String BEAM_CALCITE_SCHEMA = "beamCalciteSchema";
+  private static final JdbcDriver INSTANCE = new JdbcDriver();
+  private static volatile Boolean checked = false;
+
   @Rule public transient TestPubsub eventsTopic = TestPubsub.create();
   @Rule public transient TestPubsub dlqTopic = TestPubsub.create();
   @Rule public transient TestPubsubSignal signal = TestPubsubSignal.create();
@@ -140,7 +162,6 @@ public void testUsesDlq() throws Exception {
 
     sqlEnv.executeDdl(createTableString);
     query(sqlEnv, pipeline, queryString);
-
     PCollection<PubsubMessage> dlq =
         pipeline.apply(PubsubIO.readMessagesWithAttributes().fromTopic(dlqTopic.topicPath()));
 
@@ -152,10 +173,93 @@ public void testUsesDlq() throws Exception {
                 containsAll(dlqMessages, message(ts(4), "{ - }"), message(ts(5), "{ + }"))));
 
     pipeline.run();
+
     eventsTopic.publish(messages);
     signal.waitForSuccess(Duration.standardMinutes(5));
   }
 
+  @Test
+  public void testSQLLimit() throws SQLException, IOException, InterruptedException {
+    String createTableString =
+        "CREATE TABLE message (\n"
+            + "event_timestamp TIMESTAMP, \n"
+            + "attributes MAP<VARCHAR, VARCHAR>, \n"
+            + "payload ROW< \n"
+            + "             id INTEGER, \n"
+            + "             name VARCHAR \n"
+            + "           > \n"
+            + ") \n"
+            + "TYPE 'pubsub' \n"
+            + "LOCATION '"
+            + eventsTopic.topicPath()
+            + "' \n"
+            + "TBLPROPERTIES "
+            + "    '{ "
+            + "       \"timestampAttributeKey\" : \"ts\", "
+            + "       \"deadLetterQueue\" : \""
+            + dlqTopic.topicPath()
+            + "\""
+            + "     }'";
+
+    List<PubsubMessage> messages =
+        ImmutableList.of(
+            message(ts(1), 3, "foo"),
+            message(ts(2), 5, "bar"),
+            message(ts(3), 7, "baz"),
+            message(ts(4), 9, "ba2"),
+            message(ts(5), 10, "ba3"),
+            message(ts(6), 13, "ba4"),
+            message(ts(7), 15, "ba5"));
+
+    CalciteConnection connection = connect(new PubsubJsonTableProvider());
+
+    Statement statement = connection.createStatement();
+    statement.execute(createTableString);
+
+    // Because a Pubsub subscription only receives messages published after the subscription is
+    // created, eventsTopic.publish(messages) can only be called after statement.executeQuery.
+    // However, because statement.executeQuery is a blocking call, it has to be run on a
+    // separate thread.
+    ExecutorService pool = Executors.newFixedThreadPool(1);
+    pool.execute(
+        new Runnable() {
+          @Override
+          public void run() {
+            try {
+              ResultSet resultSet =
+                  statement.executeQuery("SELECT message.payload.id FROM 
message LIMIT 3");
+              assertTrue(resultSet.next());
+              assertTrue(resultSet.next());
+              assertTrue(resultSet.next());
+              assertFalse(resultSet.next());
+              checked = true;
+            } catch (SQLException e) {
+              LOG.warn(e.toString());
+            }
+          }
+        });
+
+    // Wait one minute to allow subscription creation.
+    Thread.sleep(60 * 1000);
+    eventsTopic.publish(messages);
+    // Wait another minute to allow the thread to finish its checks.
+    Thread.sleep(60 * 1000);
+    // Verify that the thread checked the values returned by the LIMIT query.
+    assertTrue(checked);
+    pool.shutdown();
+  }
+
+  private CalciteConnection connect(TableProvider... tableProviders) throws SQLException {
+    InMemoryMetaStore inMemoryMetaStore = new InMemoryMetaStore();
+    for (TableProvider tableProvider : tableProviders) {
+      inMemoryMetaStore.registerProvider(tableProvider);
+    }
+
+    Properties info = new Properties();
+    info.put(BEAM_CALCITE_SCHEMA, new BeamCalciteSchema(inMemoryMetaStore));
+    return (CalciteConnection) INSTANCE.connect(CONNECT_STRING_PREFIX, info);
+  }
+
   private static Boolean containsAll(Set<PubsubMessage> set, PubsubMessage... subsetCandidate) {
     return Arrays.stream(subsetCandidate)
         .allMatch(candidate -> set.stream().anyMatch(element -> messagesEqual(element, candidate)));


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 114535)
    Time Spent: 10h 20m  (was: 10h 10m)

> [SQL] Support LIMIT on Unbounded Data
> -------------------------------------
>
>                 Key: BEAM-4194
>                 URL: https://issues.apache.org/jira/browse/BEAM-4194
>             Project: Beam
>          Issue Type: New Feature
>          Components: dsl-sql
>            Reporter: Anton Kedin
>            Assignee: Rui Wang
>            Priority: Major
>          Time Spent: 10h 20m
>  Remaining Estimate: 0h
>
> We need to support queries with "LIMIT xxx".
> The problem is that we don't know when aggregates will trigger; they can
> potentially accumulate values in the global window and never trigger.
> If we have some trigger syntax (BEAM-4193), then the use case becomes similar
> to what we have at the moment, where the user defines the trigger upstream
> for all inputs. In this case, LIMIT can probably be implemented as
> sample.any(5) with a trigger at count.
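
For illustration only, here is a minimal sketch of the "sample.any with trigger at
count" idea mentioned in the description (note the merged PR above instead uses a
stateful counter DoFn plus pipeline cancellation). The input collection "rows" and
the limit of 5 are hypothetical:

    import org.apache.beam.sdk.transforms.Sample;
    import org.apache.beam.sdk.transforms.windowing.AfterPane;
    import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.Row;
    import org.joda.time.Duration;

    // Sketch: "rows" is assumed to be an unbounded PCollection<Row>; the LIMIT is 5.
    PCollection<Row> limited =
        rows
            .apply(
                "TriggerAtCount",
                Window.<Row>into(new GlobalWindows())
                    // Fire the pane as soon as 5 elements have arrived.
                    .triggering(AfterPane.elementCountAtLeast(5))
                    .discardingFiredPanes()
                    .withAllowedLateness(Duration.ZERO))
            // Sample.any(5) keeps at most 5 arbitrary elements from the fired pane.
            .apply("LimitTo5", Sample.<Row>any(5));

Unlike the stateful-counter approach in the PR, this sketch relies on the trigger to
make the global-window combine legal on unbounded input, which is exactly the
trigger-syntax dependency (BEAM-4193) the description points out.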



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
