This is an automated email from the ASF dual-hosted git repository.

amaliujia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 96d8e68  [BEAM-6114] Handle Unsupported Lookup Joins
     new 84d5c87  Merge pull request #9453 from 
rahul8383/handle-unsupported-lookupjoins
96d8e68 is described below

commit 96d8e68de882d49cdaa39bd0e598d08dcc15e043
Author: rahul8383 <rahulpatwari8...@gmail.com>
AuthorDate: Thu Aug 29 23:42:52 2019 +0530

    [BEAM-6114] Handle Unsupported Lookup Joins
---
 .../extensions/sql/impl/rel/BeamCoGBKJoinRel.java  |  20 ++
 .../sdk/extensions/sql/impl/rel/BeamJoinRel.java   |  57 ++--
 .../sql/impl/rel/BeamSideInputJoinRel.java         |  24 ++
 .../sql/impl/rel/BeamSideInputLookupJoinRel.java   |  41 ++-
 .../sql/meta/provider/test/TestTableUtils.java     |  14 +
 .../apache/beam/sdk/extensions/sql/TestUtils.java  |   8 +
 .../rel/BeamCoGBKJoinRelBoundedVsBoundedTest.java  |  50 ----
 .../BeamCoGBKJoinRelUnboundedVsUnboundedTest.java  |   2 +-
 ...ndedTest.java => BeamSideInputJoinRelTest.java} | 115 ++------
 .../impl/rel/BeamSideInputLookupJoinRelTest.java   | 295 +++++++++++++++++++++
 10 files changed, 446 insertions(+), 180 deletions(-)

diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCoGBKJoinRel.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCoGBKJoinRel.java
index 0e165b6..d6ac71b 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCoGBKJoinRel.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCoGBKJoinRel.java
@@ -45,6 +45,26 @@ import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rex.RexNode;
 
