This is an automated email from the ASF dual-hosted git repository.

ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 586739d  [BEAM-13577] Beam Select's uniquifyNames function loses nullability of Complex types while inferring schema
     new ab6374e  Merge pull request #16380 from 
talatuyarer/BEAM-13577-select-uniquifyNames-nullablity
586739d is described below

commit 586739d854b5f6503f03469bee278de0bbd72ce9
Author: Talat Uyarer <tuya...@paloaltonetworks.com>
AuthorDate: Thu Jan 13 21:21:04 2022 -0800

    [BEAM-13577] Beam Select's uniquifyNames function loses nullability of Complex types while inferring schema
---
 .../apache/beam/sdk/schemas/transforms/Select.java | 10 ++--
 .../sdk/extensions/sql/BeamSqlDslArrayTest.java    | 53 +++++++++++++++++++---
 .../rel/BeamCoGBKJoinRelBoundedVsBoundedTest.java  | 32 ++++++++++++-
 3 files changed, 84 insertions(+), 11 deletions(-)

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Select.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Select.java
index 597aa62..763b7cf 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Select.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Select.java
@@ -226,12 +226,16 @@ public class Select {
             .withNullable(fieldType.getNullable())
             .withMetadata(fieldType.getAllMetadata());
       case ARRAY:
-        return 
FieldType.array(uniquifyNames(fieldType.getCollectionElementType()));
+        return 
FieldType.array(uniquifyNames(fieldType.getCollectionElementType()))
+            .withNullable(fieldType.getNullable());
       case ITERABLE:
-        return 
FieldType.iterable(uniquifyNames(fieldType.getCollectionElementType()));
+        return 
FieldType.iterable(uniquifyNames(fieldType.getCollectionElementType()))
+            .withNullable(fieldType.getNullable());
       case MAP:
         return FieldType.map(
-            uniquifyNames(fieldType.getMapKeyType()), 
uniquifyNames(fieldType.getMapValueType()));
+                uniquifyNames(fieldType.getMapKeyType()),
+                uniquifyNames(fieldType.getMapValueType()))
+            .withNullable(fieldType.getNullable());
       default:
         return fieldType;
     }
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslArrayTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslArrayTest.java
index f072467..65d6d72 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslArrayTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslArrayTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.extensions.sql;
 
 import java.util.Arrays;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
