This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 5806b6e4b93 Support Beam MicrosInstant conversion to Avro Timestamp (#36605)
5806b6e4b93 is described below

commit 5806b6e4b930b6fed07a2aac8f086ffb6e530b4c
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Wed Nov 19 14:33:35 2025 -0500

    Support Beam MicrosInstant conversion to Avro Timestamp (#36605)
    
    * support MicrosInstant conversion to Avro Timestamp
    
    * add test
    
    * style
    
    * skip if no expansion jars
    
    * trigger ITs
    
    * style
---
 .../beam_PostCommit_Python_Xlang_Gcp_Direct.json   |  2 +-
 .../extensions/avro/schemas/utils/AvroUtils.java   |  9 ++++++
 .../avro/schemas/utils/AvroUtilsTest.java          | 34 +++++++++++++++++++++
 .../io/external/xlang_bigqueryio_it_test.py        | 35 ++++++++++++++++++++--
 4 files changed, 77 insertions(+), 3 deletions(-)

diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json 
b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
index 95fef3e26ca..e3d6056a5de 100644
--- a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
+++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to 
run",
-  "modification": 13
+  "modification": 1
 }
diff --git 
a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java
 
b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java
index 1a8cac7ffb6..38621571ca1 100644
--- 
a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java
+++ 
b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java
@@ -135,6 +135,8 @@ import org.joda.time.ReadableInstant;
  *   LogicalTypes.Date              <-----> LogicalType(DATE)
  *                                  <------ 
LogicalType(urn="beam:logical_type:date:v1")
  *   LogicalTypes.TimestampMillis   <-----> DATETIME
+ *   LogicalTypes.TimestampMicros   ------> Long
+ *   LogicalTypes.TimestampMicros   <------ 
LogicalType(urn="beam:logical_type:micros_instant:v1")
  *   LogicalTypes.Decimal           <-----> DECIMAL
  * </pre>
  *
@@ -1181,6 +1183,9 @@ public class AvroUtils {
           baseType = 
LogicalTypes.date().addToSchema(org.apache.avro.Schema.create(Type.INT));
         } else if ("TIME".equals(identifier)) {
           baseType = 
LogicalTypes.timeMillis().addToSchema(org.apache.avro.Schema.create(Type.INT));
+        } else if (SqlTypes.TIMESTAMP.getIdentifier().equals(identifier)) {
+          baseType =
+              
LogicalTypes.timestampMicros().addToSchema(org.apache.avro.Schema.create(Type.LONG));
         } else {
           throw new RuntimeException(
               "Unhandled logical type " + 
checkNotNull(fieldType.getLogicalType()).getIdentifier());
@@ -1331,6 +1336,10 @@ public class AvroUtils {
           return ((java.time.LocalDate) value).toEpochDay();
         } else if ("TIME".equals(identifier)) {
           return (int) ((Instant) value).getMillis();
+        } else if (SqlTypes.TIMESTAMP.getIdentifier().equals(identifier)) {
+          java.time.Instant instant = (java.time.Instant) value;
+          return TimeUnit.SECONDS.toMicros(instant.getEpochSecond())
+              + TimeUnit.NANOSECONDS.toMicros(instant.getNano());
         } else {
           throw new RuntimeException("Unhandled logical type " + identifier);
         }
diff --git 
a/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java
 
b/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java
index 7cda1e9dba5..41a43ed850b 100644
--- 
a/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java
+++ 
b/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java
@@ -32,6 +32,7 @@ import java.time.temporal.TemporalUnit;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 import org.apache.avro.Conversions;
 import org.apache.avro.LogicalType;
 import org.apache.avro.LogicalTypes;
@@ -1038,6 +1039,39 @@ public class AvroUtilsTest {
     assertEquals(row, deserializedRow);
   }
 
+  @Test
+  public void testBeamTimestampLogicalTypeToAvro() {
+    // Beam MicrosInstant (SqlTypes.TIMESTAMP) maps to Avro timestamp-micros; only Beam -> Avro
+    // is tested. The expected micros are derived from epoch millis, not AvroUtils' own formula.
+
+    Schema beamSchema =
+        Schema.builder().addLogicalTypeField("timestampMicrosLT", SqlTypes.TIMESTAMP).build();
+    List<org.apache.avro.Schema.Field> fields = Lists.newArrayList();
+    fields.add(
+        new org.apache.avro.Schema.Field(
+            "timestampMicrosLT",
+            LogicalTypes.timestampMicros().addToSchema(org.apache.avro.Schema.create(Type.LONG)),
+            "",
+            (Object) null));
+    org.apache.avro.Schema avroSchema =
+        org.apache.avro.Schema.createRecord("topLevelRecord", null, null, false, fields);
+
+    assertEquals(avroSchema, AvroUtils.toAvroSchema(beamSchema));
+
+    java.time.Instant instant =
+        java.time.Instant.ofEpochMilli(DATE_TIME.getMillis()).plusNanos(123000);
+    Row beamRow = Row.withSchema(beamSchema).addValue(instant).build();
+    GenericRecord avroRecord =
+        new GenericRecordBuilder(avroSchema)
+            .set(
+                "timestampMicrosLT",
+                // The 123000 ns added above is 123 us on top of the whole-millisecond base.
+                TimeUnit.MILLISECONDS.toMicros(DATE_TIME.getMillis()) + 123)
+            .build();
+
+    assertEquals(avroRecord, AvroUtils.toGenericRecord(beamRow));
+  }
+
   @Test
   public void testNullSchemas() {
     assertEquals(
diff --git a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py 
b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py
index 51ae97b9917..d659d57aad9 100644
--- a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py
+++ b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py
@@ -114,7 +114,8 @@ class BigQueryXlangStorageWriteIT(unittest.TestCase):
     self.project = self.test_pipeline.get_option('project')
     self._runner = PipelineOptions(self.args).get_all_options()['runner']
 
-    self.bigquery_client = BigQueryWrapper()
+    self.bigquery_client = BigQueryWrapper.from_pipeline_options(
+        self.test_pipeline.options)
     self.dataset_id = '%s_%s_%s' % (
         self.BIGQUERY_DATASET, str(int(time.time())), secrets.token_hex(3))
     self.bigquery_client.get_or_create_dataset(self.project, self.dataset_id)
@@ -154,7 +155,7 @@ class BigQueryXlangStorageWriteIT(unittest.TestCase):
       self, table_prefix, storage_uri, expected_count=1):
     """Verify that Iceberg table directories are created in
     the warehouse location.
-    
+
     Args:
       table_prefix: The table name prefix to look for
       storage_uri: The GCS storage URI (e.g., 'gs://bucket/path')
@@ -607,6 +608,36 @@ class BigQueryXlangStorageWriteIT(unittest.TestCase):
     # Verify that the table directory was created in the warehouse location
     self.assert_iceberg_tables_created(table, big_lake_config['storageUri'])
 
+  def test_write_with_managed_transform(self):
+    """Writes ELEMENTS (minus 'numeric') via beam.managed.Write('bigquery')."""
+    table = 'write_with_managed_transform'
+    table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table)
+    row_elements = [
+        beam.Row(
+            my_int=e['int'],
+            my_float=e['float'],
+            my_string=e['str'],
+            my_bool=e['bool'],
+            my_bytes=e['bytes'],
+            my_timestamp=e['timestamp']) for e in self.ELEMENTS
+    ]
+
+    # Copy each element minus 'numeric' (not written above); deleting keys in
+    # place would mutate ELEMENTS, which other tests in this class may read.
+    expected = [{k: v for k, v in e.items() if k != 'numeric'}
+                for e in self.ELEMENTS]
+    bq_matcher = BigqueryFullResultMatcher(
+        project=self.project,
+        query="SELECT * FROM {}.{}".format(self.dataset_id, table),
+        data=self.parse_expected_data(expected))
+
+    with beam.Pipeline(argv=self.args) as p:
+      _ = (
+          p
+          | beam.Create(row_elements)
+          | beam.managed.Write("bigquery", config={"table": table_id}))
+    hamcrest_assert(p, bq_matcher)
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)

Reply via email to