+/**
+ * A {@code BeamJoinRel} which does CoGBK Join
+ *
+ * <p>This Join Covers the cases:
+ *
+ * <ul>
+ *   <li>BoundedTable JOIN BoundedTable
+ *   <li>UnboundedTable JOIN UnboundedTable
+ * </ul>
+ *
+ * <p>A CoGBK join is utilized as long as the windowFn of the both sides 
match. For more info refer
+ * <a href="https://issues.apache.org/jira/browse/BEAM-3345";>BEAM-3345</a>
+ *
+ * <p>General constraints:
+ *
+ * <ul>
+ *   <li>Only equi-join is supported.
+ *   <li>CROSS JOIN is not supported.
+ * </ul>
+ */
 public class BeamCoGBKJoinRel extends BeamJoinRel {
 
   public BeamCoGBKJoinRel(
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
index f2739e4..422242f 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
@@ -62,33 +62,15 @@ import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.util.Pair;
 
 /**
- * {@code BeamRelNode} to replace a {@code Join} node.
+ * An abstract {@code BeamRelNode} to implement Join Rels.
  *
- * <p>Support for join can be categorized into 3 cases:
+ * <p>Support for join can be categorized into 4 cases:
  *
  * <ul>
  *   <li>BoundedTable JOIN BoundedTable
  *   <li>UnboundedTable JOIN UnboundedTable
  *   <li>BoundedTable JOIN UnboundedTable
- * </ul>
- *
- * <p>For the first two cases, a standard join is utilized as long as the 
windowFn of the both sides
- * match.
- *
- * <p>For the third case, {@code sideInput} is utilized to implement the join, 
so there are some
- * constraints:
- *
- * <ul>
- *   <li>{@code FULL OUTER JOIN} is not supported.
- *   <li>If it's a {@code LEFT OUTER JOIN}, the unbounded table should on the 
left side.
- *   <li>If it's a {@code RIGHT OUTER JOIN}, the unbounded table should on the 
right side.
- * </ul>
- *
- * <p>There are also some general constraints:
- *
- * <ul>
- *   <li>Only equi-join is supported.
- *   <li>CROSS JOIN is not supported.
+ *   <li>SeekableTable JOIN non SeekableTable
  * </ul>
  */
 public abstract class BeamJoinRel extends Join implements BeamRelNode {
@@ -374,11 +356,22 @@ public abstract class BeamJoinRel extends Join implements 
BeamRelNode {
     throw new UnsupportedOperationException("Cannot get column index from " + 
rexNode.getType());
   }
 
-  // The Volcano planner works in a top-down fashion. It starts by transforming
-  // the root and move towards the leafs of the plan. Due to this when
-  // transforming a logical join its inputs are still in the logical 
convention.
-  // So, Recursively visit the inputs of the RelNode till BeamIOSourceRel is 
encountered and
-  // propagate the boundedness upwards.
+  /**
+   * This method returns the Boundedness of a RelNode. It is used during 
planning and applying
+   * {@link org.apache.beam.sdk.extensions.sql.impl.rule.BeamCoGBKJoinRule} 
and {@link
+   * org.apache.beam.sdk.extensions.sql.impl.rule.BeamSideInputJoinRule}
+   *
+   * <p>The Volcano planner works in a top-down fashion. It starts by 
transforming the root and move
+   * towards the leafs of the plan. Due to this when transforming a logical 
join its inputs are
+   * still in the logical convention. So, Recursively visit the inputs of the 
RelNode till
+   * BeamIOSourceRel is encountered and propagate the boundedness upwards.
+   *
+   * <p>The Boundedness of each child of a RelNode is stored in a list. If any 
of the children are
+   * Unbounded, the RelNode is Unbounded. Else, the RelNode is Bounded.
+   *
+   * @param relNode the RelNode whose Boundedness has to be determined
+   * @return {@code PCollection.isBounded}
+   */
   public static PCollection.IsBounded getBoundednessOfRelNode(RelNode relNode) 
{
     if (relNode instanceof BeamRelNode) {
       return (((BeamRelNode) relNode).isBounded());
@@ -387,8 +380,7 @@ public abstract class BeamJoinRel extends Join implements 
BeamRelNode {
     for (RelNode inputRel : relNode.getInputs()) {
       if (inputRel instanceof RelSubset) {
         // Consider the RelNode with best cost in the RelSubset. If best cost 
RelNode cannot be
-        // determined, consider the first RelNode in the RelSubset(Is there a 
better way to do
-        // this?)
+        // determined, consider the first RelNode in the RelSubset
         RelNode rel = ((RelSubset) inputRel).getBest();
         if (rel == null) {
           rel = ((RelSubset) inputRel).getRelList().get(0);
@@ -404,6 +396,15 @@ public abstract class BeamJoinRel extends Join implements 
BeamRelNode {
         : PCollection.IsBounded.BOUNDED);
   }
 
+  /**
+   * This method returns whether any of the children of the relNode are 
Seekable. It is used during
+   * planning and applying {@link 
org.apache.beam.sdk.extensions.sql.impl.rule.BeamCoGBKJoinRule}
+   * and {@link 
org.apache.beam.sdk.extensions.sql.impl.rule.BeamSideInputJoinRule} and {@link
+   * org.apache.beam.sdk.extensions.sql.impl.rule.BeamSideInputLookupJoinRule}
+   *
+   * @param relNode the relNode whose children can be Seekable
+   * @return A boolean
+   */
   public static boolean containsSeekableInput(RelNode relNode) {
     for (RelNode relInput : relNode.getInputs()) {
       if (relInput instanceof RelSubset) {
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputJoinRel.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputJoinRel.java
index 3e678e6..7366dcd 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputJoinRel.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputJoinRel.java
@@ -39,6 +39,30 @@ import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rex.RexNode;
 
+/**
+ * A {@code BeamJoinRel} which does sideinput Join
+ *
+ * <p>This Join Covers the case:
+ *
+ * <ul>
+ *   <li>BoundedTable JOIN UnboundedTable
+ * </ul>
+ *
+ * <p>{@code sideInput} is utilized to implement the join, so there are some 
constraints:
+ *
+ * <ul>
+ *   <li>{@code FULL OUTER JOIN} is not supported.
+ *   <li>If it's a {@code LEFT OUTER JOIN}, the unbounded table should on the 
left side.
+ *   <li>If it's a {@code RIGHT OUTER JOIN}, the unbounded table should on the 
right side.
+ * </ul>
+ *
+ * <p>General constraints:
+ *
+ * <ul>
+ *   <li>Only equi-join is supported.
+ *   <li>CROSS JOIN is not supported.
+ * </ul>
+ */
 public class BeamSideInputJoinRel extends BeamJoinRel {
 
   public BeamSideInputJoinRel(
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputLookupJoinRel.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputLookupJoinRel.java
index 58393c2..27ecae6 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputLookupJoinRel.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputLookupJoinRel.java
@@ -34,6 +34,30 @@ import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rex.RexNode;
 
+/**
+ * A {@code BeamJoinRel} which does Lookup Join
+ *
+ * <p>This Join Covers the case:
+ *
+ * <ul>
+ *   <li>SeekableTable JOIN non SeekableTable
+ * </ul>
+ *
+ * <p>As Join is implemented as lookup, there are some constraints:
+ *
+ * <ul>
+ *   <li>{@code FULL OUTER JOIN} is not supported.
+ *   <li>If it's a {@code LEFT OUTER JOIN}, the non Seekable table should on 
the left side.
+ *   <li>If it's a {@code RIGHT OUTER JOIN}, the non Seekable table should on 
the right side.
+ * </ul>
+ *
+ * <p>General constraints:
+ *
+ * <ul>
+ *   <li>Only equi-join is supported.
+ *   <li>CROSS JOIN is not supported.
+ * </ul>
+ */
 public class BeamSideInputLookupJoinRel extends BeamJoinRel {
 
   public BeamSideInputLookupJoinRel(
@@ -49,7 +73,22 @@ public class BeamSideInputLookupJoinRel extends BeamJoinRel {
 
   @Override
   public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
-    // Should we throw Exception when joinType is LEFT (or) RIGHT (or) FULL?
+    // if one of the sides is Seekable & the other is non Seekable
+    // then do a sideInputLookup join.
+    // When doing a sideInputLookup join, the windowFn does not need to match.
+    // Only support INNER JOIN & LEFT OUTER JOIN where left side of the join 
must be
+    // non Seekable & RIGHT OUTER JOIN where right side of the join must be 
non Seekable
+    if (joinType == JoinRelType.FULL) {
+      throw new UnsupportedOperationException(
+          "FULL OUTER JOIN is not supported when join "
+              + "a Seekable table with a non Seekable table.");
+    }
+
+    if ((joinType == JoinRelType.LEFT && seekableInputIndex().get() == 0)
+        || (joinType == JoinRelType.RIGHT && seekableInputIndex().get() == 1)) 
{
+      throw new UnsupportedOperationException(
+          String.format("%s side of an OUTER JOIN must be a non Seekable 
table.", joinType.name()));
+    }
     return new SideInputLookupJoin();
   }
 
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableUtils.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableUtils.java
index 1fd44ff..777209a 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableUtils.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableUtils.java
@@ -55,11 +55,25 @@ public class TestTableUtils {
         .collect(toSchema());
   }
 
+  public static Schema buildBeamSqlNullableSchema(Object... args) {
+    return Stream.iterate(0, i -> i + 3)
+        .limit(args.length / 3)
+        .map(i -> toNullableRecordField(args, i))
+        .collect(toSchema());
+  }
+
   // TODO: support nested.
   public static Schema.Field toRecordField(Object[] args, int i) {
     return Schema.Field.of((String) args[i + 1], (FieldType) args[i]);
   }
 
+  public static Schema.Field toNullableRecordField(Object[] args, int i) {
+    if ((boolean) args[i + 2]) {
+      return Schema.Field.nullable((String) args[i + 1], (FieldType) args[i]);
+    }
+    return Schema.Field.of((String) args[i + 1], (FieldType) args[i]);
+  }
+
   /**
    * Convenient way to build a {@code BeamSqlRow}s.
    *
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
index 7e14380..b7d6791 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
@@ -104,6 +104,14 @@ public class TestUtils {
       return builder;
     }
 
+    public static RowsBuilder ofNullable(final Object... args) {
+      Schema beamSQLSchema = TestTableUtils.buildBeamSqlNullableSchema(args);
+      RowsBuilder builder = new RowsBuilder();
+      builder.type = beamSQLSchema;
+
+      return builder;
+    }
+
     /**
      * Create a RowsBuilder with the specified row type info.
      *
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCoGBKJoinRelBoundedVsBoundedTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCoGBKJoinRelBoundedVsBoundedTest.java
index ca65e31..f572b67 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCoGBKJoinRelBoundedVsBoundedTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCoGBKJoinRelBoundedVsBoundedTest.java
@@ -17,15 +17,12 @@
  */
 package org.apache.beam.sdk.extensions.sql.impl.rel;
 
-import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
 import org.apache.beam.sdk.extensions.sql.TestUtils;
 import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
 import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable;
-import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableUtils;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.Row;
 import org.apache.calcite.rel.RelNode;
@@ -55,17 +52,10 @@ public class BeamCoGBKJoinRelBoundedVsBoundedTest extends 
BaseRelTest {
               Schema.FieldType.INT32, "price")
           .addRows(1, 2, 3, 2, 3, 3, 3, 4, 5);
 
-  public static final BeamSqlTable SITE_LKP =
-      new BeamSideInputJoinRelUnboundedVsBoundedTest.SiteLookupTable(
-          TestTableUtils.buildBeamSqlSchema(
-              Schema.FieldType.INT32, "order_id",
-              Schema.FieldType.STRING, "site_name"));
-
   @BeforeClass
   public static void prepare() {
     registerTable("ORDER_DETAILS1", ORDER_DETAILS1);
     registerTable("ORDER_DETAILS2", ORDER_DETAILS2);
-    registerTable("SITE_LKP", SITE_LKP);
   }
 
   @Test
@@ -385,44 +375,4 @@ public class BeamCoGBKJoinRelBoundedVsBoundedTest extends 
BaseRelTest {
     compilePipeline(sql, pipeline);
     pipeline.run();
   }
-
-  @Test
-  public void testBoundedVsLookupTableJoin() throws Exception {
-    String sql =
-        "SELECT o1.order_id, o2.site_name FROM "
-            + " ORDER_DETAILS1 o1 "
-            + " JOIN SITE_LKP o2 "
-            + " on "
-            + " o1.order_id=o2.order_id "
-            + " WHERE o1.order_id=1";
-    PCollection<Row> rows = compilePipeline(sql, pipeline);
-    PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
-        .containsInAnyOrder(
-            TestUtils.RowsBuilder.of(
-                    Schema.FieldType.INT32, "order_id",
-                    Schema.FieldType.STRING, "site_name")
-                .addRows(1, "SITE1")
-                .getStringRows());
-    pipeline.run();
-  }
-
-  @Test
-  public void testLookupTableVsBoundedJoin() throws Exception {
-    String sql =
-        "SELECT o1.order_id, o2.site_name FROM "
-            + " SITE_LKP o2 "
-            + " JOIN ORDER_DETAILS1 o1 "
-            + " on "
-            + " o1.order_id=o2.order_id "
-            + " WHERE o1.order_id=1";
-    PCollection<Row> rows = compilePipeline(sql, pipeline);
-    PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
-        .containsInAnyOrder(
-            TestUtils.RowsBuilder.of(
-                    Schema.FieldType.INT32, "order_id",
-                    Schema.FieldType.STRING, "site_name")
-                .addRows(1, "SITE1")
-                .getStringRows());
-    pipeline.run();
-  }
 }
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCoGBKJoinRelUnboundedVsUnboundedTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCoGBKJoinRelUnboundedVsUnboundedTest.java
index f9a5fb4..1f73e85 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCoGBKJoinRelUnboundedVsUnboundedTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCoGBKJoinRelUnboundedVsUnboundedTest.java
@@ -36,7 +36,7 @@ import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 
-/** Unbounded + Unbounded Test for {@code BeamStandardJoinRel}. */
+/** Unbounded + Unbounded Test for {@code BeamCoGBKJoinRel}. */
 public class BeamCoGBKJoinRelUnboundedVsUnboundedTest extends BaseRelTest {
   @Rule public final TestPipeline pipeline = TestPipeline.create();
   private static final DateTime FIRST_DATE = new DateTime(1);
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputJoinRelUnboundedVsBoundedTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputJoinRelTest.java
similarity index 77%
rename from 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputJoinRelUnboundedVsBoundedTest.java
rename to 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputJoinRelTest.java
index 45881a8..91043c3 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputJoinRelUnboundedVsBoundedTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputJoinRelTest.java
@@ -17,25 +17,16 @@
  */
 package org.apache.beam.sdk.extensions.sql.impl.rel;
 
-import java.util.Arrays;
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.BeamSqlSeekableTable;
 import org.apache.beam.sdk.extensions.sql.TestUtils;
-import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
 import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
-import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable;
 import 
org.apache.beam.sdk.extensions.sql.impl.transform.BeamSqlOutputToConsoleFn;
 import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable;
-import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableUtils;
 import 
org.apache.beam.sdk.extensions.sql.meta.provider.test.TestUnboundedTable;
-import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.Row;
 import org.apache.calcite.rel.RelNode;
 import org.joda.time.DateTime;
@@ -45,8 +36,8 @@ import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 
-/** Unbounded + Unbounded Test for {@code BeamSideInputJoinRel}. */
-public class BeamSideInputJoinRelUnboundedVsBoundedTest extends BaseRelTest {
+/** Unbounded + Bounded Test for {@code BeamSideInputJoinRel}. */
+public class BeamSideInputJoinRelTest extends BaseRelTest {
   @Rule public final TestPipeline pipeline = TestPipeline.create();
   public static final DateTime FIRST_DATE = new DateTime(1);
   public static final DateTime SECOND_DATE = new DateTime(1 + 3600 * 1000);
@@ -55,6 +46,19 @@ public class BeamSideInputJoinRelUnboundedVsBoundedTest 
extends BaseRelTest {
 
   @BeforeClass
   public static void prepare() {
+    registerUnboundedTable();
+
+    registerTable(
+        "ORDER_DETAILS1",
+        TestBoundedTable.of(
+                Schema.FieldType.INT32, "order_id",
+                Schema.FieldType.STRING, "buyer")
+            .addRows(
+                1, "james",
+                2, "bond"));
+  }
+
+  public static void registerUnboundedTable() {
     registerTable(
         "ORDER_DETAILS",
         TestUnboundedTable.of(
@@ -90,55 +94,6 @@ public class BeamSideInputJoinRelUnboundedVsBoundedTest 
extends BaseRelTest {
                 2,
                 3,
                 SECOND_DATE));
-
-    registerTable(
-        "ORDER_DETAILS1",
-        TestBoundedTable.of(
-                Schema.FieldType.INT32, "order_id",
-                Schema.FieldType.STRING, "buyer")
-            .addRows(
-                1, "james",
-                2, "bond"));
-
-    registerTable(
-        "SITE_LKP",
-        new SiteLookupTable(
-            TestTableUtils.buildBeamSqlSchema(
-                Schema.FieldType.INT32, "site_id",
-                Schema.FieldType.STRING, "site_name")));
-  }
-
-  /** Test table for JOIN-AS-LOOKUP. */
-  public static class SiteLookupTable extends BaseBeamTable implements 
BeamSqlSeekableTable {
-
-    public SiteLookupTable(Schema schema) {
-      super(schema);
-    }
-
-    @Override
-    public PCollection.IsBounded isBounded() {
-      return PCollection.IsBounded.BOUNDED;
-    }
-
-    @Override
-    public PCollection<Row> buildIOReader(PBegin begin) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public POutput buildIOWriter(PCollection<Row> input) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public List<Row> seekRow(Row lookupSubRow) {
-      return Arrays.asList(Row.withSchema(getSchema()).addValues(1, 
"SITE1").build());
-    }
-
-    @Override
-    public BeamTableStatistics getTableStatistics(PipelineOptions options) {
-      return BeamTableStatistics.BOUNDED_UNKNOWN;
-    }
   }
 
   @Test
@@ -322,44 +277,4 @@ public class BeamSideInputJoinRelUnboundedVsBoundedTest 
extends BaseRelTest {
     compilePipeline(sql, pipeline);
     pipeline.run();
   }
-
-  @Test
-  public void testUnboundedVsLookupTableJoin() throws Exception {
-    String sql =
-        "SELECT o1.order_id, o2.site_name FROM "
-            + " ORDER_DETAILS o1 "
-            + " JOIN SITE_LKP o2 "
-            + " on "
-            + " o1.site_id=o2.site_id "
-            + " WHERE o1.site_id=1";
-    PCollection<Row> rows = compilePipeline(sql, pipeline);
-    PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
-        .containsInAnyOrder(
-            TestUtils.RowsBuilder.of(
-                    Schema.FieldType.INT32, "order_id",
-                    Schema.FieldType.STRING, "site_name")
-                .addRows(1, "SITE1")
-                .getStringRows());
-    pipeline.run();
-  }
-
-  @Test
-  public void testLookupTableVsUnboundedJoin() throws Exception {
-    String sql =
-        "SELECT o1.order_id, o2.site_name FROM "
-            + " SITE_LKP o2 "
-            + " JOIN ORDER_DETAILS o1 "
-            + " on "
-            + " o1.site_id=o2.site_id "
-            + " WHERE o1.site_id=1";
-    PCollection<Row> rows = compilePipeline(sql, pipeline);
-    PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
-        .containsInAnyOrder(
-            TestUtils.RowsBuilder.of(
-                    Schema.FieldType.INT32, "order_id",
-                    Schema.FieldType.STRING, "site_name")
-                .addRows(1, "SITE1")
-                .getStringRows());
-    pipeline.run();
-  }
 }
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputLookupJoinRelTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputLookupJoinRelTest.java
new file mode 100644
index 0000000..8b4f51a
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputLookupJoinRelTest.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import static 
org.apache.beam.sdk.extensions.sql.impl.rel.BeamCoGBKJoinRelBoundedVsBoundedTest.ORDER_DETAILS1;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.BeamSqlSeekableTable;
+import org.apache.beam.sdk.extensions.sql.TestUtils;
+import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableUtils;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.Row;
+import org.hamcrest.core.StringContains;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+public class BeamSideInputLookupJoinRelTest extends BaseRelTest {
+
+  @Rule public final TestPipeline pipeline = TestPipeline.create();
+  @Rule public ExpectedException thrown = ExpectedException.none();
+  private static final boolean nullable = true;
+
+  /** Test table for JOIN-AS-LOOKUP. */
+  public static class SiteLookupTable extends BaseBeamTable implements 
BeamSqlSeekableTable {
+
+    public SiteLookupTable(Schema schema) {
+      super(schema);
+    }
+
+    @Override
+    public PCollection.IsBounded isBounded() {
+      return PCollection.IsBounded.BOUNDED;
+    }
+
+    @Override
+    public PCollection<Row> buildIOReader(PBegin begin) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public POutput buildIOWriter(PCollection<Row> input) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public List<Row> seekRow(Row lookupSubRow) {
+      if (lookupSubRow.getInt32("site_id") == 2) {
+        return Arrays.asList(Row.withSchema(getSchema()).addValues(2, 
"SITE1").build());
+      }
+      return Arrays.asList(Row.nullRow(getSchema()));
+    }
+
+    @Override
+    public BeamTableStatistics getTableStatistics(PipelineOptions options) {
+      return BeamTableStatistics.BOUNDED_UNKNOWN;
+    }
+  }
+
+  @BeforeClass
+  public static void prepare() {
+    BeamSideInputJoinRelTest.registerUnboundedTable();
+    registerTable("ORDER_DETAILS1", ORDER_DETAILS1);
+    registerTable(
+        "SITE_LKP",
+        new SiteLookupTable(
+            TestTableUtils.buildBeamSqlNullableSchema(
+                Schema.FieldType.INT32,
+                "site_id",
+                nullable,
+                Schema.FieldType.STRING,
+                "site_name",
+                nullable)));
+  }
+
+  @Test
+  public void testBoundedTableInnerJoinWithLookupTable() throws Exception {
+    String sql =
+        "SELECT o1.order_id, o2.site_name FROM "
+            + " ORDER_DETAILS1 o1 "
+            + " JOIN SITE_LKP o2 "
+            + " on "
+            + " o1.site_id=o2.site_id "
+            + " WHERE o1.site_id=2 ";
+    PCollection<Row> rows = compilePipeline(sql, pipeline);
+    PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
+        .containsInAnyOrder(
+            TestUtils.RowsBuilder.of(
+                    Schema.FieldType.INT32, "order_id",
+                    Schema.FieldType.STRING, "site_name")
+                .addRows(1, "SITE1")
+                .getStringRows());
+    pipeline.run();
+  }
+
+  @Test
+  public void testLookupTableInnerJoinWithBoundedTable() throws Exception {
+    String sql =
+        "SELECT o1.order_id, o2.site_name FROM "
+            + " SITE_LKP o2 "
+            + " JOIN ORDER_DETAILS1 o1 "
+            + " on "
+            + " o1.site_id=o2.site_id "
+            + " WHERE o1.site_id=2 ";
+    PCollection<Row> rows = compilePipeline(sql, pipeline);
+    PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
+        .containsInAnyOrder(
+            TestUtils.RowsBuilder.of(
+                    Schema.FieldType.INT32, "order_id",
+                    Schema.FieldType.STRING, "site_name")
+                .addRows(1, "SITE1")
+                .getStringRows());
+    pipeline.run();
+  }
+
+  @Test
+  public void testUnboundedTableInnerJoinWithLookupTable() throws Exception {
+    String sql =
+        "SELECT o1.order_id, o2.site_name FROM "
+            + "(select order_id, site_id FROM ORDER_DETAILS "
+            + "          GROUP BY order_id, site_id, TUMBLE(order_time, 
INTERVAL '1' HOUR)) o1 "
+            + " JOIN "
+            + " SITE_LKP o2 "
+            + " on "
+            + " o1.site_id=o2.site_id"
+            + " WHERE o1.site_id=2 ";
+    PCollection<Row> rows = compilePipeline(sql, pipeline);
+    PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
+        .containsInAnyOrder(
+            TestUtils.RowsBuilder.of(
+                    Schema.FieldType.INT32, "order_id",
+                    Schema.FieldType.STRING, "site_name")
+                .addRows(1, "SITE1")
+                .addRows(2, "SITE1")
+                .getStringRows());
+    pipeline.run();
+  }
+
+  @Test
+  public void testLookupTableInnerJoinWithUnboundedTable() throws Exception {
+    String sql =
+        "SELECT o1.order_id, o2.site_name FROM "
+            + " SITE_LKP o2 "
+            + " JOIN "
+            + "(select order_id, site_id FROM ORDER_DETAILS "
+            + "          GROUP BY order_id, site_id, TUMBLE(order_time, 
INTERVAL '1' HOUR)) o1 "
+            + " on "
+            + " o1.site_id=o2.site_id"
+            + " WHERE o1.site_id=2 ";
+    PCollection<Row> rows = compilePipeline(sql, pipeline);
+    PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
+        .containsInAnyOrder(
+            TestUtils.RowsBuilder.of(
+                    Schema.FieldType.INT32, "order_id",
+                    Schema.FieldType.STRING, "site_name")
+                .addRows(1, "SITE1")
+                .addRows(2, "SITE1")
+                .getStringRows());
+    pipeline.run();
+  }
+
+  @Test
+  public void testLookupTableRightOuterJoinWithBoundedTable() throws Exception 
{
+    String sql =
+        "SELECT o1.order_id, o2.site_name FROM "
+            + " SITE_LKP o2 "
+            + " RIGHT OUTER JOIN "
+            + " ORDER_DETAILS1 o1 "
+            + " on "
+            + " o1.site_id=o2.site_id ";
+    PCollection<Row> rows = compilePipeline(sql, pipeline);
+    PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
+        .containsInAnyOrder(
+            TestUtils.RowsBuilder.ofNullable(
+                    Schema.FieldType.INT32,
+                    "order_id",
+                    nullable,
+                    Schema.FieldType.STRING,
+                    "site_name",
+                    nullable)
+                .addRows(1, "SITE1")
+                .addRows(2, null)
+                .addRows(3, null)
+                .getStringRows());
+    pipeline.run();
+  }
+
+  @Test
+  public void testUnboundedTableLeftOuterJoinWithLookupTable() throws 
Exception {
+    String sql =
+        "SELECT o1.order_id, o2.site_name FROM "
+            + "(select order_id, site_id FROM ORDER_DETAILS "
+            + "          GROUP BY order_id, site_id, TUMBLE(order_time, 
INTERVAL '1' HOUR)) o1 "
+            + " LEFT OUTER JOIN "
+            + " SITE_LKP o2 "
+            + " on "
+            + " o1.site_id=o2.site_id";
+    PCollection<Row> rows = compilePipeline(sql, pipeline);
+    PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
+        .containsInAnyOrder(
+            TestUtils.RowsBuilder.ofNullable(
+                    Schema.FieldType.INT32,
+                    "order_id",
+                    nullable,
+                    Schema.FieldType.STRING,
+                    "site_name",
+                    nullable)
+                .addRows(1, "SITE1")
+                .addRows(2, "SITE1")
+                .addRows(1, null)
+                .addRows(2, null)
+                .addRows(3, null)
+                .getStringRows());
+    pipeline.run();
+  }
+
+  @Test
+  // Do not add a filter like "WHERE o1.order_id=2". By adding that filter, 
FilterJoinRule may
+  // convert "LEFT OUTER JOIN" to "INNER JOIN".
+  public void testLookupTableLeftOuterJoinWithBoundedTableError() throws 
Exception {
+    String sql =
+        "SELECT o1.order_id, o2.site_name FROM "
+            + " SITE_LKP o2 "
+            + " LEFT OUTER JOIN "
+            + " ORDER_DETAILS1 o1 "
+            + " on "
+            + " o1.site_id=o2.site_id ";
+    thrown.expect(UnsupportedOperationException.class);
+    thrown.expectMessage(StringContains.containsString("OUTER JOIN must be a 
non Seekable table"));
+    PCollection<Row> rows = compilePipeline(sql, pipeline);
+    pipeline.run();
+  }
+
+  @Test
+  // Do not add a filter like "WHERE o1.order_id=2". By adding that filter, 
FilterJoinRule may
+  // convert "FULL OUTER JOIN" to "LEFT OUTER JOIN", which, in tis case is a 
valid scenario.
+  public void testUnboundedTableFullOuterJoinWithLookupTableError() throws 
Exception {
+    String sql =
+        "SELECT o1.order_id, o2.site_name FROM "
+            + "(select order_id, site_id FROM ORDER_DETAILS "
+            + "          GROUP BY order_id, site_id, TUMBLE(order_time, 
INTERVAL '1' HOUR)) o1 "
+            + " FULL OUTER JOIN "
+            + " SITE_LKP o2 "
+            + " on "
+            + " o1.site_id=o2.site_id";
+    thrown.expect(UnsupportedOperationException.class);
+    thrown.expectMessage(StringContains.containsString("not supported"));
+    PCollection<Row> rows = compilePipeline(sql, pipeline);
+    pipeline.run();
+  }
+
+  @Test
+  // Do not add a filter like "WHERE o1.order_id=2". By adding that filter, 
FilterJoinRule may
+  // convert "RIGHT OUTER JOIN" to "INNER JOIN".
+  public void testUnboundedTableRightOuterJoinWithLookupTableError() throws 
Exception {
+    String sql =
+        "SELECT o1.order_id, o2.site_name FROM "
+            + "(select order_id, site_id FROM ORDER_DETAILS "
+            + "          GROUP BY order_id, site_id, TUMBLE(order_time, 
INTERVAL '1' HOUR)) o1 "
+            + " RIGHT OUTER JOIN "
+            + " SITE_LKP o2 "
+            + " on "
+            + " o1.site_id=o2.site_id";
+    thrown.expect(UnsupportedOperationException.class);
+    thrown.expectMessage(StringContains.containsString("OUTER JOIN must be a 
non Seekable table"));
+    PCollection<Row> rows = compilePipeline(sql, pipeline);
+    pipeline.run();
+  }
+}

Reply via email to