@@ -37,7 +38,8 @@ public class BeamSqlDslArrayTest {
   private static final Schema INPUT_SCHEMA =
       Schema.builder()
           .addInt32Field("f_int")
-          .addArrayField("f_stringArr", Schema.FieldType.STRING)
+          .addStringField("f_string")
+          .addNullableField("f_stringArr", 
FieldType.array(Schema.FieldType.STRING))
           .build();
 
   @Rule public final TestPipeline pipeline = TestPipeline.create();
@@ -91,6 +93,41 @@ public class BeamSqlDslArrayTest {
   }
 
   @Test
+  public void testProjectArrayFieldWithCoGBKJoin() {
+    PCollection<Row> input = pCollectionOf2Elements();
+
+    Schema resultType =
+        Schema.builder()
+            .addNullableField("f_stringArr", 
FieldType.array(Schema.FieldType.STRING))
+            .build();
+
+    // With a long enough IN clause, Calcite chooses a physical plan that uses BeamCoGBKJoin.
+    // This query forces Calcite to use BeamCoGBKJoin even for a simple SELECT statement.
+    PCollection<Row> result =
+        input.apply(
+            "sqlQuery",
+            SqlTransform.query(
+                "SELECT f_stringArr FROM PCOLLECTION WHERE f_string IN ('A', 
'B', "
+                    + 
"'ABCAABAAAGAG','ABCAABAAAGCB','ABCAABAAAGCJ','ABCAABAAAGEB','ABCAABAAAGEK',"
+                    + 
"'ABCAABAAAGFB','ABCAABAAAGFG','ABCAABABAGBJ','ABCAABABBKIF','ABCAABABCAIK',"
+                    + 
"'ABCAABAEJAAF','ABCAABAEJAED','ABCAABAEJAEE','ABCAABAEJAEF','ABCIABAAAGGJ',"
+                    + 
"'ABCIABAAAGKB','ABCIABAAAJBC','ABCIABAAAJCD','ABCIABAAAJEK','ABCIABAAAJFE',"
+                    + 
"'ABCIABAAAJGE','ABCIABAAAJGF','ABCIABAAAJGG','ABCIABAAAJJK','ABCIABAABAGK',"
+                    + 
"'ABCIABAABAKD','ABCIABAABBDI','ABCIABAABBEI','ABCIABAABFBB','ABCIABAABFBJ',"
+                    + 
"'ABCIABAABFCC','ABCIABAABFDI','ABCIABACFBKF','ABCIABAJAIBG','ABCIABBBDAAC',"
+                    + 
"'ABCIABBFJGAD','ABCIABBGJFDK','ABCIABCAAFBB','ABCIABCAAFJC','ABCIABCACADA',"
+                    + 
"'ABGDABAAGFGA','ABGDABAAGFGF','ABGDABAAGFJG','ABGDABAAGFJK','ABGDABAAGFKJ',"
+                    + "'ABGDABAAGFKI')"));
+
+    PAssert.that(result)
+        .containsInAnyOrder(
+            Row.withSchema(resultType).addArray(Arrays.asList("111", 
"222")).build(),
+            Row.withSchema(resultType).addArray(Arrays.asList("33", "44", 
"55")).build());
+
+    pipeline.run();
+  }
+
+  @Test
   public void testAccessArrayElement() {
     PCollection<Row> input = pCollectionOf2Elements();
 
@@ -109,7 +146,8 @@ public class BeamSqlDslArrayTest {
 
   @Test
   public void testSingleElement() throws Exception {
-    Row inputRow = 
Row.withSchema(INPUT_SCHEMA).addValues(1).addArray(Arrays.asList("111")).build();
+    Row inputRow =
+        Row.withSchema(INPUT_SCHEMA).addValues(1, 
"A").addArray(Arrays.asList("111")).build();
 
     PCollection<Row> input =
         pipeline.apply("boundedInput1", 
Create.of(inputRow).withRowSchema(INPUT_SCHEMA));
@@ -199,12 +237,15 @@ public class BeamSqlDslArrayTest {
   public void testUnnestCrossJoin() {
     Row row1 =
         Row.withSchema(INPUT_SCHEMA)
-            .addValues(42)
+            .addValues(42, "S")
             .addArray(Arrays.asList("111", "222", "333"))
             .build();
 
     Row row2 =
-        
Row.withSchema(INPUT_SCHEMA).addValues(13).addArray(Arrays.asList("444", 
"555")).build();
+        Row.withSchema(INPUT_SCHEMA)
+            .addValues(13, "T")
+            .addArray(Arrays.asList("444", "555"))
+            .build();
 
     PCollection<Row> input =
         pipeline.apply("boundedInput1", Create.of(row1, 
row2).withRowSchema(INPUT_SCHEMA));
@@ -390,11 +431,11 @@ public class BeamSqlDslArrayTest {
         "boundedInput1",
         Create.of(
                 Row.withSchema(INPUT_SCHEMA)
-                    .addValues(1)
+                    .addValues(1, "A")
                     .addArray(Arrays.asList("111", "222"))
                     .build(),
                 Row.withSchema(INPUT_SCHEMA)
-                    .addValues(2)
+                    .addValues(2, "B")
                     .addArray(Arrays.asList("33", "44", "55"))
                     .build())
             .withRowSchema(INPUT_SCHEMA));
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCoGBKJoinRelBoundedVsBoundedTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCoGBKJoinRelBoundedVsBoundedTest.java
index b403d83..cbaf56b 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCoGBKJoinRelBoundedVsBoundedTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCoGBKJoinRelBoundedVsBoundedTest.java
@@ -17,11 +17,13 @@
  */
 package org.apache.beam.sdk.extensions.sql.impl.rel;
 
+import java.util.Arrays;
 import org.apache.beam.sdk.extensions.sql.TestUtils;
 import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRelMetadataQuery;
 import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
 import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.values.PCollection;
@@ -46,6 +48,30 @@ public class BeamCoGBKJoinRelBoundedVsBoundedTest extends 
BaseRelTest {
               Schema.FieldType.INT32, "price")
           .addRows(1, 2, 3, 2, 3, 3, 3, 4, 5);
 
+  public static final TestBoundedTable ORDER_DETAILS1_WITH_ARRAY =
+      TestBoundedTable.of(
+              Schema.FieldType.INT32,
+              "order_id",
+              Schema.FieldType.INT32,
+              "site_id",
+              Schema.FieldType.INT32,
+              "price",
+              FieldType.array(FieldType.STRING).withNullable(true),
+              "f_stringArr")
+          .addRows(
+              1,
+              2,
+              3,
+              Arrays.asList("111", "222", "333"),
+              2,
+              3,
+              3,
+              Arrays.asList("222", "333", "333"),
+              3,
+              4,
+              5,
+              Arrays.asList("333", "444", "555"));
+
   public static final TestBoundedTable ORDER_DETAILS2 =
       TestBoundedTable.of(
               Schema.FieldType.INT32, "order_id",
@@ -56,6 +82,7 @@ public class BeamCoGBKJoinRelBoundedVsBoundedTest extends 
BaseRelTest {
   @BeforeClass
   public static void prepare() {
     registerTable("ORDER_DETAILS1", ORDER_DETAILS1);
+    registerTable("ORDER_DETAILS1_WITH_ARRAY", ORDER_DETAILS1_WITH_ARRAY);
     registerTable("ORDER_DETAILS2", ORDER_DETAILS2);
   }
 
@@ -63,7 +90,7 @@ public class BeamCoGBKJoinRelBoundedVsBoundedTest extends 
BaseRelTest {
   public void testInnerJoin() throws Exception {
     String sql =
         "SELECT *  "
-            + "FROM ORDER_DETAILS1 o1"
+            + "FROM ORDER_DETAILS1_WITH_ARRAY o1"
             + " JOIN ORDER_DETAILS2 o2"
             + " on "
             + " o1.order_id=o2.site_id AND o2.price=o1.site_id";
@@ -76,11 +103,12 @@ public class BeamCoGBKJoinRelBoundedVsBoundedTest extends 
BaseRelTest {
                         .addField("order_id", Schema.FieldType.INT32)
                         .addField("site_id", Schema.FieldType.INT32)
                         .addField("price", Schema.FieldType.INT32)
+                        .addNullableField("f_stringArr", 
FieldType.array(FieldType.STRING))
                         .addField("order_id0", Schema.FieldType.INT32)
                         .addField("site_id0", Schema.FieldType.INT32)
                         .addField("price0", Schema.FieldType.INT32)
                         .build())
-                .addRows(2, 3, 3, 1, 2, 3)
+                .addRows(2, 3, 3, Arrays.asList("222", "333", "333"), 1, 2, 3)
                 .getRows());
     pipeline.run();
   }

Reply via email to