This is an automated email from the ASF dual-hosted git repository.

ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e9bdda9  [BEAM-11834] Enable array literals to have null values.
     new 3a0a3b4  Merge pull request #14017 from ibzib/BEAM-11834
e9bdda9 is described below

commit e9bdda9e037600dcda597fd6b105966b0ebeb073
Author: Kyle Weaver <kcwea...@google.com>
AuthorDate: Thu Feb 18 13:09:54 2021 -0800

    [BEAM-11834] Enable array literals to have null values.
---
 .../beam/sdk/extensions/sql/impl/utils/CalciteUtils.java |  2 +-
 .../sql/zetasql/ZetaSqlCalciteTranslationUtils.java      |  5 ++++-
 .../extensions/sql/zetasql/ZetaSqlDialectSpecTest.java   | 16 ++++++++++++++++
 3 files changed, 21 insertions(+), 2 deletions(-)

diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
index acd6e6c..10ad199 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
@@ -234,7 +234,7 @@ public class CalciteUtils {
         return FieldType.row(toSchema(calciteType));
 
       default:
-        return toFieldType(calciteType.getSqlTypeName());
+        return toFieldType(calciteType.getSqlTypeName()).withNullable(calciteType.isNullable());
     }
   }
 
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlCalciteTranslationUtils.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlCalciteTranslationUtils.java
index 203337c..dd59c26 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlCalciteTranslationUtils.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlCalciteTranslationUtils.java
@@ -290,7 +290,10 @@ public final class ZetaSqlCalciteTranslationUtils {
 
  private static RexNode arrayValueToRexNode(Value value, RexBuilder rexBuilder) {
     return rexBuilder.makeCall(
-        toCalciteArrayType(value.getType().asArray().getElementType(), false, rexBuilder),
+        toCalciteArrayType(
+            value.getType().asArray().getElementType(),
+            value.getElementList().stream().anyMatch(v -> v.isNull()),
+            rexBuilder),
         SqlStdOperatorTable.ARRAY_VALUE_CONSTRUCTOR,
         value.getElementList().stream()
             .map(v -> toRexNode(v, rexBuilder))
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java
index df5959e..4a07989 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java
@@ -2421,6 +2421,22 @@ public class ZetaSqlDialectSpecTest extends ZetaSqlTestBase {
   }
 
   @Test
+  public void testUnnestLiteralWithNullElements() {
+    String sql = "SELECT * FROM UNNEST(ARRAY<STRING>['foo', NULL, 'bar']);";
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+    Schema schema = Schema.builder().addNullableField("str_field", FieldType.STRING).build();
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(schema).addValues("foo").build(),
+            Row.withSchema(schema).addValues((String) null).build(),
+            Row.withSchema(schema).addValues("bar").build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
   public void testUNNESTParameters() {
     String sql = "SELECT * FROM UNNEST(@p0);";
     ImmutableMap<String, Value> params =

Reply via email to