This is an automated email from the ASF dual-hosted git repository. ibzib pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 586739d [BEAM-13577] Beam Select's uniquifyNames function loses nullability of Complex types while inferring schema new ab6374e Merge pull request #16380 from talatuyarer/BEAM-13577-select-uniquifyNames-nullablity 586739d is described below commit 586739d854b5f6503f03469bee278de0bbd72ce9 Author: Talat Uyarer <tuya...@paloaltonetworks.com> AuthorDate: Thu Jan 13 21:21:04 2022 -0800 [BEAM-13577] Beam Select's uniquifyNames function loses nullability of Complex types while inferring schema --- .../apache/beam/sdk/schemas/transforms/Select.java | 10 ++-- .../sdk/extensions/sql/BeamSqlDslArrayTest.java | 53 +++++++++++++++++++--- .../rel/BeamCoGBKJoinRelBoundedVsBoundedTest.java | 32 ++++++++++++- 3 files changed, 84 insertions(+), 11 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Select.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Select.java index 597aa62..763b7cf 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Select.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Select.java @@ -226,12 +226,16 @@ public class Select { .withNullable(fieldType.getNullable()) .withMetadata(fieldType.getAllMetadata()); case ARRAY: - return FieldType.array(uniquifyNames(fieldType.getCollectionElementType())); + return FieldType.array(uniquifyNames(fieldType.getCollectionElementType())) + .withNullable(fieldType.getNullable()); case ITERABLE: - return FieldType.iterable(uniquifyNames(fieldType.getCollectionElementType())); + return FieldType.iterable(uniquifyNames(fieldType.getCollectionElementType())) + .withNullable(fieldType.getNullable()); case MAP: return FieldType.map( - uniquifyNames(fieldType.getMapKeyType()), uniquifyNames(fieldType.getMapValueType())); + uniquifyNames(fieldType.getMapKeyType()), + uniquifyNames(fieldType.getMapValueType())) + .withNullable(fieldType.getNullable()); default: return fieldType; } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslArrayTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslArrayTest.java index f072467..65d6d72 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslArrayTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslArrayTest.java @@ -19,6 +19,7 @@ package org.apache.beam.sdk.extensions.sql; import java.util.Arrays; import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; @@ -37,7 +38,8 @@ public class BeamSqlDslArrayTest { private static final Schema INPUT_SCHEMA = Schema.builder() .addInt32Field("f_int") - .addArrayField("f_stringArr", Schema.FieldType.STRING) + .addStringField("f_string") + .addNullableField("f_stringArr", FieldType.array(Schema.FieldType.STRING)) .build(); @Rule public final TestPipeline pipeline = TestPipeline.create(); @@ -91,6 +93,41 @@ public class BeamSqlDslArrayTest { } @Test + public void testProjectArrayFieldWithCoGBKJoin() { + PCollection<Row> input = pCollectionOf2Elements(); + + Schema resultType = + Schema.builder() + .addNullableField("f_stringArr", FieldType.array(Schema.FieldType.STRING)) + .build(); + + // When we use longer enough IN clause, Calcite calculate physical plan with BeamCoGBKJoin. + // This SQL push Calcite to use BeamCoGBKJoin for simple select statement. + PCollection<Row> result = + input.apply( + "sqlQuery", + SqlTransform.query( + "SELECT f_stringArr FROM PCOLLECTION WHERE f_string IN ('A', 'B', " + + "'ABCAABAAAGAG','ABCAABAAAGCB','ABCAABAAAGCJ','ABCAABAAAGEB','ABCAABAAAGEK'," + + "'ABCAABAAAGFB','ABCAABAAAGFG','ABCAABABAGBJ','ABCAABABBKIF','ABCAABABCAIK'," + + "'ABCAABAEJAAF','ABCAABAEJAED','ABCAABAEJAEE','ABCAABAEJAEF','ABCIABAAAGGJ'," + + "'ABCIABAAAGKB','ABCIABAAAJBC','ABCIABAAAJCD','ABCIABAAAJEK','ABCIABAAAJFE'," + + "'ABCIABAAAJGE','ABCIABAAAJGF','ABCIABAAAJGG','ABCIABAAAJJK','ABCIABAABAGK'," + + "'ABCIABAABAKD','ABCIABAABBDI','ABCIABAABBEI','ABCIABAABFBB','ABCIABAABFBJ'," + + "'ABCIABAABFCC','ABCIABAABFDI','ABCIABACFBKF','ABCIABAJAIBG','ABCIABBBDAAC'," + + "'ABCIABBFJGAD','ABCIABBGJFDK','ABCIABCAAFBB','ABCIABCAAFJC','ABCIABCACADA'," + + "'ABGDABAAGFGA','ABGDABAAGFGF','ABGDABAAGFJG','ABGDABAAGFJK','ABGDABAAGFKJ'," + + "'ABGDABAAGFKI')")); + + PAssert.that(result) + .containsInAnyOrder( + Row.withSchema(resultType).addArray(Arrays.asList("111", "222")).build(), + Row.withSchema(resultType).addArray(Arrays.asList("33", "44", "55")).build()); + + pipeline.run(); + } + + @Test public void testAccessArrayElement() { PCollection<Row> input = pCollectionOf2Elements(); @@ -109,7 +146,8 @@ public class BeamSqlDslArrayTest { @Test public void testSingleElement() throws Exception { - Row inputRow = Row.withSchema(INPUT_SCHEMA).addValues(1).addArray(Arrays.asList("111")).build(); + Row inputRow = + Row.withSchema(INPUT_SCHEMA).addValues(1, "A").addArray(Arrays.asList("111")).build(); PCollection<Row> input = pipeline.apply("boundedInput1", Create.of(inputRow).withRowSchema(INPUT_SCHEMA)); @@ -199,12 +237,15 @@ public class BeamSqlDslArrayTest { public void testUnnestCrossJoin() { Row row1 = Row.withSchema(INPUT_SCHEMA) - .addValues(42) + .addValues(42, "S") .addArray(Arrays.asList("111", "222", "333")) .build(); Row row2 = - Row.withSchema(INPUT_SCHEMA).addValues(13).addArray(Arrays.asList("444", "555")).build(); + Row.withSchema(INPUT_SCHEMA) + .addValues(13, "T") + .addArray(Arrays.asList("444", "555")) + .build(); PCollection<Row> input = pipeline.apply("boundedInput1", Create.of(row1, row2).withRowSchema(INPUT_SCHEMA)); @@ -390,11 +431,11 @@ public class BeamSqlDslArrayTest { "boundedInput1", Create.of( Row.withSchema(INPUT_SCHEMA) - .addValues(1) + .addValues(1, "A") .addArray(Arrays.asList("111", "222")) .build(), Row.withSchema(INPUT_SCHEMA) - .addValues(2) + .addValues(2, "B") .addArray(Arrays.asList("33", "44", "55")) .build()) .withRowSchema(INPUT_SCHEMA)); diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCoGBKJoinRelBoundedVsBoundedTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCoGBKJoinRelBoundedVsBoundedTest.java index b403d83..cbaf56b 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCoGBKJoinRelBoundedVsBoundedTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCoGBKJoinRelBoundedVsBoundedTest.java @@ -17,11 +17,13 @@ */ package org.apache.beam.sdk.extensions.sql.impl.rel; +import java.util.Arrays; import org.apache.beam.sdk.extensions.sql.TestUtils; import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRelMetadataQuery; import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats; import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable; import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.PCollection; @@ -46,6 +48,30 @@ public class BeamCoGBKJoinRelBoundedVsBoundedTest extends BaseRelTest { Schema.FieldType.INT32, "price") .addRows(1, 2, 3, 2, 3, 3, 3, 4, 5); + public static final TestBoundedTable ORDER_DETAILS1_WITH_ARRAY = + TestBoundedTable.of( + Schema.FieldType.INT32, + "order_id", + Schema.FieldType.INT32, + "site_id", + Schema.FieldType.INT32, + "price", + FieldType.array(FieldType.STRING).withNullable(true), + "f_stringArr") + .addRows( + 1, + 2, + 3, + Arrays.asList("111", "222", "333"), + 2, + 3, + 3, + Arrays.asList("222", "333", "333"), + 3, + 4, + 5, + Arrays.asList("333", "444", "555")); + public static final TestBoundedTable ORDER_DETAILS2 = TestBoundedTable.of( Schema.FieldType.INT32, "order_id", @@ -56,6 +82,7 @@ public class BeamCoGBKJoinRelBoundedVsBoundedTest extends BaseRelTest { @BeforeClass public static void prepare() { registerTable("ORDER_DETAILS1", ORDER_DETAILS1); + registerTable("ORDER_DETAILS1_WITH_ARRAY", ORDER_DETAILS1_WITH_ARRAY); registerTable("ORDER_DETAILS2", ORDER_DETAILS2); } @@ -63,7 +90,7 @@ public class BeamCoGBKJoinRelBoundedVsBoundedTest extends BaseRelTest { public void testInnerJoin() throws Exception { String sql = "SELECT * " - + "FROM ORDER_DETAILS1 o1" + + "FROM ORDER_DETAILS1_WITH_ARRAY o1" + " JOIN ORDER_DETAILS2 o2" + " on " + " o1.order_id=o2.site_id AND o2.price=o1.site_id"; @@ -76,11 +103,12 @@ public class BeamCoGBKJoinRelBoundedVsBoundedTest extends BaseRelTest { .addField("order_id", Schema.FieldType.INT32) .addField("site_id", Schema.FieldType.INT32) .addField("price", Schema.FieldType.INT32) + .addNullableField("f_stringArr", FieldType.array(FieldType.STRING)) .addField("order_id0", Schema.FieldType.INT32) .addField("site_id0", Schema.FieldType.INT32) .addField("price0", Schema.FieldType.INT32) .build()) - .addRows(2, 3, 3, 1, 2, 3) + .addRows(2, 3, 3, Arrays.asList("222", "333", "333"), 1, 2, 3) .getRows()); pipeline.run(); }