Abacn commented on code in PR #32482: URL: https://github.com/apache/beam/pull/32482#discussion_r1767334140
########## sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java: ########## @@ -219,94 +187,270 @@ public void testConvertGenericRecordToTableRow() throws Exception { } } + @Test + public void testConvertBigQuerySchemaToAvroSchemaDisabledLogicalTypes() { + TableSchema tableSchema = new TableSchema(); + tableSchema.setFields(fields); + Schema avroSchema = BigQueryAvroUtils.toGenericAvroSchema(tableSchema, false); + + assertThat(avroSchema.getField("number").schema(), equalTo(SchemaBuilder.builder().longType())); + assertThat( + avroSchema.getField("species").schema(), + equalTo(SchemaBuilder.builder().unionOf().nullType().and().stringType().endUnion())); + assertThat( + avroSchema.getField("quality").schema(), + equalTo(SchemaBuilder.builder().unionOf().nullType().and().doubleType().endUnion())); + assertThat( + avroSchema.getField("quantity").schema(), + equalTo(SchemaBuilder.builder().unionOf().nullType().and().longType().endUnion())); + assertThat( + avroSchema.getField("birthday").schema(), + equalTo( + SchemaBuilder.builder() + .unionOf() + .nullType() + .and() + .type( + LogicalTypes.timestampMicros().addToSchema(SchemaBuilder.builder().longType())) + .endUnion())); + assertThat( + avroSchema.getField("birthdayMoney").schema(), + equalTo( + SchemaBuilder.builder() + .unionOf() + .nullType() + .and() + .type(LogicalTypes.decimal(38, 9).addToSchema(SchemaBuilder.builder().bytesType())) + .endUnion())); + assertThat( + avroSchema.getField("lotteryWinnings").schema(), + equalTo( + SchemaBuilder.builder() + .unionOf() + .nullType() + .and() + .type(LogicalTypes.decimal(77, 38).addToSchema(SchemaBuilder.builder().bytesType())) + .endUnion())); + assertThat( + avroSchema.getField("flighted").schema(), + equalTo(SchemaBuilder.builder().unionOf().nullType().and().booleanType().endUnion())); + assertThat( + avroSchema.getField("sound").schema(), + 
equalTo(SchemaBuilder.builder().unionOf().nullType().and().bytesType().endUnion())); + assertThat( + avroSchema.getField("anniversaryDate").schema(), + equalTo( + SchemaBuilder.builder() + .unionOf() + .nullType() + .and() + .stringBuilder() + .prop("sqlType", "DATE") + .endString() + .endUnion())); + assertThat( + avroSchema.getField("anniversaryDatetime").schema(), + equalTo( + SchemaBuilder.builder() + .unionOf() + .nullType() + .and() + .stringBuilder() + .prop("sqlType", "DATETIME") + .endString() + .endUnion())); + assertThat( + avroSchema.getField("anniversaryTime").schema(), + equalTo( + SchemaBuilder.builder() + .unionOf() + .nullType() + .and() + .stringBuilder() + .prop("sqlType", "TIME") + .endString() + .endUnion())); + assertThat( + avroSchema.getField("geoPositions").schema(), + equalTo( + SchemaBuilder.builder() + .unionOf() + .nullType() + .and() + .stringBuilder() + .prop("sqlType", "GEOGRAPHY") + .endString() + .endUnion())); + assertThat( + avroSchema.getField("scion").schema(), + equalTo( + SchemaBuilder.builder() + .unionOf() + .nullType() + .and() + .record("scion") + .doc("Translated Avro Schema for scion") + .namespace("org.apache.beam.sdk.io.gcp.bigquery") + .fields() + .name("species") + .type() + .unionOf() + .nullType() + .and() + .stringType() + .endUnion() + .noDefault() + .endRecord() + .endUnion())); + + assertThat( + avroSchema.getField("associates").schema(), + equalTo( + SchemaBuilder.array() + .items() + .record("associates") + .doc("Translated Avro Schema for associates") + .namespace("org.apache.beam.sdk.io.gcp.bigquery") + .fields() + .name("species") + .type() + .unionOf() + .nullType() + .and() + .stringType() + .endUnion() + .noDefault() + .endRecord())); + } + @Test public void testConvertBigQuerySchemaToAvroSchema() { TableSchema tableSchema = new TableSchema(); tableSchema.setFields(fields); - Schema avroSchema = - BigQueryAvroUtils.toGenericAvroSchema("testSchema", tableSchema.getFields()); + Schema avroSchema = 
BigQueryAvroUtils.toGenericAvroSchema(tableSchema); - assertThat(avroSchema.getField("number").schema(), equalTo(Schema.create(Type.LONG))); Review Comment: Some changes aren't necessary? Tested that the original assert still pass. Consider only change assert whenever necessary, then what has been changed due to this fix becomes more obvious. ########## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java: ########## @@ -273,145 +292,104 @@ private static List<Object> convertRepeatedField( @SuppressWarnings("unchecked") List<Object> elements = (List<Object>) v; ArrayList<Object> values = new ArrayList<>(); - Type elementType = schema.getElementType().getType(); - LogicalType elementLogicalType = schema.getElementType().getLogicalType(); for (Object element : elements) { - values.add(convertRequiredField(elementType, elementLogicalType, fieldSchema, element)); + values.add(convertRequiredField(name, elementType, element)); } return values; } - private static Object convertRequiredField( - Type avroType, LogicalType avroLogicalType, TableFieldSchema fieldSchema, Object v) { + private static Object convertRequiredField(String name, Schema schema, Object v) { // REQUIRED fields are represented as the corresponding Avro types. For example, a BigQuery // INTEGER type maps to an Avro LONG type. - checkNotNull(v, "REQUIRED field %s should not be null", fieldSchema.getName()); - // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the type field - // is required, so it may not be null. 
- String bqType = fieldSchema.getType(); - ImmutableCollection<Type> expectedAvroTypes = BIG_QUERY_TO_AVRO_TYPES.get(bqType); - verifyNotNull(expectedAvroTypes, "Unsupported BigQuery type: %s", bqType); - verify( - expectedAvroTypes.contains(avroType), - "Expected Avro schema types %s for BigQuery %s field %s, but received %s", - expectedAvroTypes, - bqType, - fieldSchema.getName(), - avroType); + checkNotNull(v, "REQUIRED field %s should not be null", name); + // For historical reasons, don't validate avroLogicalType except for with NUMERIC. // BigQuery represents NUMERIC in Avro format as BYTES with a DECIMAL logical type. - switch (bqType) { - case "STRING": - case "DATETIME": - case "GEOGRAPHY": - case "JSON": - // Avro will use a CharSequence to represent String objects, but it may not always use - // java.lang.String; for example, it may prefer org.apache.avro.util.Utf8. - verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass()); - return v.toString(); - case "DATE": - if (avroType == Type.INT) { - verify(v instanceof Integer, "Expected Integer, got %s", v.getClass()); - verifyNotNull(avroLogicalType, "Expected Date logical type"); - verify(avroLogicalType instanceof LogicalTypes.Date, "Expected Date logical type"); + Type type = schema.getType(); + LogicalType logicalType = schema.getLogicalType(); + switch (type) { + case BOOLEAN: + // SQL types BOOL, BOOLEAN + return v; + case INT: + if (logicalType instanceof LogicalTypes.Date) { + // SQL types DATE return formatDate((Integer) v); } else { - verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass()); - return v.toString(); + throw new UnsupportedOperationException( + String.format( + "Unexpected BigQuery field schema type %s for field named %s", type, name)); } - case "TIME": - if (avroType == Type.LONG) { - verify(v instanceof Long, "Expected Long, got %s", v.getClass()); - verifyNotNull(avroLogicalType, "Expected TimeMicros logical type"); 
- verify( - avroLogicalType instanceof LogicalTypes.TimeMicros, - "Expected TimeMicros logical type"); + case LONG: + if (logicalType instanceof LogicalTypes.TimeMicros) { + // SQL types TIME return formatTime((Long) v); + } else if (logicalType instanceof LogicalTypes.TimestampMicros) { + // SQL types TIMESTAMP + return formatTimestamp((Long) v); } else { - verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass()); - return v.toString(); + // SQL types INT64 (INT, SMALLINT, INTEGER, BIGINT, TINYINT, BYTEINT) + return ((Long) v).toString(); Review Comment: why need to convert to string here? ########## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java: ########## @@ -446,10 +456,27 @@ public static Schema fromTableSchema(TableSchema tableSchema, SchemaConversionOp return fromTableFieldSchema(tableSchema.getFields(), options); } + /** Convert a list of BigQuery {@link TableSchema} to Avro {@link org.apache.avro.Schema}. */ + public static org.apache.avro.Schema toGenericAvroSchema(TableSchema tableSchema) { + return toGenericAvroSchema(tableSchema, false); + } + + /** Convert a list of BigQuery {@link TableSchema} to Avro {@link org.apache.avro.Schema}. */ + public static org.apache.avro.Schema toGenericAvroSchema( + TableSchema tableSchema, Boolean stringLogicalTypes) { + return toGenericAvroSchema("root", tableSchema.getFields(), stringLogicalTypes); + } + /** Convert a list of BigQuery {@link TableFieldSchema} to Avro {@link org.apache.avro.Schema}. */ public static org.apache.avro.Schema toGenericAvroSchema( String schemaName, List<TableFieldSchema> fieldSchemas) { - return BigQueryAvroUtils.toGenericAvroSchema(schemaName, fieldSchemas); + return toGenericAvroSchema(schemaName, fieldSchemas, false); + } + + /** Convert a list of BigQuery {@link TableFieldSchema} to Avro {@link org.apache.avro.Schema}. 
*/ + public static org.apache.avro.Schema toGenericAvroSchema( + String schemaName, List<TableFieldSchema> fieldSchemas, Boolean stringLogicalTypes) { + return BigQueryAvroUtils.toGenericAvroSchema(schemaName, fieldSchemas, stringLogicalTypes); Review Comment: Consider sync the parameter name `stringLogicalTypes` -> `useAvroLogicalTypes` ? ########## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java: ########## @@ -64,35 +58,95 @@ */ class BigQueryAvroUtils { + static class DateTimeLogicalType extends LogicalType { + public DateTimeLogicalType() { + super("datetime"); + } + } + + static final DateTimeLogicalType DATETIME_LOGICAL_TYPE = new DateTimeLogicalType(); + /** * Defines the valid mapping between BigQuery types and native Avro types. * - * <p>Some BigQuery types are duplicated here since slightly different Avro records are produced - * when exporting data in Avro format and when reading data directly using the read API. + * @see <a href=https://cloud.google.com/bigquery/docs/exporting-data#avro_export_details>BQ avro + * export</a> + * @see <a href=https://cloud.google.com/bigquery/docs/reference/storage#avro_schema_details>BQ + * avro storage</a> */ - static final ImmutableMultimap<String, Type> BIG_QUERY_TO_AVRO_TYPES = - ImmutableMultimap.<String, Type>builder() - .put("STRING", Type.STRING) - .put("GEOGRAPHY", Type.STRING) - .put("BYTES", Type.BYTES) - .put("INTEGER", Type.LONG) - .put("INT64", Type.LONG) - .put("FLOAT", Type.DOUBLE) - .put("FLOAT64", Type.DOUBLE) - .put("NUMERIC", Type.BYTES) - .put("BIGNUMERIC", Type.BYTES) - .put("BOOLEAN", Type.BOOLEAN) - .put("BOOL", Type.BOOLEAN) - .put("TIMESTAMP", Type.LONG) - .put("RECORD", Type.RECORD) - .put("STRUCT", Type.RECORD) - .put("DATE", Type.STRING) - .put("DATE", Type.INT) - .put("DATETIME", Type.STRING) - .put("TIME", Type.STRING) - .put("TIME", Type.LONG) - .put("JSON", Type.STRING) - .build(); + static Schema 
getPrimitiveType(TableFieldSchema schema, Boolean useAvroLogicalTypes) { + String bqType = schema.getType(); + switch (bqType) { + case "BOOL": + case "BOOLEAN": + // boolean + return SchemaBuilder.builder().booleanType(); + case "BYTES": + // bytes + return SchemaBuilder.builder().bytesType(); + case "FLOAT64": + case "FLOAT": // even if not a valid BQ type, it is used in the schema + // double + return SchemaBuilder.builder().doubleType(); + case "INT64": + case "INT": + case "SMALLINT": + case "INTEGER": + case "BIGINT": + case "TINYINT": + case "BYTEINT": + // long + return SchemaBuilder.builder().longType(); + case "STRING": + // string + return SchemaBuilder.builder().stringType(); + case "NUMERIC": + case "BIGNUMERIC": + // decimal + LogicalType logicalType; + if (schema.getScale() != null) { + logicalType = + LogicalTypes.decimal(schema.getPrecision().intValue(), schema.getScale().intValue()); + } else if (schema.getPrecision() != null) { + logicalType = LogicalTypes.decimal(schema.getPrecision().intValue()); + } else if (bqType.equals("NUMERIC")) { + logicalType = LogicalTypes.decimal(38, 9); + } else { Review Comment: add a comment `// BIGNUMERIC` ########## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java: ########## @@ -322,15 +322,21 @@ private static FieldType fromTableFieldSchemaType( case "BYTES": return FieldType.BYTES; case "INT64": + case "INT": + case "SMALLINT": case "INTEGER": + case "BIGINT": + case "TINYINT": + case "BYTEINT": Review Comment: There is FieldType.INT32, FieldType.INT16, FieldType.BYTE, shall we do a more precise map or is there additional risk? 
########## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java: ########## @@ -64,35 +58,95 @@ */ class BigQueryAvroUtils { + static class DateTimeLogicalType extends LogicalType { Review Comment: There are Avro and Beam logical types with the same class names; consider noting ``` // org.apache.avro.LogicalType ``` for readability. ########## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java: ########## @@ -182,30 +179,17 @@ public List<BigQueryStorageStreamSource<T>> split( LOG.info("Read session returned {} streams", readSession.getStreamsList().size()); } - Schema sessionSchema; - if (readSession.getDataFormat() == DataFormat.ARROW) { Review Comment: Would you mind explaining a little bit what kind of improvement made ARROW/AVRO schemas no longer needed here? ########## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java: ########## @@ -1275,8 +1272,12 @@ public PCollection<T> expand(PBegin input) { Schema beamSchema = null; if (getTypeDescriptor() != null && getToBeamRowFn() != null && getFromBeamRowFn() != null) { - beamSchema = sourceDef.getBeamSchema(bqOptions); - beamSchema = getFinalSchema(beamSchema, getSelectedFields()); + TableSchema tableSchema = sourceDef.getTableSchema(bqOptions); + ValueProvider<List<String>> selectedFields = getSelectedFields(); + if (selectedFields != null) { + tableSchema = BigQueryUtils.trimSchema(tableSchema, selectedFields.get()); Review Comment: We still need to check `selectedFields.isAccessible()` (as the original getFinalSchema did), as this is done at pipeline expansion time ########## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceDef.java: ########## @@ -45,6 +45,15 @@ <T> BigQuerySourceBase<T> toSource( SerializableFunction<TableSchema, AvroSource.DatumReaderFactory<T>> readerFactory, boolean 
useAvroLogicalTypes); + /** + * Extract the {@link TableSchema} corresponding to this source. + * + * @param bqOptions BigQueryOptions + * @return table schema of the source + * @throws BigQuerySchemaRetrievalException if schema retrieval fails + */ + TableSchema getTableSchema(BigQueryOptions bqOptions); + Review Comment: Now getBeamSchema is trivial and the impl in its implementation classes are the same. Consider make a default implementation here? ########## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java: ########## @@ -273,145 +292,104 @@ private static List<Object> convertRepeatedField( @SuppressWarnings("unchecked") List<Object> elements = (List<Object>) v; ArrayList<Object> values = new ArrayList<>(); - Type elementType = schema.getElementType().getType(); - LogicalType elementLogicalType = schema.getElementType().getLogicalType(); for (Object element : elements) { - values.add(convertRequiredField(elementType, elementLogicalType, fieldSchema, element)); + values.add(convertRequiredField(name, elementType, element)); } return values; } - private static Object convertRequiredField( - Type avroType, LogicalType avroLogicalType, TableFieldSchema fieldSchema, Object v) { + private static Object convertRequiredField(String name, Schema schema, Object v) { // REQUIRED fields are represented as the corresponding Avro types. For example, a BigQuery // INTEGER type maps to an Avro LONG type. - checkNotNull(v, "REQUIRED field %s should not be null", fieldSchema.getName()); - // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the type field - // is required, so it may not be null. 
- String bqType = fieldSchema.getType(); - ImmutableCollection<Type> expectedAvroTypes = BIG_QUERY_TO_AVRO_TYPES.get(bqType); - verifyNotNull(expectedAvroTypes, "Unsupported BigQuery type: %s", bqType); - verify( - expectedAvroTypes.contains(avroType), - "Expected Avro schema types %s for BigQuery %s field %s, but received %s", - expectedAvroTypes, - bqType, - fieldSchema.getName(), - avroType); + checkNotNull(v, "REQUIRED field %s should not be null", name); + // For historical reasons, don't validate avroLogicalType except for with NUMERIC. // BigQuery represents NUMERIC in Avro format as BYTES with a DECIMAL logical type. - switch (bqType) { - case "STRING": - case "DATETIME": - case "GEOGRAPHY": - case "JSON": - // Avro will use a CharSequence to represent String objects, but it may not always use - // java.lang.String; for example, it may prefer org.apache.avro.util.Utf8. - verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass()); - return v.toString(); - case "DATE": - if (avroType == Type.INT) { - verify(v instanceof Integer, "Expected Integer, got %s", v.getClass()); - verifyNotNull(avroLogicalType, "Expected Date logical type"); - verify(avroLogicalType instanceof LogicalTypes.Date, "Expected Date logical type"); + Type type = schema.getType(); + LogicalType logicalType = schema.getLogicalType(); + switch (type) { + case BOOLEAN: + // SQL types BOOL, BOOLEAN + return v; + case INT: + if (logicalType instanceof LogicalTypes.Date) { + // SQL types DATE return formatDate((Integer) v); } else { - verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass()); - return v.toString(); + throw new UnsupportedOperationException( Review Comment: shall we fall back to normal integer (or at least fall back when schema.getLogicalType is null), as what LONG did in the following? 
########## sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java: ########## @@ -193,8 +193,7 @@ public void teardown() { @Test public void testBuildTableBasedSource() { BigQueryIO.TypedRead<TableRow> typedRead = - BigQueryIO.read(new TableRowParser()) Review Comment: Same — this test still passes on the old config. We can preserve the old test code (or just update some of it) so it still covers different config settings. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org