This is an automated email from the ASF dual-hosted git repository. amaliujia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 96d8e68 [BEAM-6114] Handle Unsupported Lookup Joins new 84d5c87 Merge pull request #9453 from rahul8383/handle-unsupported-lookupjoins 96d8e68 is described below commit 96d8e68de882d49cdaa39bd0e598d08dcc15e043 Author: rahul8383 <rahulpatwari8...@gmail.com> AuthorDate: Thu Aug 29 23:42:52 2019 +0530 [BEAM-6114] Handle Unsupported Lookup Joins --- .../extensions/sql/impl/rel/BeamCoGBKJoinRel.java | 20 ++ .../sdk/extensions/sql/impl/rel/BeamJoinRel.java | 57 ++-- .../sql/impl/rel/BeamSideInputJoinRel.java | 24 ++ .../sql/impl/rel/BeamSideInputLookupJoinRel.java | 41 ++- .../sql/meta/provider/test/TestTableUtils.java | 14 + .../apache/beam/sdk/extensions/sql/TestUtils.java | 8 + .../rel/BeamCoGBKJoinRelBoundedVsBoundedTest.java | 50 ---- .../BeamCoGBKJoinRelUnboundedVsUnboundedTest.java | 2 +- ...ndedTest.java => BeamSideInputJoinRelTest.java} | 115 ++------ .../impl/rel/BeamSideInputLookupJoinRelTest.java | 295 +++++++++++++++++++++ 10 files changed, 446 insertions(+), 180 deletions(-) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCoGBKJoinRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCoGBKJoinRel.java index 0e165b6..d6ac71b 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCoGBKJoinRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCoGBKJoinRel.java @@ -45,6 +45,26 @@ import org.apache.calcite.rel.core.Join; import org.apache.calcite.rel.core.JoinRelType; import org.apache.calcite.rex.RexNode; +/** + * A {@code BeamJoinRel} which does CoGBK Join + * + * <p>This Join Covers the cases: + * + * <ul> + * <li>BoundedTable JOIN BoundedTable + * <li>UnboundedTable JOIN UnboundedTable + * </ul> + * + * <p>A CoGBK join is utilized as long as the windowFn of the both sides match. 
For more info refer + * <a href="https://issues.apache.org/jira/browse/BEAM-3345">BEAM-3345</a> + * + * <p>General constraints: + * + * <ul> + * <li>Only equi-join is supported. + * <li>CROSS JOIN is not supported. + * </ul> + */ public class BeamCoGBKJoinRel extends BeamJoinRel { public BeamCoGBKJoinRel( diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java index f2739e4..422242f 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java @@ -62,33 +62,15 @@ import org.apache.calcite.rex.RexNode; import org.apache.calcite.util.Pair; /** - * {@code BeamRelNode} to replace a {@code Join} node. + * An abstract {@code BeamRelNode} to implement Join Rels. * - * <p>Support for join can be categorized into 3 cases: + * <p>Support for join can be categorized into 4 cases: * * <ul> * <li>BoundedTable JOIN BoundedTable * <li>UnboundedTable JOIN UnboundedTable * <li>BoundedTable JOIN UnboundedTable - * </ul> - * - * <p>For the first two cases, a standard join is utilized as long as the windowFn of the both sides - * match. - * - * <p>For the third case, {@code sideInput} is utilized to implement the join, so there are some - * constraints: - * - * <ul> - * <li>{@code FULL OUTER JOIN} is not supported. - * <li>If it's a {@code LEFT OUTER JOIN}, the unbounded table should on the left side. - * <li>If it's a {@code RIGHT OUTER JOIN}, the unbounded table should on the right side. - * </ul> - * - * <p>There are also some general constraints: - * - * <ul> - * <li>Only equi-join is supported. - * <li>CROSS JOIN is not supported. 
+ * <li>SeekableTable JOIN non SeekableTable * </ul> */ public abstract class BeamJoinRel extends Join implements BeamRelNode { @@ -374,11 +356,22 @@ public abstract class BeamJoinRel extends Join implements BeamRelNode { throw new UnsupportedOperationException("Cannot get column index from " + rexNode.getType()); } - // The Volcano planner works in a top-down fashion. It starts by transforming - // the root and move towards the leafs of the plan. Due to this when - // transforming a logical join its inputs are still in the logical convention. - // So, Recursively visit the inputs of the RelNode till BeamIOSourceRel is encountered and - // propagate the boundedness upwards. + /** + * This method returns the Boundedness of a RelNode. It is used during planning and applying + * {@link org.apache.beam.sdk.extensions.sql.impl.rule.BeamCoGBKJoinRule} and {@link + * org.apache.beam.sdk.extensions.sql.impl.rule.BeamSideInputJoinRule} + * + * <p>The Volcano planner works in a top-down fashion. It starts by transforming the root and moves + * towards the leaves of the plan. Due to this, when transforming a logical join its inputs are + * still in the logical convention. So, recursively visit the inputs of the RelNode till + * BeamIOSourceRel is encountered and propagate the boundedness upwards. + * + * <p>The Boundedness of each child of a RelNode is stored in a list. If any of the children are + * Unbounded, the RelNode is Unbounded. Else, the RelNode is Bounded. + * + * @param relNode the RelNode whose Boundedness has to be determined + * @return {@code PCollection.IsBounded} + */ public static PCollection.IsBounded getBoundednessOfRelNode(RelNode relNode) { if (relNode instanceof BeamRelNode) { return (((BeamRelNode) relNode).isBounded()); @@ -387,8 +380,7 @@ public abstract class BeamJoinRel extends Join implements BeamRelNode { for (RelNode inputRel : relNode.getInputs()) { if (inputRel instanceof RelSubset) { // Consider the RelNode with best cost in the RelSubset.
If best cost RelNode cannot be - // determined, consider the first RelNode in the RelSubset(Is there a better way to do - // this?) + // determined, consider the first RelNode in the RelSubset RelNode rel = ((RelSubset) inputRel).getBest(); if (rel == null) { rel = ((RelSubset) inputRel).getRelList().get(0); @@ -404,6 +396,15 @@ public abstract class BeamJoinRel extends Join implements BeamRelNode { : PCollection.IsBounded.BOUNDED); } + /** + * This method returns whether any of the children of the relNode are Seekable. It is used during + * planning and applying {@link org.apache.beam.sdk.extensions.sql.impl.rule.BeamCoGBKJoinRule} + * and {@link org.apache.beam.sdk.extensions.sql.impl.rule.BeamSideInputJoinRule} and {@link + * org.apache.beam.sdk.extensions.sql.impl.rule.BeamSideInputLookupJoinRule} + * + * @param relNode the relNode whose children can be Seekable + * @return {@code true} if any child of {@code relNode} is Seekable, {@code false} otherwise + */ public static boolean containsSeekableInput(RelNode relNode) { for (RelNode relInput : relNode.getInputs()) { if (relInput instanceof RelSubset) { diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputJoinRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputJoinRel.java index 3e678e6..7366dcd 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputJoinRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputJoinRel.java @@ -39,6 +39,30 @@ import org.apache.calcite.rel.core.Join; import org.apache.calcite.rel.core.JoinRelType; import org.apache.calcite.rex.RexNode; +/** + * A {@code BeamJoinRel} which does a side-input join. + * + * <p>This Join Covers the case: + * + * <ul> + * <li>BoundedTable JOIN UnboundedTable + * </ul> + * + * <p>{@code sideInput} is utilized to implement the join, so there are some constraints: + * + * <ul> + * <li>{@code FULL OUTER JOIN} is not
supported. + * <li>If it's a {@code LEFT OUTER JOIN}, the unbounded table should be on the left side. + * <li>If it's a {@code RIGHT OUTER JOIN}, the unbounded table should be on the right side. + * </ul> + * + * <p>General constraints: + * + * <ul> + * <li>Only equi-join is supported. + * <li>CROSS JOIN is not supported. + * </ul> + */ public class BeamSideInputJoinRel extends BeamJoinRel { public BeamSideInputJoinRel( diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputLookupJoinRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputLookupJoinRel.java index 58393c2..27ecae6 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputLookupJoinRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputLookupJoinRel.java @@ -34,6 +34,30 @@ import org.apache.calcite.rel.core.Join; import org.apache.calcite.rel.core.JoinRelType; import org.apache.calcite.rex.RexNode; +/** + * A {@code BeamJoinRel} which does a lookup join. + * + * <p>This Join Covers the case: + * + * <ul> + * <li>SeekableTable JOIN non SeekableTable + * </ul> + * + * <p>As the join is implemented as a lookup, there are some constraints: + * + * <ul> + * <li>{@code FULL OUTER JOIN} is not supported. + * <li>If it's a {@code LEFT OUTER JOIN}, the non Seekable table should be on the left side. + * <li>If it's a {@code RIGHT OUTER JOIN}, the non Seekable table should be on the right side. + * </ul> + * + * <p>General constraints: + * + * <ul> + * <li>Only equi-join is supported. + * <li>CROSS JOIN is not supported.
+ * </ul> + */ public class BeamSideInputLookupJoinRel extends BeamJoinRel { public BeamSideInputLookupJoinRel( @@ -49,7 +73,22 @@ public class BeamSideInputLookupJoinRel extends BeamJoinRel { @Override public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() { - // Should we throw Exception when joinType is LEFT (or) RIGHT (or) FULL? + // if one of the sides is Seekable & the other is non Seekable + // then do a sideInputLookup join. + // When doing a sideInputLookup join, the windowFn does not need to match. + // Only support INNER JOIN & LEFT OUTER JOIN where left side of the join must be + // non Seekable & RIGHT OUTER JOIN where right side of the join must be non Seekable + if (joinType == JoinRelType.FULL) { + throw new UnsupportedOperationException( + "FULL OUTER JOIN is not supported when join " + + "a Seekable table with a non Seekable table."); + } + + if ((joinType == JoinRelType.LEFT && seekableInputIndex().get() == 0) + || (joinType == JoinRelType.RIGHT && seekableInputIndex().get() == 1)) { + throw new UnsupportedOperationException( + String.format("%s side of an OUTER JOIN must be a non Seekable table.", joinType.name())); + } return new SideInputLookupJoin(); } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableUtils.java index 1fd44ff..777209a 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableUtils.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableUtils.java @@ -55,11 +55,25 @@ public class TestTableUtils { .collect(toSchema()); } + public static Schema buildBeamSqlNullableSchema(Object... 
args) { + return Stream.iterate(0, i -> i + 3) + .limit(args.length / 3) + .map(i -> toNullableRecordField(args, i)) + .collect(toSchema()); + } + // TODO: support nested. public static Schema.Field toRecordField(Object[] args, int i) { return Schema.Field.of((String) args[i + 1], (FieldType) args[i]); } + public static Schema.Field toNullableRecordField(Object[] args, int i) { + if ((boolean) args[i + 2]) { + return Schema.Field.nullable((String) args[i + 1], (FieldType) args[i]); + } + return Schema.Field.of((String) args[i + 1], (FieldType) args[i]); + } + /** * Convenient way to build a {@code BeamSqlRow}s. * diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java index 7e14380..b7d6791 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java @@ -104,6 +104,14 @@ public class TestUtils { return builder; } + public static RowsBuilder ofNullable(final Object... args) { + Schema beamSQLSchema = TestTableUtils.buildBeamSqlNullableSchema(args); + RowsBuilder builder = new RowsBuilder(); + builder.type = beamSQLSchema; + + return builder; + } + /** * Create a RowsBuilder with the specified row type info. 
* diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCoGBKJoinRelBoundedVsBoundedTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCoGBKJoinRelBoundedVsBoundedTest.java index ca65e31..f572b67 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCoGBKJoinRelBoundedVsBoundedTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCoGBKJoinRelBoundedVsBoundedTest.java @@ -17,15 +17,12 @@ */ package org.apache.beam.sdk.extensions.sql.impl.rel; -import org.apache.beam.sdk.extensions.sql.BeamSqlTable; import org.apache.beam.sdk.extensions.sql.TestUtils; import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats; import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable; -import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableUtils; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; import org.apache.calcite.rel.RelNode; @@ -55,17 +52,10 @@ public class BeamCoGBKJoinRelBoundedVsBoundedTest extends BaseRelTest { Schema.FieldType.INT32, "price") .addRows(1, 2, 3, 2, 3, 3, 3, 4, 5); - public static final BeamSqlTable SITE_LKP = - new BeamSideInputJoinRelUnboundedVsBoundedTest.SiteLookupTable( - TestTableUtils.buildBeamSqlSchema( - Schema.FieldType.INT32, "order_id", - Schema.FieldType.STRING, "site_name")); - @BeforeClass public static void prepare() { registerTable("ORDER_DETAILS1", ORDER_DETAILS1); registerTable("ORDER_DETAILS2", ORDER_DETAILS2); - registerTable("SITE_LKP", SITE_LKP); } @Test @@ -385,44 +375,4 @@ public class BeamCoGBKJoinRelBoundedVsBoundedTest extends BaseRelTest { compilePipeline(sql, pipeline); pipeline.run(); } 
- - @Test - public void testBoundedVsLookupTableJoin() throws Exception { - String sql = - "SELECT o1.order_id, o2.site_name FROM " - + " ORDER_DETAILS1 o1 " - + " JOIN SITE_LKP o2 " - + " on " - + " o1.order_id=o2.order_id " - + " WHERE o1.order_id=1"; - PCollection<Row> rows = compilePipeline(sql, pipeline); - PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) - .containsInAnyOrder( - TestUtils.RowsBuilder.of( - Schema.FieldType.INT32, "order_id", - Schema.FieldType.STRING, "site_name") - .addRows(1, "SITE1") - .getStringRows()); - pipeline.run(); - } - - @Test - public void testLookupTableVsBoundedJoin() throws Exception { - String sql = - "SELECT o1.order_id, o2.site_name FROM " - + " SITE_LKP o2 " - + " JOIN ORDER_DETAILS1 o1 " - + " on " - + " o1.order_id=o2.order_id " - + " WHERE o1.order_id=1"; - PCollection<Row> rows = compilePipeline(sql, pipeline); - PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) - .containsInAnyOrder( - TestUtils.RowsBuilder.of( - Schema.FieldType.INT32, "order_id", - Schema.FieldType.STRING, "site_name") - .addRows(1, "SITE1") - .getStringRows()); - pipeline.run(); - } } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCoGBKJoinRelUnboundedVsUnboundedTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCoGBKJoinRelUnboundedVsUnboundedTest.java index f9a5fb4..1f73e85 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCoGBKJoinRelUnboundedVsUnboundedTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCoGBKJoinRelUnboundedVsUnboundedTest.java @@ -36,7 +36,7 @@ import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; -/** Unbounded + Unbounded Test for {@code BeamStandardJoinRel}. */ +/** Unbounded + Unbounded Test for {@code BeamCoGBKJoinRel}. 
*/ public class BeamCoGBKJoinRelUnboundedVsUnboundedTest extends BaseRelTest { @Rule public final TestPipeline pipeline = TestPipeline.create(); private static final DateTime FIRST_DATE = new DateTime(1); diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputJoinRelUnboundedVsBoundedTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputJoinRelTest.java similarity index 77% rename from sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputJoinRelUnboundedVsBoundedTest.java rename to sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputJoinRelTest.java index 45881a8..91043c3 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputJoinRelUnboundedVsBoundedTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputJoinRelTest.java @@ -17,25 +17,16 @@ */ package org.apache.beam.sdk.extensions.sql.impl.rel; -import java.util.Arrays; -import java.util.List; -import org.apache.beam.sdk.extensions.sql.BeamSqlSeekableTable; import org.apache.beam.sdk.extensions.sql.TestUtils; -import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics; import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats; -import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable; import org.apache.beam.sdk.extensions.sql.impl.transform.BeamSqlOutputToConsoleFn; import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable; -import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableUtils; import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestUnboundedTable; -import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; 
import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.Row; import org.apache.calcite.rel.RelNode; import org.joda.time.DateTime; @@ -45,8 +36,8 @@ import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; -/** Unbounded + Unbounded Test for {@code BeamSideInputJoinRel}. */ -public class BeamSideInputJoinRelUnboundedVsBoundedTest extends BaseRelTest { +/** Unbounded + Bounded Test for {@code BeamSideInputJoinRel}. */ +public class BeamSideInputJoinRelTest extends BaseRelTest { @Rule public final TestPipeline pipeline = TestPipeline.create(); public static final DateTime FIRST_DATE = new DateTime(1); public static final DateTime SECOND_DATE = new DateTime(1 + 3600 * 1000); @@ -55,6 +46,19 @@ public class BeamSideInputJoinRelUnboundedVsBoundedTest extends BaseRelTest { @BeforeClass public static void prepare() { + registerUnboundedTable(); + + registerTable( + "ORDER_DETAILS1", + TestBoundedTable.of( + Schema.FieldType.INT32, "order_id", + Schema.FieldType.STRING, "buyer") + .addRows( + 1, "james", + 2, "bond")); + } + + public static void registerUnboundedTable() { registerTable( "ORDER_DETAILS", TestUnboundedTable.of( @@ -90,55 +94,6 @@ public class BeamSideInputJoinRelUnboundedVsBoundedTest extends BaseRelTest { 2, 3, SECOND_DATE)); - - registerTable( - "ORDER_DETAILS1", - TestBoundedTable.of( - Schema.FieldType.INT32, "order_id", - Schema.FieldType.STRING, "buyer") - .addRows( - 1, "james", - 2, "bond")); - - registerTable( - "SITE_LKP", - new SiteLookupTable( - TestTableUtils.buildBeamSqlSchema( - Schema.FieldType.INT32, "site_id", - Schema.FieldType.STRING, "site_name"))); - } - - /** Test table for JOIN-AS-LOOKUP. 
*/ - public static class SiteLookupTable extends BaseBeamTable implements BeamSqlSeekableTable { - - public SiteLookupTable(Schema schema) { - super(schema); - } - - @Override - public PCollection.IsBounded isBounded() { - return PCollection.IsBounded.BOUNDED; - } - - @Override - public PCollection<Row> buildIOReader(PBegin begin) { - throw new UnsupportedOperationException(); - } - - @Override - public POutput buildIOWriter(PCollection<Row> input) { - throw new UnsupportedOperationException(); - } - - @Override - public List<Row> seekRow(Row lookupSubRow) { - return Arrays.asList(Row.withSchema(getSchema()).addValues(1, "SITE1").build()); - } - - @Override - public BeamTableStatistics getTableStatistics(PipelineOptions options) { - return BeamTableStatistics.BOUNDED_UNKNOWN; - } } @Test @@ -322,44 +277,4 @@ public class BeamSideInputJoinRelUnboundedVsBoundedTest extends BaseRelTest { compilePipeline(sql, pipeline); pipeline.run(); } - - @Test - public void testUnboundedVsLookupTableJoin() throws Exception { - String sql = - "SELECT o1.order_id, o2.site_name FROM " - + " ORDER_DETAILS o1 " - + " JOIN SITE_LKP o2 " - + " on " - + " o1.site_id=o2.site_id " - + " WHERE o1.site_id=1"; - PCollection<Row> rows = compilePipeline(sql, pipeline); - PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) - .containsInAnyOrder( - TestUtils.RowsBuilder.of( - Schema.FieldType.INT32, "order_id", - Schema.FieldType.STRING, "site_name") - .addRows(1, "SITE1") - .getStringRows()); - pipeline.run(); - } - - @Test - public void testLookupTableVsUnboundedJoin() throws Exception { - String sql = - "SELECT o1.order_id, o2.site_name FROM " - + " SITE_LKP o2 " - + " JOIN ORDER_DETAILS o1 " - + " on " - + " o1.site_id=o2.site_id " - + " WHERE o1.site_id=1"; - PCollection<Row> rows = compilePipeline(sql, pipeline); - PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) - .containsInAnyOrder( - TestUtils.RowsBuilder.of( - Schema.FieldType.INT32, 
"order_id", - Schema.FieldType.STRING, "site_name") - .addRows(1, "SITE1") - .getStringRows()); - pipeline.run(); - } } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputLookupJoinRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputLookupJoinRelTest.java new file mode 100644 index 0000000..8b4f51a --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSideInputLookupJoinRelTest.java @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.sql.impl.rel; + +import static org.apache.beam.sdk.extensions.sql.impl.rel.BeamCoGBKJoinRelBoundedVsBoundedTest.ORDER_DETAILS1; + +import java.util.Arrays; +import java.util.List; +import org.apache.beam.sdk.extensions.sql.BeamSqlSeekableTable; +import org.apache.beam.sdk.extensions.sql.TestUtils; +import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics; +import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable; +import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableUtils; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.POutput; +import org.apache.beam.sdk.values.Row; +import org.hamcrest.core.StringContains; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +public class BeamSideInputLookupJoinRelTest extends BaseRelTest { + + @Rule public final TestPipeline pipeline = TestPipeline.create(); + @Rule public ExpectedException thrown = ExpectedException.none(); + private static final boolean nullable = true; + + /** Test table for JOIN-AS-LOOKUP. 
*/ + public static class SiteLookupTable extends BaseBeamTable implements BeamSqlSeekableTable { + + public SiteLookupTable(Schema schema) { + super(schema); + } + + @Override + public PCollection.IsBounded isBounded() { + return PCollection.IsBounded.BOUNDED; + } + + @Override + public PCollection<Row> buildIOReader(PBegin begin) { + throw new UnsupportedOperationException(); + } + + @Override + public POutput buildIOWriter(PCollection<Row> input) { + throw new UnsupportedOperationException(); + } + + @Override + public List<Row> seekRow(Row lookupSubRow) { + if (lookupSubRow.getInt32("site_id") == 2) { + return Arrays.asList(Row.withSchema(getSchema()).addValues(2, "SITE1").build()); + } + return Arrays.asList(Row.nullRow(getSchema())); + } + + @Override + public BeamTableStatistics getTableStatistics(PipelineOptions options) { + return BeamTableStatistics.BOUNDED_UNKNOWN; + } + } + + @BeforeClass + public static void prepare() { + BeamSideInputJoinRelTest.registerUnboundedTable(); + registerTable("ORDER_DETAILS1", ORDER_DETAILS1); + registerTable( + "SITE_LKP", + new SiteLookupTable( + TestTableUtils.buildBeamSqlNullableSchema( + Schema.FieldType.INT32, + "site_id", + nullable, + Schema.FieldType.STRING, + "site_name", + nullable))); + } + + @Test + public void testBoundedTableInnerJoinWithLookupTable() throws Exception { + String sql = + "SELECT o1.order_id, o2.site_name FROM " + + " ORDER_DETAILS1 o1 " + + " JOIN SITE_LKP o2 " + + " on " + + " o1.site_id=o2.site_id " + + " WHERE o1.site_id=2 "; + PCollection<Row> rows = compilePipeline(sql, pipeline); + PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) + .containsInAnyOrder( + TestUtils.RowsBuilder.of( + Schema.FieldType.INT32, "order_id", + Schema.FieldType.STRING, "site_name") + .addRows(1, "SITE1") + .getStringRows()); + pipeline.run(); + } + + @Test + public void testLookupTableInnerJoinWithBoundedTable() throws Exception { + String sql = + "SELECT o1.order_id, o2.site_name FROM " + 
+ " SITE_LKP o2 " + + " JOIN ORDER_DETAILS1 o1 " + + " on " + + " o1.site_id=o2.site_id " + + " WHERE o1.site_id=2 "; + PCollection<Row> rows = compilePipeline(sql, pipeline); + PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) + .containsInAnyOrder( + TestUtils.RowsBuilder.of( + Schema.FieldType.INT32, "order_id", + Schema.FieldType.STRING, "site_name") + .addRows(1, "SITE1") + .getStringRows()); + pipeline.run(); + } + + @Test + public void testUnboundedTableInnerJoinWithLookupTable() throws Exception { + String sql = + "SELECT o1.order_id, o2.site_name FROM " + + "(select order_id, site_id FROM ORDER_DETAILS " + + " GROUP BY order_id, site_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " + + " JOIN " + + " SITE_LKP o2 " + + " on " + + " o1.site_id=o2.site_id" + + " WHERE o1.site_id=2 "; + PCollection<Row> rows = compilePipeline(sql, pipeline); + PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) + .containsInAnyOrder( + TestUtils.RowsBuilder.of( + Schema.FieldType.INT32, "order_id", + Schema.FieldType.STRING, "site_name") + .addRows(1, "SITE1") + .addRows(2, "SITE1") + .getStringRows()); + pipeline.run(); + } + + @Test + public void testLookupTableInnerJoinWithUnboundedTable() throws Exception { + String sql = + "SELECT o1.order_id, o2.site_name FROM " + + " SITE_LKP o2 " + + " JOIN " + + "(select order_id, site_id FROM ORDER_DETAILS " + + " GROUP BY order_id, site_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " + + " on " + + " o1.site_id=o2.site_id" + + " WHERE o1.site_id=2 "; + PCollection<Row> rows = compilePipeline(sql, pipeline); + PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) + .containsInAnyOrder( + TestUtils.RowsBuilder.of( + Schema.FieldType.INT32, "order_id", + Schema.FieldType.STRING, "site_name") + .addRows(1, "SITE1") + .addRows(2, "SITE1") + .getStringRows()); + pipeline.run(); + } + + @Test + public void testLookupTableRightOuterJoinWithBoundedTable() throws Exception { + 
String sql = + "SELECT o1.order_id, o2.site_name FROM " + + " SITE_LKP o2 " + + " RIGHT OUTER JOIN " + + " ORDER_DETAILS1 o1 " + + " on " + + " o1.site_id=o2.site_id "; + PCollection<Row> rows = compilePipeline(sql, pipeline); + PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) + .containsInAnyOrder( + TestUtils.RowsBuilder.ofNullable( + Schema.FieldType.INT32, + "order_id", + nullable, + Schema.FieldType.STRING, + "site_name", + nullable) + .addRows(1, "SITE1") + .addRows(2, null) + .addRows(3, null) + .getStringRows()); + pipeline.run(); + } + + @Test + public void testUnboundedTableLeftOuterJoinWithLookupTable() throws Exception { + String sql = + "SELECT o1.order_id, o2.site_name FROM " + + "(select order_id, site_id FROM ORDER_DETAILS " + + " GROUP BY order_id, site_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " + + " LEFT OUTER JOIN " + + " SITE_LKP o2 " + + " on " + + " o1.site_id=o2.site_id"; + PCollection<Row> rows = compilePipeline(sql, pipeline); + PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) + .containsInAnyOrder( + TestUtils.RowsBuilder.ofNullable( + Schema.FieldType.INT32, + "order_id", + nullable, + Schema.FieldType.STRING, + "site_name", + nullable) + .addRows(1, "SITE1") + .addRows(2, "SITE1") + .addRows(1, null) + .addRows(2, null) + .addRows(3, null) + .getStringRows()); + pipeline.run(); + } + + @Test + // Do not add a filter like "WHERE o1.order_id=2". By adding that filter, FilterJoinRule may + // convert "LEFT OUTER JOIN" to "INNER JOIN". 
+ public void testLookupTableLeftOuterJoinWithBoundedTableError() throws Exception { + String sql = + "SELECT o1.order_id, o2.site_name FROM " + + " SITE_LKP o2 " + + " LEFT OUTER JOIN " + + " ORDER_DETAILS1 o1 " + + " on " + + " o1.site_id=o2.site_id "; + thrown.expect(UnsupportedOperationException.class); + thrown.expectMessage(StringContains.containsString("OUTER JOIN must be a non Seekable table")); + PCollection<Row> rows = compilePipeline(sql, pipeline); + pipeline.run(); + } + + @Test + // Do not add a filter like "WHERE o1.order_id=2". By adding that filter, FilterJoinRule may + // convert "FULL OUTER JOIN" to "LEFT OUTER JOIN", which in this case is a valid scenario. + public void testUnboundedTableFullOuterJoinWithLookupTableError() throws Exception { + String sql = + "SELECT o1.order_id, o2.site_name FROM " + + "(select order_id, site_id FROM ORDER_DETAILS " + + " GROUP BY order_id, site_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " + + " FULL OUTER JOIN " + + " SITE_LKP o2 " + + " on " + + " o1.site_id=o2.site_id"; + thrown.expect(UnsupportedOperationException.class); + thrown.expectMessage(StringContains.containsString("not supported")); + PCollection<Row> rows = compilePipeline(sql, pipeline); + pipeline.run(); + } + + @Test + // Do not add a filter like "WHERE o1.order_id=2". By adding that filter, FilterJoinRule may + // convert "RIGHT OUTER JOIN" to "INNER JOIN".
+ public void testUnboundedTableRightOuterJoinWithLookupTableError() throws Exception { + String sql = + "SELECT o1.order_id, o2.site_name FROM " + + "(select order_id, site_id FROM ORDER_DETAILS " + + " GROUP BY order_id, site_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 " + + " RIGHT OUTER JOIN " + + " SITE_LKP o2 " + + " on " + + " o1.site_id=o2.site_id"; + thrown.expect(UnsupportedOperationException.class); + thrown.expectMessage(StringContains.containsString("OUTER JOIN must be a non Seekable table")); + PCollection<Row> rows = compilePipeline(sql, pipeline); + pipeline.run(); + } +}