This is an automated email from the ASF dual-hosted git repository.
ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 5806b6e4b93 Support Beam MicrosInstant conversion to Avro Timestamp
(#36605)
5806b6e4b93 is described below
commit 5806b6e4b930b6fed07a2aac8f086ffb6e530b4c
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Wed Nov 19 14:33:35 2025 -0500
Support Beam MicrosInstant conversion to Avro Timestamp (#36605)
* support MicrosInstant conversion to Avro Timestamp
* add test
* style
* skip if no expansion jars
* trigger ITs
* style
---
.../beam_PostCommit_Python_Xlang_Gcp_Direct.json | 2 +-
.../extensions/avro/schemas/utils/AvroUtils.java | 9 ++++++
.../avro/schemas/utils/AvroUtilsTest.java | 34 +++++++++++++++++++++
.../io/external/xlang_bigqueryio_it_test.py | 35 ++++++++++++++++++++--
4 files changed, 77 insertions(+), 3 deletions(-)
diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
index 95fef3e26ca..e3d6056a5de 100644
--- a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
+++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
- "modification": 13
+ "modification": 1
}
diff --git
a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java
b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java
index 1a8cac7ffb6..38621571ca1 100644
---
a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java
+++
b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java
@@ -135,6 +135,8 @@ import org.joda.time.ReadableInstant;
* LogicalTypes.Date <-----> LogicalType(DATE)
* <------ LogicalType(urn="beam:logical_type:date:v1")
* LogicalTypes.TimestampMillis <-----> DATETIME
+ * LogicalTypes.TimestampMicros ------> Long
+ * LogicalTypes.TimestampMicros <------ LogicalType(urn="beam:logical_type:micros_instant:v1")
* LogicalTypes.Decimal <-----> DECIMAL
* </pre>
*
@@ -1181,6 +1183,9 @@ public class AvroUtils {
baseType =
LogicalTypes.date().addToSchema(org.apache.avro.Schema.create(Type.INT));
} else if ("TIME".equals(identifier)) {
baseType =
LogicalTypes.timeMillis().addToSchema(org.apache.avro.Schema.create(Type.INT));
+ } else if (SqlTypes.TIMESTAMP.getIdentifier().equals(identifier)) {
+ baseType =
+
LogicalTypes.timestampMicros().addToSchema(org.apache.avro.Schema.create(Type.LONG));
} else {
throw new RuntimeException(
"Unhandled logical type " +
checkNotNull(fieldType.getLogicalType()).getIdentifier());
@@ -1331,6 +1336,10 @@ public class AvroUtils {
return ((java.time.LocalDate) value).toEpochDay();
} else if ("TIME".equals(identifier)) {
return (int) ((Instant) value).getMillis();
+ } else if (SqlTypes.TIMESTAMP.getIdentifier().equals(identifier)) {
+ java.time.Instant instant = (java.time.Instant) value;
+ return TimeUnit.SECONDS.toMicros(instant.getEpochSecond())
+ + TimeUnit.NANOSECONDS.toMicros(instant.getNano());
} else {
throw new RuntimeException("Unhandled logical type " + identifier);
}
diff --git
a/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java
b/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java
index 7cda1e9dba5..41a43ed850b 100644
---
a/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java
+++
b/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java
@@ -32,6 +32,7 @@ import java.time.temporal.TemporalUnit;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
@@ -1038,6 +1039,39 @@ public class AvroUtilsTest {
assertEquals(row, deserializedRow);
}
+ @Test
+ public void testBeamTimestampLogicalTypeToAvro() {
+ // Tests special handling for Beam's MicrosInstant logical type
+ // Only one way (Beam to Avro)
+
+ Schema beamSchema =
+ Schema.builder().addLogicalTypeField("timestampMicrosLT",
SqlTypes.TIMESTAMP).build();
+ List<org.apache.avro.Schema.Field> fields = Lists.newArrayList();
+ fields.add(
+ new org.apache.avro.Schema.Field(
+ "timestampMicrosLT",
+
LogicalTypes.timestampMicros().addToSchema(org.apache.avro.Schema.create(Type.LONG)),
+ "",
+ (Object) null));
+ org.apache.avro.Schema avroSchema =
+ org.apache.avro.Schema.createRecord("topLevelRecord", null, null,
false, fields);
+
+ assertEquals(avroSchema, AvroUtils.toAvroSchema(beamSchema));
+
+ java.time.Instant instant =
+
java.time.Instant.ofEpochMilli(DATE_TIME.getMillis()).plusNanos(123000);
+ Row beamRow = Row.withSchema(beamSchema).addValue(instant).build();
+ GenericRecord avroRecord =
+ new GenericRecordBuilder(avroSchema)
+ .set(
+ "timestampMicrosLT",
+ TimeUnit.SECONDS.toMicros(instant.getEpochSecond())
+ + TimeUnit.NANOSECONDS.toMicros(instant.getNano()))
+ .build();
+
+ assertEquals(avroRecord, AvroUtils.toGenericRecord(beamRow));
+ }
+
@Test
public void testNullSchemas() {
assertEquals(
diff --git a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py
b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py
index 51ae97b9917..d659d57aad9 100644
--- a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py
+++ b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py
@@ -114,7 +114,8 @@ class BigQueryXlangStorageWriteIT(unittest.TestCase):
self.project = self.test_pipeline.get_option('project')
self._runner = PipelineOptions(self.args).get_all_options()['runner']
- self.bigquery_client = BigQueryWrapper()
+ self.bigquery_client = BigQueryWrapper.from_pipeline_options(
+ self.test_pipeline.options)
self.dataset_id = '%s_%s_%s' % (
self.BIGQUERY_DATASET, str(int(time.time())), secrets.token_hex(3))
self.bigquery_client.get_or_create_dataset(self.project, self.dataset_id)
@@ -154,7 +155,7 @@ class BigQueryXlangStorageWriteIT(unittest.TestCase):
self, table_prefix, storage_uri, expected_count=1):
"""Verify that Iceberg table directories are created in
the warehouse location.
-
+
Args:
table_prefix: The table name prefix to look for
storage_uri: The GCS storage URI (e.g., 'gs://bucket/path')
@@ -607,6 +608,36 @@ class BigQueryXlangStorageWriteIT(unittest.TestCase):
# Verify that the table directory was created in the warehouse location
self.assert_iceberg_tables_created(table, big_lake_config['storageUri'])
+ def test_write_with_managed_transform(self):
+ table = 'write_with_managed_transform'
+ table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table)
+
+ row_elements = [
+ beam.Row(
+ my_int=e['int'],
+ my_float=e['float'],
+ my_string=e['str'],
+ my_bool=e['bool'],
+ my_bytes=e['bytes'],
+ my_timestamp=e['timestamp']) for e in self.ELEMENTS
+ ]
+
+ expected = []
+ for e in self.ELEMENTS:
+ del e["numeric"]
+ expected.append(e)
+ bq_matcher = BigqueryFullResultMatcher(
+ project=self.project,
+ query="SELECT * FROM {}.{}".format(self.dataset_id, table),
+ data=self.parse_expected_data(expected))
+
+ with beam.Pipeline(argv=self.args) as p:
+ _ = (
+ p
+ | beam.Create(row_elements)
+ | beam.managed.Write("bigquery", config={"table": table_id}))
+ hamcrest_assert(p, bq_matcher)
+
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)