robinyqiu commented on a change in pull request #12507:
URL: https://github.com/apache/beam/pull/12507#discussion_r469459270



##########
File path: 
sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlCalciteTranslationUtils.java
##########
@@ -17,174 +17,333 @@
  */
 package org.apache.beam.sdk.extensions.sql.zetasql;
 
-import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_BOOL;
-import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_BYTES;
-import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_DATE;
-import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_DATETIME;
-import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_DOUBLE;
-import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_INT64;
-import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_NUMERIC;
-import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_STRING;
-import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_TIME;
-import static com.google.zetasql.ZetaSQLType.TypeKind.TYPE_TIMESTAMP;
-import static java.util.stream.Collectors.toList;
-
-import com.google.zetasql.ArrayType;
+import com.google.zetasql.CivilTimeEncoder;
 import com.google.zetasql.StructType;
 import com.google.zetasql.StructType.StructField;
 import com.google.zetasql.Type;
 import com.google.zetasql.TypeFactory;
+import com.google.zetasql.Value;
 import com.google.zetasql.ZetaSQLType.TypeKind;
+import com.google.zetasql.functions.ZetaSQLDateTime.DateTimestampPart;
+import java.math.BigDecimal;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
 import java.util.List;
-import java.util.function.Function;
+import java.util.stream.Collectors;
 import org.apache.beam.sdk.annotations.Internal;
+import 
org.apache.beam.sdk.extensions.sql.meta.provider.bigquery.BeamBigQuerySqlDialect;
+import org.apache.beam.sdk.extensions.sql.zetasql.translation.SqlOperators;
+import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.avatica.util.ByteString;
+import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.avatica.util.TimeUnit;
+import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.avatica.util.TimeUnitRange;
 import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataType;
 import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexBuilder;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexNode;
