Abacn commented on code in PR #32482:
URL: https://github.com/apache/beam/pull/32482#discussion_r1767334140


##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java:
##########
@@ -219,94 +187,270 @@ public void testConvertGenericRecordToTableRow() throws Exception {
     }
   }
 
+  @Test
+  public void testConvertBigQuerySchemaToAvroSchemaDisabledLogicalTypes() {
+    TableSchema tableSchema = new TableSchema();
+    tableSchema.setFields(fields);
+    Schema avroSchema = BigQueryAvroUtils.toGenericAvroSchema(tableSchema, false);
+
+    assertThat(avroSchema.getField("number").schema(), equalTo(SchemaBuilder.builder().longType()));
+    assertThat(
+        avroSchema.getField("species").schema(),
+        equalTo(SchemaBuilder.builder().unionOf().nullType().and().stringType().endUnion()));
+    assertThat(
+        avroSchema.getField("quality").schema(),
+        equalTo(SchemaBuilder.builder().unionOf().nullType().and().doubleType().endUnion()));
+    assertThat(
+        avroSchema.getField("quantity").schema(),
+        equalTo(SchemaBuilder.builder().unionOf().nullType().and().longType().endUnion()));
+    assertThat(
+        avroSchema.getField("birthday").schema(),
+        equalTo(
+            SchemaBuilder.builder()
+                .unionOf()
+                .nullType()
+                .and()
+                .type(
+                    LogicalTypes.timestampMicros().addToSchema(SchemaBuilder.builder().longType()))
+                .endUnion()));
+    assertThat(
+        avroSchema.getField("birthdayMoney").schema(),
+        equalTo(
+            SchemaBuilder.builder()
+                .unionOf()
+                .nullType()
+                .and()
+                .type(LogicalTypes.decimal(38, 9).addToSchema(SchemaBuilder.builder().bytesType()))
+                .endUnion()));
+    assertThat(
+        avroSchema.getField("lotteryWinnings").schema(),
+        equalTo(
+            SchemaBuilder.builder()
+                .unionOf()
+                .nullType()
+                .and()
+                .type(LogicalTypes.decimal(77, 38).addToSchema(SchemaBuilder.builder().bytesType()))
+                .endUnion()));
+    assertThat(
+        avroSchema.getField("flighted").schema(),
+        equalTo(SchemaBuilder.builder().unionOf().nullType().and().booleanType().endUnion()));
+    assertThat(
+        avroSchema.getField("sound").schema(),
+        equalTo(SchemaBuilder.builder().unionOf().nullType().and().bytesType().endUnion()));
+    assertThat(
+        avroSchema.getField("anniversaryDate").schema(),
+        equalTo(
+            SchemaBuilder.builder()
+                .unionOf()
+                .nullType()
+                .and()
+                .stringBuilder()
+                .prop("sqlType", "DATE")
+                .endString()
+                .endUnion()));
+    assertThat(
+        avroSchema.getField("anniversaryDatetime").schema(),
+        equalTo(
+            SchemaBuilder.builder()
+                .unionOf()
+                .nullType()
+                .and()
+                .stringBuilder()
+                .prop("sqlType", "DATETIME")
+                .endString()
+                .endUnion()));
+    assertThat(
+        avroSchema.getField("anniversaryTime").schema(),
+        equalTo(
+            SchemaBuilder.builder()
+                .unionOf()
+                .nullType()
+                .and()
+                .stringBuilder()
+                .prop("sqlType", "TIME")
+                .endString()
+                .endUnion()));
+    assertThat(
+        avroSchema.getField("geoPositions").schema(),
+        equalTo(
+            SchemaBuilder.builder()
+                .unionOf()
+                .nullType()
+                .and()
+                .stringBuilder()
+                .prop("sqlType", "GEOGRAPHY")
+                .endString()
+                .endUnion()));
+    assertThat(
+        avroSchema.getField("scion").schema(),
+        equalTo(
+            SchemaBuilder.builder()
+                .unionOf()
+                .nullType()
+                .and()
+                .record("scion")
+                .doc("Translated Avro Schema for scion")
+                .namespace("org.apache.beam.sdk.io.gcp.bigquery")
+                .fields()
+                .name("species")
+                .type()
+                .unionOf()
+                .nullType()
+                .and()
+                .stringType()
+                .endUnion()
+                .noDefault()
+                .endRecord()
+                .endUnion()));
+
+    assertThat(
+        avroSchema.getField("associates").schema(),
+        equalTo(
+            SchemaBuilder.array()
+                .items()
+                .record("associates")
+                .doc("Translated Avro Schema for associates")
+                .namespace("org.apache.beam.sdk.io.gcp.bigquery")
+                .fields()
+                .name("species")
+                .type()
+                .unionOf()
+                .nullType()
+                .and()
+                .stringType()
+                .endUnion()
+                .noDefault()
+                .endRecord()));
+  }
+
   @Test
   public void testConvertBigQuerySchemaToAvroSchema() {
     TableSchema tableSchema = new TableSchema();
     tableSchema.setFields(fields);
-    Schema avroSchema =
-        BigQueryAvroUtils.toGenericAvroSchema("testSchema", tableSchema.getFields());
+    Schema avroSchema = BigQueryAvroUtils.toGenericAvroSchema(tableSchema);
 
-    assertThat(avroSchema.getField("number").schema(), equalTo(Schema.create(Type.LONG)));

Review Comment:
   Some of these changes seem unnecessary? I tested that the original asserts still pass. Consider changing asserts only where necessary, so that what actually changed due to this fix is more obvious.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java:
##########
@@ -273,145 +292,104 @@ private static List<Object> convertRepeatedField(
     @SuppressWarnings("unchecked")
     List<Object> elements = (List<Object>) v;
     ArrayList<Object> values = new ArrayList<>();
-    Type elementType = schema.getElementType().getType();
-    LogicalType elementLogicalType = schema.getElementType().getLogicalType();
     for (Object element : elements) {
-      values.add(convertRequiredField(elementType, elementLogicalType, fieldSchema, element));
+      values.add(convertRequiredField(name, elementType, element));
     }
     return values;
   }
 
-  private static Object convertRequiredField(
-      Type avroType, LogicalType avroLogicalType, TableFieldSchema fieldSchema, Object v) {
+  private static Object convertRequiredField(String name, Schema schema, Object v) {
     // REQUIRED fields are represented as the corresponding Avro types. For example, a BigQuery
     // INTEGER type maps to an Avro LONG type.
-    checkNotNull(v, "REQUIRED field %s should not be null", fieldSchema.getName());
-    // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the type field
-    // is required, so it may not be null.
-    String bqType = fieldSchema.getType();
-    ImmutableCollection<Type> expectedAvroTypes = BIG_QUERY_TO_AVRO_TYPES.get(bqType);
-    verifyNotNull(expectedAvroTypes, "Unsupported BigQuery type: %s", bqType);
-    verify(
-        expectedAvroTypes.contains(avroType),
-        "Expected Avro schema types %s for BigQuery %s field %s, but received %s",
-        expectedAvroTypes,
-        bqType,
-        fieldSchema.getName(),
-        avroType);
+    checkNotNull(v, "REQUIRED field %s should not be null", name);
+
     // For historical reasons, don't validate avroLogicalType except for with NUMERIC.
     // BigQuery represents NUMERIC in Avro format as BYTES with a DECIMAL logical type.
-    switch (bqType) {
-      case "STRING":
-      case "DATETIME":
-      case "GEOGRAPHY":
-      case "JSON":
-        // Avro will use a CharSequence to represent String objects, but it may not always use
-        // java.lang.String; for example, it may prefer org.apache.avro.util.Utf8.
-        verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass());
-        return v.toString();
-      case "DATE":
-        if (avroType == Type.INT) {
-          verify(v instanceof Integer, "Expected Integer, got %s", v.getClass());
-          verifyNotNull(avroLogicalType, "Expected Date logical type");
-          verify(avroLogicalType instanceof LogicalTypes.Date, "Expected Date logical type");
+    Type type = schema.getType();
+    LogicalType logicalType = schema.getLogicalType();
+    switch (type) {
+      case BOOLEAN:
+        // SQL types BOOL, BOOLEAN
+        return v;
+      case INT:
+        if (logicalType instanceof LogicalTypes.Date) {
+          // SQL types DATE
           return formatDate((Integer) v);
         } else {
-          verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass());
-          return v.toString();
+          throw new UnsupportedOperationException(
+              String.format(
+                  "Unexpected BigQuery field schema type %s for field named %s", type, name));
         }
-      case "TIME":
-        if (avroType == Type.LONG) {
-          verify(v instanceof Long, "Expected Long, got %s", v.getClass());
-          verifyNotNull(avroLogicalType, "Expected TimeMicros logical type");
-          verify(
-              avroLogicalType instanceof LogicalTypes.TimeMicros,
-              "Expected TimeMicros logical type");
+      case LONG:
+        if (logicalType instanceof LogicalTypes.TimeMicros) {
+          // SQL types TIME
           return formatTime((Long) v);
+        } else if (logicalType instanceof LogicalTypes.TimestampMicros) {
+          // SQL types TIMESTAMP
+          return formatTimestamp((Long) v);
         } else {
-          verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass());
-          return v.toString();
+          // SQL types INT64 (INT, SMALLINT, INTEGER, BIGINT, TINYINT, BYTEINT)
+          return ((Long) v).toString();

Review Comment:
   Why do we need to convert the value to a String here?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java:
##########
@@ -446,10 +456,27 @@ public static Schema fromTableSchema(TableSchema tableSchema, SchemaConversionOp
     return fromTableFieldSchema(tableSchema.getFields(), options);
   }
 
+  /** Convert a BigQuery {@link TableSchema} to an Avro {@link org.apache.avro.Schema}. */
+  public static org.apache.avro.Schema toGenericAvroSchema(TableSchema tableSchema) {
+    return toGenericAvroSchema(tableSchema, false);
+  }
+
+  /** Convert a BigQuery {@link TableSchema} to an Avro {@link org.apache.avro.Schema}. */
+  public static org.apache.avro.Schema toGenericAvroSchema(
+      TableSchema tableSchema, Boolean stringLogicalTypes) {
+    return toGenericAvroSchema("root", tableSchema.getFields(), stringLogicalTypes);
+  }
+
   /** Convert a list of BigQuery {@link TableFieldSchema} to Avro {@link org.apache.avro.Schema}. */
   public static org.apache.avro.Schema toGenericAvroSchema(
       String schemaName, List<TableFieldSchema> fieldSchemas) {
-    return BigQueryAvroUtils.toGenericAvroSchema(schemaName, fieldSchemas);
+    return toGenericAvroSchema(schemaName, fieldSchemas, false);
+  }
+
+  /** Convert a list of BigQuery {@link TableFieldSchema} to Avro {@link org.apache.avro.Schema}. */
+  public static org.apache.avro.Schema toGenericAvroSchema(
+      String schemaName, List<TableFieldSchema> fieldSchemas, Boolean stringLogicalTypes) {
+    return BigQueryAvroUtils.toGenericAvroSchema(schemaName, fieldSchemas, stringLogicalTypes);

Review Comment:
   Consider syncing the parameter name: `stringLogicalTypes` -> `useAvroLogicalTypes`?
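   For example (a sketch with only the parameter name changed; the signature and delegation stay as in this PR):
   ```
   /** Convert a list of BigQuery {@link TableFieldSchema} to Avro {@link org.apache.avro.Schema}. */
   public static org.apache.avro.Schema toGenericAvroSchema(
       String schemaName, List<TableFieldSchema> fieldSchemas, Boolean useAvroLogicalTypes) {
     return BigQueryAvroUtils.toGenericAvroSchema(schemaName, fieldSchemas, useAvroLogicalTypes);
   }
   ```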



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java:
##########
@@ -64,35 +58,95 @@
  */
 class BigQueryAvroUtils {
 
+  static class DateTimeLogicalType extends LogicalType {
+    public DateTimeLogicalType() {
+      super("datetime");
+    }
+  }
+
+  static final DateTimeLogicalType DATETIME_LOGICAL_TYPE = new DateTimeLogicalType();
+
   /**
    * Defines the valid mapping between BigQuery types and native Avro types.
    *
-   * <p>Some BigQuery types are duplicated here since slightly different Avro records are produced
-   * when exporting data in Avro format and when reading data directly using the read API.
+   * @see <a href=https://cloud.google.com/bigquery/docs/exporting-data#avro_export_details>BQ avro
+   *     export</a>
+   * @see <a href=https://cloud.google.com/bigquery/docs/reference/storage#avro_schema_details>BQ
+   *     avro storage</a>
    */
-  static final ImmutableMultimap<String, Type> BIG_QUERY_TO_AVRO_TYPES =
-      ImmutableMultimap.<String, Type>builder()
-          .put("STRING", Type.STRING)
-          .put("GEOGRAPHY", Type.STRING)
-          .put("BYTES", Type.BYTES)
-          .put("INTEGER", Type.LONG)
-          .put("INT64", Type.LONG)
-          .put("FLOAT", Type.DOUBLE)
-          .put("FLOAT64", Type.DOUBLE)
-          .put("NUMERIC", Type.BYTES)
-          .put("BIGNUMERIC", Type.BYTES)
-          .put("BOOLEAN", Type.BOOLEAN)
-          .put("BOOL", Type.BOOLEAN)
-          .put("TIMESTAMP", Type.LONG)
-          .put("RECORD", Type.RECORD)
-          .put("STRUCT", Type.RECORD)
-          .put("DATE", Type.STRING)
-          .put("DATE", Type.INT)
-          .put("DATETIME", Type.STRING)
-          .put("TIME", Type.STRING)
-          .put("TIME", Type.LONG)
-          .put("JSON", Type.STRING)
-          .build();
+  static Schema getPrimitiveType(TableFieldSchema schema, Boolean useAvroLogicalTypes) {
+    String bqType = schema.getType();
+    switch (bqType) {
+      case "BOOL":
+      case "BOOLEAN":
+        // boolean
+        return SchemaBuilder.builder().booleanType();
+      case "BYTES":
+        // bytes
+        return SchemaBuilder.builder().bytesType();
+      case "FLOAT64":
+      case "FLOAT": // even if not a valid BQ type, it is used in the schema
+        // double
+        return SchemaBuilder.builder().doubleType();
+      case "INT64":
+      case "INT":
+      case "SMALLINT":
+      case "INTEGER":
+      case "BIGINT":
+      case "TINYINT":
+      case "BYTEINT":
+        // long
+        return SchemaBuilder.builder().longType();
+      case "STRING":
+        // string
+        return SchemaBuilder.builder().stringType();
+      case "NUMERIC":
+      case "BIGNUMERIC":
+        // decimal
+        LogicalType logicalType;
+        if (schema.getScale() != null) {
+          logicalType =
+              LogicalTypes.decimal(schema.getPrecision().intValue(), schema.getScale().intValue());
+        } else if (schema.getPrecision() != null) {
+          logicalType = LogicalTypes.decimal(schema.getPrecision().intValue());
+        } else if (bqType.equals("NUMERIC")) {
+          logicalType = LogicalTypes.decimal(38, 9);
+        } else {

Review Comment:
   add a comment `// BIGNUMERIC`
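   Something like the sketch below (assuming the BIGNUMERIC default of precision 77 / scale 38, which is what the test asserts for `lotteryWinnings`):
   ```
   } else {
     // BIGNUMERIC
     logicalType = LogicalTypes.decimal(77, 38);
   }
   ```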



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java:
##########
@@ -322,15 +322,21 @@ private static FieldType fromTableFieldSchemaType(
       case "BYTES":
         return FieldType.BYTES;
       case "INT64":
+      case "INT":
+      case "SMALLINT":
       case "INTEGER":
+      case "BIGINT":
+      case "TINYINT":
+      case "BYTEINT":

Review Comment:
   There are FieldType.INT32, FieldType.INT16, and FieldType.BYTE; shall we do a more precise mapping, or is there additional risk?
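   For reference, a more precise mapping could look like the sketch below; note that GoogleSQL treats all of these names as aliases for INT64, so the stored values are still 64-bit, which is the main risk of narrowing:
   ```
   case "INT64":
   case "INT":
   case "INTEGER":
   case "BIGINT":
     return FieldType.INT64;
   case "SMALLINT":
     return FieldType.INT16;
   case "TINYINT":
   case "BYTEINT":
     return FieldType.BYTE;
   ```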



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java:
##########
@@ -64,35 +58,95 @@
  */
 class BigQueryAvroUtils {
 
+  static class DateTimeLogicalType extends LogicalType {

Review Comment:
   There are Avro and Beam logical types with the same class names; consider noting
   ```
   // org.apache.avro.LogicalType
   ```
   for readability.
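   i.e. something along the lines of:
   ```
   // extends org.apache.avro.LogicalType (not a Beam logical type)
   static class DateTimeLogicalType extends LogicalType {
   ```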



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java:
##########
@@ -182,30 +179,17 @@ public List<BigQueryStorageStreamSource<T>> split(
       LOG.info("Read session returned {} streams", readSession.getStreamsList().size());
     }
 
-    Schema sessionSchema;
-    if (readSession.getDataFormat() == DataFormat.ARROW) {

Review Comment:
   Would you mind explaining a little bit what improvement made the ARROW/AVRO schemas no longer needed here?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -1275,8 +1272,12 @@ public PCollection<T> expand(PBegin input) {
 
       Schema beamSchema = null;
       if (getTypeDescriptor() != null && getToBeamRowFn() != null && getFromBeamRowFn() != null) {
-        beamSchema = sourceDef.getBeamSchema(bqOptions);
-        beamSchema = getFinalSchema(beamSchema, getSelectedFields());
+        TableSchema tableSchema = sourceDef.getTableSchema(bqOptions);
+        ValueProvider<List<String>> selectedFields = getSelectedFields();
+        if (selectedFields != null) {
+          tableSchema = BigQueryUtils.trimSchema(tableSchema, selectedFields.get());

Review Comment:
   We still need to check `selectedFields.isAccessible()` (as the original getFinalSchema did), since this runs at pipeline-expansion time.
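   Roughly (same code as in this PR, just guarded with the accessibility check):
   ```
   TableSchema tableSchema = sourceDef.getTableSchema(bqOptions);
   ValueProvider<List<String>> selectedFields = getSelectedFields();
   if (selectedFields != null && selectedFields.isAccessible()) {
     tableSchema = BigQueryUtils.trimSchema(tableSchema, selectedFields.get());
   }
   ```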



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceDef.java:
##########
@@ -45,6 +45,15 @@ <T> BigQuerySourceBase<T> toSource(
       SerializableFunction<TableSchema, AvroSource.DatumReaderFactory<T>> readerFactory,
       boolean useAvroLogicalTypes);
 
+  /**
+   * Extract the {@link TableSchema} corresponding to this source.
+   *
+   * @param bqOptions BigQueryOptions
+   * @return table schema of the source
+   * @throws BigQuerySchemaRetrievalException if schema retrieval fails
+   */
+  TableSchema getTableSchema(BigQueryOptions bqOptions);
+

Review Comment:
   Now getBeamSchema is trivial and the implementations in its implementing classes are identical. Consider making it a default method here?
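   A sketch of what that default could look like (assuming the implementations just convert the retrieved table schema via BigQueryUtils.fromTableSchema):
   ```
   /** Extract the Beam {@link Schema} corresponding to this source. */
   default Schema getBeamSchema(BigQueryOptions bqOptions) {
     return BigQueryUtils.fromTableSchema(getTableSchema(bqOptions));
   }
   ```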



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java:
##########
@@ -273,145 +292,104 @@ private static List<Object> convertRepeatedField(
     @SuppressWarnings("unchecked")
     List<Object> elements = (List<Object>) v;
     ArrayList<Object> values = new ArrayList<>();
-    Type elementType = schema.getElementType().getType();
-    LogicalType elementLogicalType = schema.getElementType().getLogicalType();
     for (Object element : elements) {
-      values.add(convertRequiredField(elementType, elementLogicalType, fieldSchema, element));
+      values.add(convertRequiredField(name, elementType, element));
     }
     return values;
   }
 
-  private static Object convertRequiredField(
-      Type avroType, LogicalType avroLogicalType, TableFieldSchema fieldSchema, Object v) {
+  private static Object convertRequiredField(String name, Schema schema, Object v) {
     // REQUIRED fields are represented as the corresponding Avro types. For example, a BigQuery
     // INTEGER type maps to an Avro LONG type.
-    checkNotNull(v, "REQUIRED field %s should not be null", fieldSchema.getName());
-    // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the type field
-    // is required, so it may not be null.
-    String bqType = fieldSchema.getType();
-    ImmutableCollection<Type> expectedAvroTypes = BIG_QUERY_TO_AVRO_TYPES.get(bqType);
-    verifyNotNull(expectedAvroTypes, "Unsupported BigQuery type: %s", bqType);
-    verify(
-        expectedAvroTypes.contains(avroType),
-        "Expected Avro schema types %s for BigQuery %s field %s, but received %s",
-        expectedAvroTypes,
-        bqType,
-        fieldSchema.getName(),
-        avroType);
+    checkNotNull(v, "REQUIRED field %s should not be null", name);
+
     // For historical reasons, don't validate avroLogicalType except for with NUMERIC.
     // BigQuery represents NUMERIC in Avro format as BYTES with a DECIMAL logical type.
-    switch (bqType) {
-      case "STRING":
-      case "DATETIME":
-      case "GEOGRAPHY":
-      case "JSON":
-        // Avro will use a CharSequence to represent String objects, but it may not always use
-        // java.lang.String; for example, it may prefer org.apache.avro.util.Utf8.
-        verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass());
-        return v.toString();
-      case "DATE":
-        if (avroType == Type.INT) {
-          verify(v instanceof Integer, "Expected Integer, got %s", v.getClass());
-          verifyNotNull(avroLogicalType, "Expected Date logical type");
-          verify(avroLogicalType instanceof LogicalTypes.Date, "Expected Date logical type");
+    Type type = schema.getType();
+    LogicalType logicalType = schema.getLogicalType();
+    switch (type) {
+      case BOOLEAN:
+        // SQL types BOOL, BOOLEAN
+        return v;
+      case INT:
+        if (logicalType instanceof LogicalTypes.Date) {
+          // SQL types DATE
           return formatDate((Integer) v);
         } else {
-          verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass());
-          return v.toString();
+          throw new UnsupportedOperationException(

Review Comment:
   Shall we fall back to a plain integer (or at least fall back when schema.getLogicalType() is null), as the LONG case does below?
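   i.e. something like this sketch, mirroring the LONG branch below:
   ```
   case INT:
     if (logicalType instanceof LogicalTypes.Date) {
       // SQL type DATE
       return formatDate((Integer) v);
     } else {
       // plain INT with no logical type: fall back, as the LONG branch does for plain INT64
       return ((Integer) v).toString();
     }
   ```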



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java:
##########
@@ -193,8 +193,7 @@ public void teardown() {
   @Test
   public void testBuildTableBasedSource() {
     BigQueryIO.TypedRead<TableRow> typedRead =
-        BigQueryIO.read(new TableRowParser())

Review Comment:
   Same here: the test still passes with the old config. We can preserve the old test code (or update only some of it) so it still covers the different config settings.


