[ 
https://issues.apache.org/jira/browse/BEAM-3785?focusedWorklogId=80706&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-80706
 ]

ASF GitHub Bot logged work on BEAM-3785:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 15/Mar/18 04:20
            Start Date: 15/Mar/18 04:20
    Worklog Time Spent: 10m 
      Work Description: XuMingmin closed pull request #4857: [BEAM-3785][SQL] 
Add support for arrays of rows
URL: https://github.com/apache/beam/pull/4857
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/RowSqlType.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/RowSqlType.java
index 77eda6a84de..fc16c848db2 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/RowSqlType.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/RowSqlType.java
@@ -102,9 +102,18 @@ public Builder withTimestampField(String fieldName) {
       return withField(fieldName, SqlTypeCoders.TIMESTAMP);
     }
 
+    /**
+     * Adds an ARRAY field with elements of {@code elementCoder}.
+     */
     public Builder withArrayField(String fieldName, SqlTypeCoder elementCoder) 
{
       return withField(fieldName, SqlTypeCoders.arrayOf(elementCoder));
+    }
 
+    /**
+     * Adds an ARRAY field with elements of {@code rowType}.
+     */
+    public Builder withArrayField(String fieldName, RowType rowType) {
+      return withField(fieldName, SqlTypeCoders.arrayOf(rowType));
     }
 
     public Builder withRowField(String fieldName, RowType rowType) {
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTypeCoder.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTypeCoder.java
index 5b6e104a3fd..c2a6a3ac081 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTypeCoder.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTypeCoder.java
@@ -65,10 +65,6 @@ public int hashCode() {
     return this.getClass().hashCode();
   }
 
-  public static boolean isArray(SqlTypeCoder sqlTypeCoder) {
-    return sqlTypeCoder instanceof SqlArrayCoder;
-  }
-
   static class SqlTinyIntCoder extends SqlTypeCoder {
 
     @Override
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTypeCoders.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTypeCoders.java
index 9eea2df1a99..d9a132ca623 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTypeCoders.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTypeCoders.java
@@ -58,10 +58,6 @@
   public static final SqlTypeCoder DATE = new SqlDateCoder();
   public static final SqlTypeCoder TIMESTAMP = new SqlTimestampCoder();
 
-  public static SqlTypeCoder arrayOf(SqlTypeCoder elementCoder) {
-    return SqlArrayCoder.of(elementCoder);
-  }
-
   public static final Set<SqlTypeCoder> NUMERIC_TYPES =
       ImmutableSet.of(
           SqlTypeCoders.TINYINT,
@@ -72,6 +68,18 @@ public static SqlTypeCoder arrayOf(SqlTypeCoder 
elementCoder) {
           SqlTypeCoders.DOUBLE,
           SqlTypeCoders.DECIMAL);
 
+  public static SqlTypeCoder arrayOf(SqlTypeCoder elementCoder) {
+    return SqlArrayCoder.of(elementCoder);
+  }
+
+  public static SqlTypeCoder arrayOf(RowType rowType) {
+    return SqlArrayCoder.of(rowOf(rowType));
+  }
+
+  public static boolean isArray(SqlTypeCoder sqlTypeCoder) {
+    return sqlTypeCoder instanceof SqlArrayCoder;
+  }
+
   public static boolean isRow(SqlTypeCoder sqlTypeCoder) {
     return sqlTypeCoder instanceof SqlRowCoder;
   }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
index e81b9278842..7bd04f2b1f5 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
@@ -169,8 +169,7 @@ public BeamCalciteTable(RowType beamRowType) {
 
     @Override
     public RelDataType getRowType(RelDataTypeFactory typeFactory) {
-      return CalciteUtils.toCalciteRowType(this.beamRowType)
-          .apply(BeamQueryPlanner.TYPE_FACTORY);
+      return CalciteUtils.toCalciteRowType(this.beamRowType, 
BeamQueryPlanner.TYPE_FACTORY);
     }
 
     @Override
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
index 5a47aa460ea..3e7089a71fd 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
@@ -202,10 +202,14 @@ static BeamSqlExpression buildExpression(RexNode rexNode) 
{
       ret = new BeamSqlInputRefExpression(node.getType().getSqlTypeName(), 
node.getIndex());
     } else if (rexNode instanceof RexFieldAccess) {
       RexFieldAccess fieldAccessNode = (RexFieldAccess) rexNode;
-      int rowFieldIndex = ((RexInputRef) 
fieldAccessNode.getReferenceExpr()).getIndex();
+      BeamSqlExpression referenceExpression = 
buildExpression(fieldAccessNode.getReferenceExpr());
       int nestedFieldIndex = fieldAccessNode.getField().getIndex();
       SqlTypeName nestedFieldType = 
fieldAccessNode.getField().getType().getSqlTypeName();
-      ret = new BeamSqlFieldAccessExpression(rowFieldIndex, nestedFieldIndex, 
nestedFieldType);
+
+      ret = new BeamSqlFieldAccessExpression(
+          referenceExpression,
+          nestedFieldIndex,
+          nestedFieldType);
     } else if (rexNode instanceof RexCall) {
       RexCall node = (RexCall) rexNode;
       String opName = node.op.getName();
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java
index 544734a91b0..2efbc7fc71d 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java
@@ -145,8 +145,11 @@ public boolean accept() {
       return true;
     case ARRAY:
       return value instanceof List;
+    case ROW:
+      return value instanceof Row;
     default:
-      throw new UnsupportedOperationException(outputType.name());
+      throw new UnsupportedOperationException(
+          "Unsupported Beam SQL type in expression: " + outputType.name());
     }
   }
 
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/row/BeamSqlFieldAccessExpression.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/row/BeamSqlFieldAccessExpression.java
index 478b4e374e6..50270864145 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/row/BeamSqlFieldAccessExpression.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/row/BeamSqlFieldAccessExpression.java
@@ -15,9 +15,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.row;
 
 import java.util.Collections;
+import java.util.List;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -29,16 +31,16 @@
  */
 public class BeamSqlFieldAccessExpression extends BeamSqlExpression {
 
-  private int rowFieldIndex;
+  private BeamSqlExpression referenceExpression;
   private int nestedFieldIndex;
 
   public BeamSqlFieldAccessExpression(
-      int rowFieldIndex,
+      BeamSqlExpression referenceExpression,
       int nestedFieldIndex,
       SqlTypeName nestedFieldType) {
 
     super(Collections.emptyList(), nestedFieldType);
-    this.rowFieldIndex = rowFieldIndex;
+    this.referenceExpression = referenceExpression;
     this.nestedFieldIndex = nestedFieldIndex;
   }
 
@@ -49,7 +51,22 @@ public boolean accept() {
 
   @Override
   public BeamSqlPrimitive evaluate(Row inputRow, BoundedWindow window) {
-    Row nestedRow = inputRow.getValue(rowFieldIndex);
-    return BeamSqlPrimitive.of(outputType, 
nestedRow.getValue(nestedFieldIndex));
+    BeamSqlPrimitive targetObject = referenceExpression.evaluate(inputRow, 
window);
+    SqlTypeName targetFieldType = targetObject.getOutputType();
+
+    Object targetFieldValue;
+
+    if (SqlTypeName.ARRAY.equals(targetFieldType)) {
+      targetFieldValue = ((List) 
targetObject.getValue()).get(nestedFieldIndex);
+    } else if (SqlTypeName.ROW.equals(targetFieldType)) {
+      targetFieldValue = ((Row) 
targetObject.getValue()).getValue(nestedFieldIndex);
+    } else {
+      throw new IllegalArgumentException(
+          "Attempt to access field of unsupported type "
+          + targetFieldType.getClass().getSimpleName()
+          + ". Field access operator is only supported for arrays or rows");
+    }
+
+    return BeamSqlPrimitive.of(outputType, targetFieldValue);
   }
 }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
index eccbed83956..d8e93f9c66f 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.beam.sdk.extensions.sql.impl.utils;
 
 import static org.apache.beam.sdk.values.RowType.toRowType;
@@ -30,7 +31,6 @@
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.calcite.rel.type.RelProtoDataType;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
@@ -68,7 +68,7 @@
    * for supported Beam SQL type coder, see {@link SqlTypeCoder}.
    */
   public static SqlTypeName toCalciteType(SqlTypeCoder coder) {
-    if (SqlTypeCoder.isArray(coder)) {
+    if (SqlTypeCoders.isArray(coder)) {
         return SqlTypeName.ARRAY;
     }
 
@@ -135,19 +135,16 @@ public static RowType toBeamRowType(RelDataType 
tableInfo) {
   /**
    * Create an instance of {@code RelDataType} so it can be used to create a 
table.
    */
-  public static RelProtoDataType toCalciteRowType(final RowType rowType) {
-    return dataTypeFactory -> {
-      RelDataTypeFactory.Builder builder = new 
RelDataTypeFactory.Builder(dataTypeFactory);
-
-      IntStream
-          .range(0, rowType.getFieldCount())
-          .forEach(idx ->
-                       builder.add(
-                           rowType.getFieldName(idx),
-                           toRelDataType(dataTypeFactory, rowType, idx)));
-
-      return builder.build();
-    };
+  public static RelDataType toCalciteRowType(RowType rowType, 
RelDataTypeFactory dataTypeFactory) {
+    RelDataTypeFactory.Builder builder = new 
RelDataTypeFactory.Builder(dataTypeFactory);
+
+    IntStream
+        .range(0, rowType.getFieldCount())
+        .forEach(idx ->
+                     builder.add(
+                         rowType.getFieldName(idx),
+                         toRelDataType(dataTypeFactory, rowType, idx)));
+    return builder.build();
   }
 
   private static RelDataType toRelDataType(
@@ -163,7 +160,7 @@ private static RelDataType toRelDataType(
     }
 
     if (SqlTypeName.ROW.equals(typeName)) {
-      return createRowRelType(dataTypeFactory, (SqlRowCoder) fieldCoder);
+      return toCalciteRowType(((SqlRowCoder) fieldCoder).getRowType(), 
dataTypeFactory);
     }
 
     return dataTypeFactory.createSqlType(typeName);
@@ -172,18 +169,18 @@ private static RelDataType toRelDataType(
   private static RelDataType createArrayRelType(
       RelDataTypeFactory dataTypeFactory,
       SqlArrayCoder arrayFieldCoder) {
-    SqlTypeName elementType = toCalciteType(arrayFieldCoder.getElementCoder());
-    return
-        dataTypeFactory
-            .createArrayType(
-                dataTypeFactory.createSqlType(elementType), 
UNLIMITED_ARRAY_SIZE);
-  }
 
-  private static RelDataType createRowRelType(
-      RelDataTypeFactory dataTypeFactory,
-      SqlRowCoder rowFieldCoder) {
+    SqlTypeName elementTypeName = 
toCalciteType(arrayFieldCoder.getElementCoder());
+
+    RelDataType elementType;
+
+    if (SqlTypeName.ROW.equals(elementTypeName)) {
+      RowType rowType = ((SqlRowCoder) 
arrayFieldCoder.getElementCoder()).getRowType();
+      elementType = toCalciteRowType(rowType, dataTypeFactory);
+    } else {
+      elementType = dataTypeFactory.createSqlType(elementTypeName);
+    }
 
-    RelProtoDataType relProtoDataType = 
toCalciteRowType(rowFieldCoder.getRowType());
-    return relProtoDataType.apply(dataTypeFactory);
+    return dataTypeFactory.createArrayType(elementType, UNLIMITED_ARRAY_SIZE);
   }
 }
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlArrayTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlArrayTest.java
index 295ec5f83e1..8553c7ce6b7 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlArrayTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlArrayTest.java
@@ -206,6 +206,127 @@ public void testCardinality() {
     pipeline.run();
   }
 
+  @Test
+  public void testSelectRowsFromArrayOfRows() {
+    RowType elementRowType =
+        RowSqlType
+            .builder()
+            .withVarcharField("f_rowString")
+            .withIntegerField("f_rowInt")
+            .build();
+
+    RowType resultRowType =
+        RowSqlType
+            .builder()
+            .withArrayField("f_resultArray", elementRowType)
+            .build();
+
+    RowType inputType =
+        RowSqlType
+            .builder()
+            .withIntegerField("f_int")
+            .withArrayField("f_arrayOfRows", elementRowType)
+            .build();
+
+    PCollection<Row> input =
+        PBegin.in(pipeline)
+              .apply(
+                  Create.of(
+                      Row.withRowType(inputType)
+                         .addValues(
+                             1,
+                             Arrays.asList(
+                                 
Row.withRowType(elementRowType).addValues("AA", 11).build(),
+                                 
Row.withRowType(elementRowType).addValues("BB", 22).build()))
+                         .build(),
+                      Row.withRowType(inputType)
+                         .addValues(
+                             2,
+                             Arrays.asList(
+                                 
Row.withRowType(elementRowType).addValues("CC", 33).build(),
+                                 
Row.withRowType(elementRowType).addValues("DD", 44).build()))
+                         .build())
+                        .withCoder(inputType.getRowCoder()));
+
+    PCollection<Row> result =
+        input
+            .apply(
+                BeamSql.query(
+                    "SELECT f_arrayOfRows FROM PCOLLECTION"))
+            .setCoder(resultRowType.getRowCoder());
+
+    PAssert.that(result)
+           .containsInAnyOrder(
+               Row.withRowType(resultRowType)
+                  .addArray(
+                      Arrays.asList(
+                          Row.withRowType(elementRowType).addValues("AA", 
11).build(),
+                          Row.withRowType(elementRowType).addValues("BB", 
22).build()))
+                  .build(),
+               Row.withRowType(resultRowType)
+                  .addArray(
+                      Arrays.asList(
+                          Row.withRowType(elementRowType).addValues("CC", 
33).build(),
+                          Row.withRowType(elementRowType).addValues("DD", 
44).build()))
+                  .build()
+           );
+
+    pipeline.run();
+  }
+
+  @Test
+  public void testSelectSingleRowFromArrayOfRows() {
+    RowType elementRowType =
+        RowSqlType
+            .builder()
+            .withVarcharField("f_rowString")
+            .withIntegerField("f_rowInt")
+            .build();
+
+    RowType resultRowType = elementRowType;
+
+    RowType inputType =
+        RowSqlType
+            .builder()
+            .withIntegerField("f_int")
+            .withArrayField("f_arrayOfRows", elementRowType)
+            .build();
+
+    PCollection<Row> input =
+        PBegin.in(pipeline)
+              .apply(
+                  Create.of(
+                      Row.withRowType(inputType)
+                         .addValues(
+                             1,
+                             Arrays.asList(
+                                 
Row.withRowType(elementRowType).addValues("AA", 11).build(),
+                                 
Row.withRowType(elementRowType).addValues("BB", 22).build()))
+                         .build(),
+                      Row.withRowType(inputType)
+                         .addValues(
+                             2,
+                             Arrays.asList(
+                                 
Row.withRowType(elementRowType).addValues("CC", 33).build(),
+                                 
Row.withRowType(elementRowType).addValues("DD", 44).build()))
+                         .build())
+                        .withCoder(inputType.getRowCoder()));
+
+    PCollection<Row> result =
+        input
+            .apply(
+                BeamSql.query(
+                    "SELECT f_arrayOfRows[1] FROM PCOLLECTION"))
+            .setCoder(resultRowType.getRowCoder());
+
+    PAssert.that(result)
+           .containsInAnyOrder(
+               Row.withRowType(elementRowType).addValues("BB", 22).build(),
+               Row.withRowType(elementRowType).addValues("DD", 44).build());
+
+    pipeline.run();
+  }
+
   private PCollection<Row> pCollectionOf2Elements() {
     return
         PBegin
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/row/BeamSqlFieldAccessExpressionTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/row/BeamSqlFieldAccessExpressionTest.java
new file mode 100644
index 00000000000..ce971c337e7
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/row/BeamSqlFieldAccessExpressionTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.row;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.RowSqlType;
+import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.RowType;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/**
+ * Unit tests for {@link BeamSqlFieldAccessExpression}.
+ */
+public class BeamSqlFieldAccessExpressionTest {
+
+  private static final Row NULL_ROW = null;
+  private static final BoundedWindow NULL_WINDOW = null;
+
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void testAccessesFieldOfArray() {
+    BeamSqlPrimitive<List<String>> targetArray =
+        BeamSqlPrimitive.of(
+            SqlTypeName.ARRAY,
+            Arrays.asList("aaa", "bbb", "ccc"));
+
+    BeamSqlFieldAccessExpression arrayExpression =
+        new BeamSqlFieldAccessExpression(targetArray, 1, SqlTypeName.VARCHAR);
+
+    assertEquals("bbb", arrayExpression.evaluate(NULL_ROW, 
NULL_WINDOW).getValue());
+  }
+
+  @Test
+  public void testAccessesFieldOfRow() {
+    RowType rowType =
+        RowSqlType
+            .builder()
+            .withVarcharField("f_string1")
+            .withVarcharField("f_string2")
+            .withVarcharField("f_string3")
+            .build();
+
+    BeamSqlPrimitive<Row> targetRow =
+        BeamSqlPrimitive.of(
+            SqlTypeName.ROW,
+            Row
+                .withRowType(rowType)
+                .addValues("aa", "bb", "cc")
+                .build());
+
+    BeamSqlFieldAccessExpression arrayExpression =
+        new BeamSqlFieldAccessExpression(targetRow, 1, SqlTypeName.VARCHAR);
+
+    assertEquals("bb", arrayExpression.evaluate(NULL_ROW, 
NULL_WINDOW).getValue());
+  }
+
+  @Test
+  public void testThrowsForUnsupportedType() {
+    BeamSqlPrimitive<Integer> targetRow = 
BeamSqlPrimitive.of(SqlTypeName.INTEGER, 5);
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("unsupported type");
+
+    new BeamSqlFieldAccessExpression(targetRow, 1, SqlTypeName.VARCHAR)
+        .evaluate(NULL_ROW, NULL_WINDOW).getValue();
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 80706)
    Time Spent: 3h 10m  (was: 3h)

> [SQL] Add support for arrays
> ----------------------------
>
>                 Key: BEAM-3785
>                 URL: https://issues.apache.org/jira/browse/BEAM-3785
>             Project: Beam
>          Issue Type: Improvement
>          Components: dsl-sql
>            Reporter: Anton Kedin
>            Assignee: Anton Kedin
>            Priority: Major
>          Time Spent: 3h 10m
>  Remaining Estimate: 0h
>
> Support fields of Array type



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to