This is an automated email from the ASF dual-hosted git repository.
cvandermerwe pushed a commit to branch release-2.71
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.71 by this push:
new 597768686ed Cherry pick #37257 and #37294 into release-2.71 (#37307)
597768686ed is described below
commit 597768686ed3711e70e9038570f03a4531a761d7
Author: claudevdm <[email protected]>
AuthorDate: Wed Jan 14 13:49:24 2026 -0500
Cherry pick #37257 and #37294 into release-2.71 (#37307)
* Map TIMESTAMP(12) BQ type -> timestamp-nanos Avro type in default schemafactory (#37257)
* Map TIMESTAMP(12) BQ type -> timestamp-nanos Avro type in default schemafactory
* Use default schemafactory in test.
---------
Co-authored-by: Claude <[email protected]>
* Support picosecond timestamps when writing GenericRecord and Beam Rows. (#37294)
Co-authored-by: Claude <[email protected]>
---------
Co-authored-by: Claude <[email protected]>
---
.../AvroGenericRecordToStorageApiProto.java | 3 ++
.../io/gcp/bigquery/BeamRowToStorageApiProto.java | 3 ++
.../sdk/io/gcp/bigquery/BigQueryAvroUtils.java | 12 ++++++--
.../AvroGenericRecordToStorageApiProtoTest.java | 34 ++++++++++++++++++++++
.../gcp/bigquery/BeamRowToStorageApiProtoTest.java | 17 +++++++++++
.../io/gcp/bigquery/BigQueryTimestampPicosIT.java | 23 +++++++++++----
6 files changed, 84 insertions(+), 8 deletions(-)
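The first cherry-picked change (#37257) makes the default schema factory map BigQuery TIMESTAMP columns with precision 12 to the Avro timestamp-nanos logical type, and carries that precision through to the Storage Write API table schema. Below is a minimal sketch of the resulting conversion, assuming the protoTableSchemaFromAvroSchema helper exercised by the tests in this commit is reachable from the calling code; the record and field names are illustrative only.

import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.beam.sdk.io.gcp.bigquery.AvroGenericRecordToStorageApiProto;

public class TimestampNanosMappingSketch {
  public static void main(String[] args) {
    // An Avro long field carrying the "timestamp-nanos" logical type, as the
    // default schema factory now emits for TIMESTAMP(12) columns.
    Schema longSchema = Schema.create(Schema.Type.LONG);
    longSchema.addProp("logicalType", "timestamp-nanos");
    Schema avroSchema =
        SchemaBuilder.record("TimestampNanosRecord")
            .fields()
            .name("tsNanos")
            .type(longSchema)
            .noDefault()
            .endRecord();

    // Convert to the Storage Write API schema: the field comes back as
    // TIMESTAMP with timestampPrecision 12, matching the tests in this commit.
    TableSchema protoSchema =
        AvroGenericRecordToStorageApiProto.protoTableSchemaFromAvroSchema(avroSchema);
    TableFieldSchema field = protoSchema.getFields(0);
    System.out.println(field.getType());                          // TIMESTAMP
    System.out.println(field.getTimestampPrecision().getValue()); // 12
  }
}

Repeated fields take the same path: the hunks below copy the element field's timestamp precision onto REPEATED fields as well.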
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java
index 76174ac3d04..35751e2758e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java
@@ -348,6 +348,9 @@ public class AvroGenericRecordToStorageApiProto {
fieldDescriptorFromAvroField(
new Schema.Field(field.name(), elementType, field.doc(), field.defaultVal()));
builder = builder.setType(elementFieldSchema.getType());
+ if (elementFieldSchema.hasTimestampPrecision()) {
+ builder.setTimestampPrecision(elementFieldSchema.getTimestampPrecision());
+ }
builder.addAllFields(elementFieldSchema.getFieldsList());
builder = builder.setMode(TableFieldSchema.Mode.REPEATED);
break;
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java
index adb8e4468c0..d940ff8dd7f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java
@@ -237,6 +237,9 @@ public class BeamRowToStorageApiProto {
TableFieldSchema elementFieldSchema =
fieldDescriptorFromBeamField(Field.of(field.getName(), elementType));
builder = builder.setType(elementFieldSchema.getType());
+ if (elementFieldSchema.hasTimestampPrecision()) {
+ builder = builder.setTimestampPrecision(elementFieldSchema.getTimestampPrecision());
+ }
builder.addAllFields(elementFieldSchema.getFieldsList());
builder = builder.setMode(TableFieldSchema.Mode.REPEATED);
break;
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
index e1ff0f58f14..46a014f8196 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
@@ -102,9 +102,15 @@ class BigQueryAvroUtils {
// boolean
return SchemaBuilder.builder().booleanType();
case "TIMESTAMP":
- // in Extract Jobs, it always uses the Avro logical type
- // we may have to change this if we move to EXPORT DATA
- return LogicalTypes.timestampMicros().addToSchema(SchemaBuilder.builder().longType());
+ if (schema.getTimestampPrecision() == null || schema.getTimestampPrecision() == 6) {
+ // in Extract Jobs, it always uses the Avro logical type
+ // we may have to change this if we move to EXPORT DATA
+ return LogicalTypes.timestampMicros().addToSchema(SchemaBuilder.builder().longType());
+ }
+ return SchemaBuilder.builder()
+ .longBuilder()
+ .prop("logicalType", TIMESTAMP_NANOS_LOGICAL_TYPE)
+ .endLong();
case "DATE":
if (useAvroLogicalTypes) {
return LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType());
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java
index deabb1dd05f..9698aaff1d7 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java
@@ -348,7 +348,23 @@ public class AvroGenericRecordToStorageApiProtoTest {
.endRecord();
}
+ private static Schema createRepeatedTimestampNanosSchema() {
+ Schema longSchema = Schema.create(Schema.Type.LONG);
+ longSchema.addProp("logicalType", "timestamp-nanos");
+
+ Schema arraySchema = Schema.createArray(longSchema);
+
+ return SchemaBuilder.record("RepeatedTimestampNanosRecord")
+ .fields()
+ .name("timestampNanosArray")
+ .type(arraySchema)
+ .noDefault()
+ .endRecord();
+ }
+
private static final Schema TIMESTAMP_NANOS_SCHEMA = createTimestampNanosSchema();
+ private static final Schema REPEATED_TIMESTAMP_NANOS_SCHEMA =
+ createRepeatedTimestampNanosSchema();
private static GenericRecord baseRecord;
private static GenericRecord rawLogicalTypesRecord;
@@ -885,4 +901,22 @@ public class AvroGenericRecordToStorageApiProtoTest {
assertTrue(field.hasTimestampPrecision());
assertEquals(12L, field.getTimestampPrecision().getValue());
}
+
+ @Test
+ public void testProtoTableSchemaFromAvroSchemaRepeatedTimestampNanos() {
+ com.google.cloud.bigquery.storage.v1.TableSchema protoSchema =
+ AvroGenericRecordToStorageApiProto.protoTableSchemaFromAvroSchema(
+ REPEATED_TIMESTAMP_NANOS_SCHEMA);
+
+ assertEquals(1, protoSchema.getFieldsCount());
+ com.google.cloud.bigquery.storage.v1.TableFieldSchema field = protoSchema.getFields(0);
+
+ assertEquals("timestampnanosarray", field.getName());
+ assertEquals(
+ com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.TIMESTAMP, field.getType());
+ assertEquals(
+ com.google.cloud.bigquery.storage.v1.TableFieldSchema.Mode.REPEATED, field.getMode());
+
+ assertEquals(12L, field.getTimestampPrecision().getValue());
+ }
}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java
index d7a88615a50..c546a7ca5d7 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java
@@ -69,6 +69,10 @@ public class BeamRowToStorageApiProtoTest {
Schema.builder()
.addField("timestampNanos",
FieldType.logicalType(Timestamp.NANOS).withNullable(true))
.build();
+ private static final Schema TIMESTAMP_NANOS_ARRAY_SCHEMA =
+ Schema.builder()
+ .addField("timestampNanosArray",
FieldType.array(FieldType.logicalType(Timestamp.NANOS)))
+ .build();
private static final EnumerationType TEST_ENUM =
EnumerationType.create("ONE", "TWO", "RED", "BLUE");
private static final Schema BASE_SCHEMA =
@@ -614,6 +618,19 @@ public class BeamRowToStorageApiProtoTest {
assertEquals(12L, field.getTimestampPrecision().getValue());
}
+ @Test
+ public void testTimestampNanosArraySchema() {
+ com.google.cloud.bigquery.storage.v1.TableSchema protoSchema =
+ BeamRowToStorageApiProto.protoTableSchemaFromBeamSchema(TIMESTAMP_NANOS_ARRAY_SCHEMA);
+
+ assertEquals(1, protoSchema.getFieldsCount());
+ TableFieldSchema field = protoSchema.getFields(0);
+ assertEquals(TableFieldSchema.Type.TIMESTAMP, field.getType());
+ assertEquals(
+ com.google.cloud.bigquery.storage.v1.TableFieldSchema.Mode.REPEATED, field.getMode());
+ assertEquals(12L, field.getTimestampPrecision().getValue());
+ }
+
@Test
public void testTimestampNanosDescriptor() throws Exception {
DescriptorProto descriptor =
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimestampPicosIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimestampPicosIT.java
index 6d155185ee6..07b6adf46bc 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimestampPicosIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimestampPicosIT.java
@@ -408,6 +408,10 @@ public class BigQueryTimestampPicosIT {
.name("ts_nanos")
.type(longSchema)
.noDefault()
+ .name("ts_picos")
+ .type()
+ .stringType()
+ .noDefault()
.endRecord();
}
@@ -421,12 +425,12 @@ public class BigQueryTimestampPicosIT {
public void testWriteGenericRecordTimestampNanos() throws Exception {
String tableSpec =
String.format("%s:%s.%s", project, DATASET_ID,
"generic_record_ts_nanos_test");
-
// Create GenericRecord with timestamp-nanos value
GenericRecord record =
new GenericRecordBuilder(TIMESTAMP_NANOS_AVRO_SCHEMA)
.set(
"ts_nanos", TEST_INSTANT.getEpochSecond() * 1_000_000_000L +
TEST_INSTANT.getNano())
+ .set("ts_picos", "2024-01-15T10:30:45.123456789123Z")
.build();
// Write using Storage Write API with Avro format
@@ -437,7 +441,6 @@ public class BigQueryTimestampPicosIT {
"WriteGenericRecords",
BigQueryIO.writeGenericRecords()
.to(tableSpec)
- .withAvroSchemaFactory(tableSchema -> TIMESTAMP_NANOS_AVRO_SCHEMA)
.withSchema(BigQueryUtils.fromGenericAvroSchema(TIMESTAMP_NANOS_AVRO_SCHEMA, true))
.useAvroLogicalTypes()
.withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
@@ -457,12 +460,18 @@ public class BigQueryTimestampPicosIT {
.from(tableSpec));
PAssert.that(result)
- .containsInAnyOrder(new TableRow().set("ts_nanos",
"2024-01-15T10:30:45.123456789000Z"));
+ .containsInAnyOrder(
+ new TableRow()
+ .set("ts_nanos", "2024-01-15T10:30:45.123456789000Z")
+ .set("ts_picos", "2024-01-15T10:30:45.123456789123Z"));
readPipeline.run().waitUntilFinish();
}
private static final Schema BEAM_TIMESTAMP_NANOS_SCHEMA =
- Schema.builder().addField("ts_nanos",
Schema.FieldType.logicalType(Timestamp.NANOS)).build();
+ Schema.builder()
+ .addField("ts_nanos", Schema.FieldType.logicalType(Timestamp.NANOS))
+ .addField("ts_picos", Schema.FieldType.STRING)
+ .build();
@Test
public void testWriteBeamRowTimestampNanos() throws Exception {
@@ -472,6 +481,7 @@ public class BigQueryTimestampPicosIT {
Row row =
Row.withSchema(BEAM_TIMESTAMP_NANOS_SCHEMA)
.withFieldValue("ts_nanos", TEST_INSTANT)
+ .withFieldValue("ts_picos", "2024-01-15T10:30:45.123456789123Z")
.build();
// Write using Storage Write API with Beam Schema
@@ -500,7 +510,10 @@ public class BigQueryTimestampPicosIT {
.from(tableSpec));
PAssert.that(result)
- .containsInAnyOrder(new TableRow().set("ts_nanos",
"2024-01-15T10:30:45.123456789000Z"));
+ .containsInAnyOrder(
+ new TableRow()
+ .set("ts_nanos", "2024-01-15T10:30:45.123456789000Z")
+ .set("ts_picos", "2024-01-15T10:30:45.123456789123Z"));
readPipeline.run().waitUntilFinish();
}
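The second cherry-picked change (#37294) lets GenericRecord and Beam Row writes carry nanosecond timestamps end to end through the Storage Write API. Below is a hedged sketch of the GenericRecord path, loosely mirroring the integration test above; the pipeline wiring, the coder choice, and the schema/table parameters are illustrative assumptions, and the Avro schema is assumed to contain a single timestamp-nanos long field named ts_nanos.

import java.time.Instant;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.avro.coders.AvroGenericCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
import org.apache.beam.sdk.transforms.Create;

public class WriteTimestampNanosSketch {
  public static void writeNanos(Pipeline pipeline, Schema tsNanosAvroSchema, String tableSpec) {
    // Encode an Instant as epoch nanoseconds for the timestamp-nanos field,
    // the same way the integration test above builds its record.
    Instant instant = Instant.parse("2024-01-15T10:30:45.123456789Z");
    GenericRecord record =
        new GenericRecordBuilder(tsNanosAvroSchema)
            .set("ts_nanos", instant.getEpochSecond() * 1_000_000_000L + instant.getNano())
            .build();

    pipeline
        .apply(Create.of(record).withCoder(AvroGenericCoder.of(tsNanosAvroSchema)))
        .apply(
            "WriteGenericRecords",
            BigQueryIO.writeGenericRecords()
                .to(tableSpec)
                // The BigQuery schema is derived from the Avro schema; with the
                // default schema factory now handling timestamp-nanos, no custom
                // withAvroSchemaFactory override is needed (see the removed line above).
                .withSchema(BigQueryUtils.fromGenericAvroSchema(tsNanosAvroSchema, true))
                .useAvroLogicalTypes()
                .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API));
  }
}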