+import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.type.SqlTypeName;
+import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.util.DateString;
+import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.util.TimeString;
+import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.util.TimestampString;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 
 /**
  * Utility methods for ZetaSQL <=> Calcite translation.
  *
- * <p>Unsupported ZetaSQL types: INT32, UINT32, UINT64, FLOAT, ENUM, PROTO, 
GEOGRAPHY
+ * <p>Unsupported ZetaSQL types: INT32, UINT32, UINT64, FLOAT, ENUM 
(internal), PROTO, GEOGRAPHY
  */
 @Internal
 public final class ZetaSqlCalciteTranslationUtils {
 
   private ZetaSqlCalciteTranslationUtils() {}
 
+  // TODO[BEAM-9178]: support DateTimestampPart.WEEK and "WEEK with weekday"s
+  private static final ImmutableMap<Integer, TimeUnit> TIME_UNIT_CASTING_MAP =
+      ImmutableMap.<Integer, TimeUnit>builder()
+          .put(DateTimestampPart.YEAR.getNumber(), TimeUnit.YEAR)
+          .put(DateTimestampPart.MONTH.getNumber(), TimeUnit.MONTH)
+          .put(DateTimestampPart.DAY.getNumber(), TimeUnit.DAY)
+          .put(DateTimestampPart.DAYOFWEEK.getNumber(), TimeUnit.DOW)
+          .put(DateTimestampPart.DAYOFYEAR.getNumber(), TimeUnit.DOY)
+          .put(DateTimestampPart.QUARTER.getNumber(), TimeUnit.QUARTER)
+          .put(DateTimestampPart.HOUR.getNumber(), TimeUnit.HOUR)
+          .put(DateTimestampPart.MINUTE.getNumber(), TimeUnit.MINUTE)
+          .put(DateTimestampPart.SECOND.getNumber(), TimeUnit.SECOND)
+          .put(DateTimestampPart.MILLISECOND.getNumber(), TimeUnit.MILLISECOND)
+          .put(DateTimestampPart.MICROSECOND.getNumber(), TimeUnit.MICROSECOND)
+          .put(DateTimestampPart.ISOYEAR.getNumber(), TimeUnit.ISOYEAR)
+          .put(DateTimestampPart.ISOWEEK.getNumber(), TimeUnit.WEEK)
+          .build();
+
   // Type conversion: Calcite => ZetaSQL
-  public static Type toZetaType(RelDataType calciteType) {
+  public static Type toZetaSqlType(RelDataType calciteType) {
     switch (calciteType.getSqlTypeName()) {
       case BIGINT:
-        return TypeFactory.createSimpleType(TYPE_INT64);
+        return TypeFactory.createSimpleType(TypeKind.TYPE_INT64);
       case DOUBLE:
-        return TypeFactory.createSimpleType(TYPE_DOUBLE);
+        return TypeFactory.createSimpleType(TypeKind.TYPE_DOUBLE);
       case BOOLEAN:
-        return TypeFactory.createSimpleType(TYPE_BOOL);
+        return TypeFactory.createSimpleType(TypeKind.TYPE_BOOL);
       case VARCHAR:
-        return TypeFactory.createSimpleType(TYPE_STRING);
+        return TypeFactory.createSimpleType(TypeKind.TYPE_STRING);
       case VARBINARY:
-        return TypeFactory.createSimpleType(TYPE_BYTES);
+        return TypeFactory.createSimpleType(TypeKind.TYPE_BYTES);
       case DECIMAL:
-        return TypeFactory.createSimpleType(TYPE_NUMERIC);
+        return TypeFactory.createSimpleType(TypeKind.TYPE_NUMERIC);
       case DATE:
-        return TypeFactory.createSimpleType(TYPE_DATE);
+        return TypeFactory.createSimpleType(TypeKind.TYPE_DATE);
       case TIME:
-        return TypeFactory.createSimpleType(TYPE_TIME);
+        return TypeFactory.createSimpleType(TypeKind.TYPE_TIME);
       case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-        return TypeFactory.createSimpleType(TYPE_DATETIME);
+        return TypeFactory.createSimpleType(TypeKind.TYPE_DATETIME);
       case TIMESTAMP:
-        return TypeFactory.createSimpleType(TYPE_TIMESTAMP);
+        return TypeFactory.createSimpleType(TypeKind.TYPE_TIMESTAMP);
       case ARRAY:
-        return 
TypeFactory.createArrayType(toZetaType(calciteType.getComponentType()));
+        return 
TypeFactory.createArrayType(toZetaSqlType(calciteType.getComponentType()));
       case ROW:
-        List<StructField> structFields =
+        return TypeFactory.createStructType(
             calciteType.getFieldList().stream()
-                .map(f -> new StructField(f.getName(), 
toZetaType(f.getType())))
-                .collect(toList());
-
-        return TypeFactory.createStructType(structFields);
+                .map(f -> new StructField(f.getName(), 
toZetaSqlType(f.getType())))
+                .collect(Collectors.toList()));
       default:
-        throw new UnsupportedOperationException("Unsupported RelDataType: " + 
calciteType);
+        throw new UnsupportedOperationException(
+            "Unknown Calcite type: " + calciteType.getSqlTypeName().getName());
     }
   }
 
   // Type conversion: ZetaSQL => Calcite
-  public static SqlTypeName toCalciteTypeName(TypeKind type) {
-    switch (type) {
+  public static RelDataType toCalciteType(Type type, boolean nullable, 
RexBuilder rexBuilder) {
+    RelDataType nonNullType;
+    switch (type.getKind()) {
       case TYPE_INT64:
-        return SqlTypeName.BIGINT;
+        nonNullType = 
rexBuilder.getTypeFactory().createSqlType(SqlTypeName.BIGINT);
+        break;
       case TYPE_DOUBLE:
-        return SqlTypeName.DOUBLE;
+        nonNullType = 
rexBuilder.getTypeFactory().createSqlType(SqlTypeName.DOUBLE);
+        break;
       case TYPE_BOOL:
-        return SqlTypeName.BOOLEAN;
+        nonNullType = 
rexBuilder.getTypeFactory().createSqlType(SqlTypeName.BOOLEAN);
+        break;
       case TYPE_STRING:
-        return SqlTypeName.VARCHAR;
+        nonNullType = 
rexBuilder.getTypeFactory().createSqlType(SqlTypeName.VARCHAR);
+        break;
       case TYPE_BYTES:
-        return SqlTypeName.VARBINARY;
+        nonNullType = 
rexBuilder.getTypeFactory().createSqlType(SqlTypeName.VARBINARY);
+        break;
       case TYPE_NUMERIC:
-        return SqlTypeName.DECIMAL;
+        nonNullType = 
rexBuilder.getTypeFactory().createSqlType(SqlTypeName.DECIMAL);
+        break;
       case TYPE_DATE:
-        return SqlTypeName.DATE;
+        nonNullType = 
rexBuilder.getTypeFactory().createSqlType(SqlTypeName.DATE);
+        break;
       case TYPE_TIME:
-        return SqlTypeName.TIME;
+        nonNullType = 
rexBuilder.getTypeFactory().createSqlType(SqlTypeName.TIME);
+        break;
       case TYPE_DATETIME:
-        return SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE;
+        nonNullType =
+            
rexBuilder.getTypeFactory().createSqlType(SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
+        break;
       case TYPE_TIMESTAMP:
-        // TODO: handle timestamp with time zone.
-        return SqlTypeName.TIMESTAMP;
-        // TODO[BEAM-9179] Add conversion code for ARRAY and ROW types
+        nonNullType = 
rexBuilder.getTypeFactory().createSqlType(SqlTypeName.TIMESTAMP);
+        break;
+      case TYPE_ARRAY:
+        // TODO: Should element type has the same nullability as the array 
type?
+        nonNullType = toCalciteArrayType(type.asArray().getElementType(), 
nullable, rexBuilder);
+        break;
+      case TYPE_STRUCT:
+        // TODO: Should field type has the same nullability as the struct type?
+        nonNullType = toCalciteStructType(type.asStruct(), nullable, 
rexBuilder);
+        break;
       default:
-        throw new UnsupportedOperationException("Unknown ZetaSQL type: " + 
type.name());
+        throw new UnsupportedOperationException("Unknown ZetaSQL type: " + 
type.getKind().name());
     }
+    return rexBuilder.getTypeFactory().createTypeWithNullability(nonNullType, 
nullable);
   }
 
-  public static RelDataType toRelDataType(RexBuilder rexBuilder, Type type, 
boolean isNullable) {
-    if (type.getKind().equals(TypeKind.TYPE_ARRAY)) {
-      return toArrayRelDataType(rexBuilder, type.asArray(), isNullable);
-    } else if (type.getKind().equals(TypeKind.TYPE_STRUCT)) {
-      return toStructRelDataType(rexBuilder, type.asStruct(), isNullable);
-    } else {
-      // TODO: Check type's nullability?
-      return toSimpleRelDataType(type.getKind(), rexBuilder, isNullable);
-    }
+  private static RelDataType toCalciteArrayType(
+      Type elementType, boolean nullable, RexBuilder rexBuilder) {
+    return rexBuilder
+        .getTypeFactory()
+        // -1 cardinality means unlimited array size
+        .createArrayType(toCalciteType(elementType, nullable, rexBuilder), -1);
   }
 
-  public static RelDataType toArrayRelDataType(
-      RexBuilder rexBuilder, ArrayType arrayType, boolean isNullable) {
-    // -1 cardinality means unlimited array size.
-    // TODO: is unlimited array size right for general case?
-    // TODO: whether isNullable should be ArrayType's nullablity (not its 
element type's?)
-    return nullable(
-        rexBuilder,
-        rexBuilder
-            .getTypeFactory()
-            .createArrayType(toRelDataType(rexBuilder, 
arrayType.getElementType(), isNullable), -1),
-        isNullable);
+  private static RelDataType toCalciteStructType(
+      StructType structType, boolean nullable, RexBuilder rexBuilder) {
+    List<StructField> fields = structType.getFieldList();
+    List<String> fieldNames = getFieldNameList(fields);
+    List<RelDataType> fieldTypes =
+        fields.stream()
+            .map(f -> toCalciteType(f.getType(), nullable, rexBuilder))
+            .collect(Collectors.toList());
+    return rexBuilder.getTypeFactory().createStructType(fieldTypes, 
fieldNames);
   }
 
-  private static List<String> toNameList(List<StructField> fields) {
+  private static List<String> getFieldNameList(List<StructField> fields) {
     ImmutableList.Builder<String> b = ImmutableList.builder();
     for (int i = 0; i < fields.size(); i++) {
       String name = fields.get(i).getName();
       if ("".equals(name)) {
-        name = "$col" + i;
+        name = "$col" + i; // empty field name is not allowed, generate an 
index-based name for it
       }
       b.add(name);
     }
     return b.build();
   }
 
-  public static RelDataType toStructRelDataType(
-      RexBuilder rexBuilder, StructType structType, boolean isNullable) {
+  // Value conversion: ZetaSQL => Calcite
+  public static RexNode toRexNode(Value value, RexBuilder rexBuilder) {
+    Type type = value.getType();
+    if (value.isNull()) {
+      return rexBuilder.makeNullLiteral(toCalciteType(type, true, rexBuilder));
+    }
 
-    List<StructField> fields = structType.getFieldList();
-    List<String> fieldNames = toNameList(fields);
-    List<RelDataType> fieldTypes =
-        fields.stream()
-            .map(f -> toRelDataType(rexBuilder, f.getType(), isNullable))
-            .collect(toList());
+    switch (type.getKind()) {
+      case TYPE_INT64:
+        return rexBuilder.makeExactLiteral(
+            new BigDecimal(value.getInt64Value()), toCalciteType(type, false, 
rexBuilder));
+      case TYPE_DOUBLE:
+        // Cannot simply call makeApproxLiteral() for ZetaSQL DOUBLE type 
because positive infinity,
+        // negative infinity and NaN cannot be directly converted to 
BigDecimal. So we create three
+        // wrapper functions here for these three cases such that we can later 
recognize it and
+        // customize its unparsing in BeamBigQuerySqlDialect.
+        double val = value.getDoubleValue();
+        String wrapperFun = null;
+        if (val == Double.POSITIVE_INFINITY) {
+          wrapperFun = BeamBigQuerySqlDialect.DOUBLE_POSITIVE_INF_FUNCTION;
+        } else if (val == Double.NEGATIVE_INFINITY) {
+          wrapperFun = BeamBigQuerySqlDialect.DOUBLE_NEGATIVE_INF_FUNCTION;
+        } else if (Double.isNaN(val)) {
+          wrapperFun = BeamBigQuerySqlDialect.DOUBLE_NAN_FUNCTION;
+        }
 
-    return rexBuilder.getTypeFactory().createStructType(fieldTypes, 
fieldNames);
+        RelDataType returnType = toCalciteType(type, false, rexBuilder);
+        if (wrapperFun == null) {
+          return rexBuilder.makeApproxLiteral(new BigDecimal(val), returnType);
+        } else if 
(BeamBigQuerySqlDialect.DOUBLE_NAN_FUNCTION.equals(wrapperFun)) {
+          // TODO[BEAM-10550]: Update the temporary workaround below after 
vendored Calcite version.
+          // Adding an additional random parameter for the wrapper function of 
NaN, to avoid
+          // triggering Calcite operation simplification. (e.g. 'NaN == NaN' 
would be simplify to
+          // 'null or NaN is not null' in Calcite. This would miscalculate the 
expression to be
+          // true, which should be false.)
+          return rexBuilder.makeCall(
+              SqlOperators.createZetaSqlFunction(wrapperFun, 
returnType.getSqlTypeName()),
+              rexBuilder.makeApproxLiteral(BigDecimal.valueOf(Math.random()), 
returnType));
+        } else {
+          return rexBuilder.makeCall(
+              SqlOperators.createZetaSqlFunction(wrapperFun, 
returnType.getSqlTypeName()));
+        }
+      case TYPE_BOOL:
+        return rexBuilder.makeLiteral(value.getBoolValue());
+      case TYPE_STRING:
+        // Has to allow CAST because Calcite create CHAR type first and does a 
CAST to VARCHAR.
+        // If not allow cast, rexBuilder() will only build a literal with CHAR 
type.
+        return rexBuilder.makeLiteral(
+            value.getStringValue(), toCalciteType(type, false, rexBuilder), 
true);
+      case TYPE_BYTES:
+        return rexBuilder.makeBinaryLiteral(new 
ByteString(value.getBytesValue().toByteArray()));
+      case TYPE_NUMERIC:
+        // Cannot simply call makeExactLiteral() for ZetaSQL NUMERIC type 
because later it will be
+        // unparsed to the string representation of the BigDecimal itself 
(e.g. "SELECT NUMERIC '0'"
+        // will be unparsed to "SELECT 0E-9"), and Calcite does not allow 
customize unparsing of
+        // SqlNumericLiteral. So we create a wrapper function here such that 
we can later recognize
+        // it and customize its unparsing in BeamBigQuerySqlDialect.
+        return rexBuilder.makeCall(
+            SqlOperators.createZetaSqlFunction(
+                BeamBigQuerySqlDialect.NUMERIC_LITERAL_FUNCTION,
+                toCalciteType(type, false, rexBuilder).getSqlTypeName()),
+            rexBuilder.makeExactLiteral(
+                value.getNumericValue(), toCalciteType(type, false, 
rexBuilder)));
+      case TYPE_DATE:
+        return rexBuilder.makeDateLiteral(dateValueToDateString(value));
+      case TYPE_TIME:
+        return rexBuilder.makeTimeLiteral(
+            timeValueToTimeString(value),
+            
rexBuilder.getTypeFactory().getTypeSystem().getMaxPrecision(SqlTypeName.TIME));
+      case TYPE_DATETIME:
+        return rexBuilder.makeTimestampWithLocalTimeZoneLiteral(
+            datetimeValueToTimestampString(value),
+            rexBuilder
+                .getTypeFactory()
+                .getTypeSystem()
+                .getMaxPrecision(SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE));
+      case TYPE_TIMESTAMP:
+        return rexBuilder.makeTimestampLiteral(
+            timestampValueToTimestampString(value),
+            
rexBuilder.getTypeFactory().getTypeSystem().getMaxPrecision(SqlTypeName.TIMESTAMP));
+      case TYPE_ARRAY:
+        return arrayValueToRexNode(value, rexBuilder);
+      case TYPE_STRUCT:
+        return structValueToRexNode(value, rexBuilder);
+      case TYPE_ENUM: // internal only, used for DateTimestampPart
+        return enumValueToRexNode(value, rexBuilder);
+      default:
+        throw new UnsupportedOperationException("Unknown ZetaSQL type: " + 
type.getKind().name());
+    }
+  }
+
+  private static RexNode arrayValueToRexNode(Value value, RexBuilder 
rexBuilder) {
+    return rexBuilder.makeCall(
+        toCalciteArrayType(value.getType().asArray().getElementType(), false, 
rexBuilder),
+        SqlStdOperatorTable.ARRAY_VALUE_CONSTRUCTOR,
+        value.getElementList().stream()
+            .map(v -> toRexNode(v, rexBuilder))
+            .collect(Collectors.toList()));
+  }
+
+  private static RexNode structValueToRexNode(Value value, RexBuilder 
rexBuilder) {
+    return rexBuilder.makeCall(
+        toCalciteStructType(value.getType().asStruct(), false, rexBuilder),

Review comment:
       This line of code specifies the return STRUCT type explicitly, so 
Calcite does not need to infer it. Adding this line introduced a bug, because 
later, when the schema is translated to a Beam Schema, duplicate field names 
will be caught. https://github.com/apache/beam/pull/12550 fixes this bug.
   
   @apilloud 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to