This is an automated email from the ASF dual-hosted git repository.

cvandermerwe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 44ed498ee5c Add Picosecond timestamp support to storage API WriteTableRows (#37091)
44ed498ee5c is described below

commit 44ed498ee5cb3336cc3915732b0fa6b2068c1f58
Author: claudevdm <[email protected]>
AuthorDate: Mon Jan 5 09:41:31 2026 -0500

    Add Picosecond timestamp support to storage API WriteTableRows (#37091)
    
    * initial
    
    * typo
    
    * test refactor.
    
    * Fix tests.
    
    * spotless
    
    * comments.
    
    * handle joda instant.
    
    ---------
    
    Co-authored-by: Claude <[email protected]>
---
 runners/google-cloud-dataflow-java/build.gradle    |   2 +
 .../beam/sdk/io/gcp/bigquery/BigQueryUtils.java    |  63 ++++
 .../io/gcp/bigquery/TableRowToStorageApiProto.java | 106 +++++-
 .../io/gcp/bigquery/BigQueryTimestampPicosIT.java  | 414 +++++++++++++++++++++
 .../sdk/io/gcp/bigquery/BigQueryUtilsTest.java     |  94 +++++
 .../bigquery/TableRowToStorageApiProtoTest.java    | 142 ++++++-
 6 files changed, 812 insertions(+), 9 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle
index 50498d24c62..3792626a1fd 100644
--- a/runners/google-cloud-dataflow-java/build.gradle
+++ b/runners/google-cloud-dataflow-java/build.gradle
@@ -643,6 +643,7 @@ task googleCloudPlatformLegacyWorkerIntegrationTest(type: Test, dependsOn: copyG
   exclude '**/BigQueryIODynamicQueryIT.class'
   exclude '**/BigQueryIODynamicReadIT.class'
   exclude '**/BigQueryIODynamicReadTableRowIT.class'
+  exclude '**/BigQueryTimestampPicosIT.class'
   exclude '**/PubsubReadIT.class'
   exclude '**/FhirIOReadIT.class'
   exclude '**/DicomIOReadIT.class'
@@ -698,6 +699,7 @@ task googleCloudPlatformRunnerV2IntegrationTest(type: Test) {
   exclude '**/BigQueryIODynamicQueryIT.class'
   exclude '**/BigQueryIODynamicReadIT.class'
   exclude '**/BigQueryIODynamicReadTableRowIT.class'
+  exclude '**/BigQueryTimestampPicosIT.class'
   exclude '**/SpannerWriteIT.class'
   exclude '**/*KmsKeyIT.class'
   exclude '**/FhirIOReadIT.class'
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
index f9a59ba089c..94898787127 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
@@ -115,6 +115,8 @@ public class BigQueryUtils {
               + "(?<DATASET>[a-zA-Z0-9_]{1,1024})[\\.]"
              + "(?<TABLE>[\\p{L}\\p{M}\\p{N}\\p{Pc}\\p{Pd}\\p{Zs}$]{1,1024})$");
 
+  private static final long PICOSECOND_PRECISION = 12L;
+
   /** Options for how to convert BigQuery data to Beam data. */
   @AutoValue
   public abstract static class ConversionOptions implements Serializable {
@@ -380,6 +382,67 @@ public class BigQueryUtils {
     return ret;
   }
 
+  /**
+   * Represents a timestamp with picosecond precision, split into seconds and picoseconds
+   * components.
+   */
+  public static class TimestampPicos {
+    final long seconds;
+    final long picoseconds;
+
+    TimestampPicos(long seconds, long picoseconds) {
+      this.seconds = seconds;
+      this.picoseconds = picoseconds;
+    }
+
+    /**
+     * Parses a timestamp string into seconds and picoseconds components.
+     *
+     * <p>Handles two formats:
+     *
+     * <ul>
+     *   <li>ISO format with exactly 12 fractional digits ending in Z (picosecond precision): e.g.,
+     *       "2024-01-15T10:30:45.123456789012Z"
+     *   <li>UTC format with 0-9 fractional digits ending in "UTC" (up to nanosecond precision):
+     *       e.g., "2024-01-15 10:30:45.123456789 UTC", "2024-01-15 10:30:45 UTC"
+     * </ul>
+     */
+    public static TimestampPicos fromString(String timestampString) {
+      // Check for ISO picosecond format up to 12 fractional digits before Z
+      // Format: "2024-01-15T10:30:45.123456789012Z"
+      if (timestampString.endsWith("Z")) {
+        int dotIndex = timestampString.lastIndexOf('.');
+
+        if (dotIndex > 0) {
+          String fractionalPart =
+              timestampString.substring(dotIndex + 1, timestampString.length() - 1);
+
+          if ((long) fractionalPart.length() == PICOSECOND_PRECISION) {
+            // ISO timestamp with 12 decimal digits (picosecond precision)
+            // Parse the datetime part (without fractional seconds)
+            String dateTimePart = timestampString.substring(0, dotIndex) + "Z";
+            java.time.Instant baseInstant = java.time.Instant.parse(dateTimePart);
+
+            // Parse all 12 digits directly as picoseconds (subsecond portion)
+            long picoseconds = Long.parseLong(fractionalPart);
+
+            return new TimestampPicos(baseInstant.getEpochSecond(), picoseconds);
+          }
+        }
+
+        // ISO format with 0-9 fractional digits - Instant.parse handles this
+        java.time.Instant timestamp = java.time.Instant.parse(timestampString);
+        return new TimestampPicos(timestamp.getEpochSecond(), timestamp.getNano() * 1000L);
+      }
+
+      // UTC format: "2024-01-15 10:30:45.123456789 UTC"
+      // Use TIMESTAMP_FORMATTER which handles space separator and "UTC" suffix
+      java.time.Instant timestamp =
+          java.time.Instant.from(TIMESTAMP_FORMATTER.parse(timestampString));
+      return new TimestampPicos(timestamp.getEpochSecond(), timestamp.getNano() * 1000L);
+    }
+  }
+
   /**
    * Get the Beam {@link FieldType} from a BigQuery type name.
    *
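
For reference, the split performed by TimestampPicos.fromString can be exercised
standalone. A minimal sketch of the 12-digit branch (the harness class and the
sample value are illustrative, not part of the patch):

    import java.time.Instant;

    public class TimestampPicosSketch {
      public static void main(String[] args) {
        String ts = "2024-01-15T10:30:45.123456789012Z";
        int dot = ts.lastIndexOf('.');
        // Parse the whole-second part with java.time; the 12 fractional
        // digits are taken verbatim as the picosecond component.
        Instant base = Instant.parse(ts.substring(0, dot) + "Z");
        long picos = Long.parseLong(ts.substring(dot + 1, ts.length() - 1));
        System.out.println(base.getEpochSecond()); // 1705314645
        System.out.println(picos);                 // 123456789012
      }
    }
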
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
index c5451b04a4b..ab5ae80065a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
@@ -69,6 +69,7 @@ import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.TimestampPicos;
 import org.apache.beam.sdk.util.Preconditions;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Functions;
@@ -191,6 +192,23 @@ public class TableRowToStorageApiProto {
           .put(TableFieldSchema.Type.JSON, "JSON")
           .build();
 
+  static final DescriptorProto TIMESTAMP_PICOS_DESCRIPTOR_PROTO =
+      DescriptorProto.newBuilder()
+          .setName("TimestampPicos")
+          .addField(
+              DescriptorProtos.FieldDescriptorProto.newBuilder()
+                  .setName("seconds")
+                  .setNumber(1)
+                  .setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_INT64)
+                  .build())
+          .addField(
+              DescriptorProtos.FieldDescriptorProto.newBuilder()
+                  .setName("picoseconds")
+                  .setNumber(2)
+                  .setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_INT64)
+                  .build())
+          .build();
+
   @FunctionalInterface
   public interface ThrowingBiFunction<FirstInputT, SecondInputT, OutputT> {
    OutputT apply(FirstInputT t, SecondInputT u) throws SchemaConversionException;
@@ -199,6 +217,8 @@ public class TableRowToStorageApiProto {
   static final DecimalFormat DECIMAL_FORMAT =
       new DecimalFormat("0.0###############", DecimalFormatSymbols.getInstance(Locale.ROOT));
 
+  private static final long PICOSECOND_PRECISION = 12L;
+
  // Map of functions to convert json values into the value expected in the Vortex proto object.
  static final Map<TableFieldSchema.Type, ThrowingBiFunction<String, Object, @Nullable Object>>
       TYPE_MAP_PROTO_CONVERTERS =
@@ -533,6 +553,9 @@ public class TableRowToStorageApiProto {
     if (field.getScale() != null) {
       builder.setScale(field.getScale());
     }
+    if (field.getTimestampPrecision() != null) {
+      builder.getTimestampPrecisionBuilder().setValue(field.getTimestampPrecision());
+    }
     builder.setType(typeToProtoType(field.getType()));
     if (builder.getType().equals(TableFieldSchema.Type.STRUCT)) {
      for (com.google.api.services.bigquery.model.TableFieldSchema subField : field.getFields()) {
@@ -587,6 +610,10 @@ public class TableRowToStorageApiProto {
       return tableFieldSchema.getMode().equals(TableFieldSchema.Mode.REPEATED);
     }
 
+    public long getTimestampPrecision() {
+      return tableFieldSchema.getTimestampPrecision().getValue();
+    }
+
     public SchemaInformation getSchemaForField(String name) {
      SchemaInformation schemaInformation = subFieldsByName.get(name.toLowerCase());
       if (schemaInformation == null) {
@@ -631,7 +658,6 @@ public class TableRowToStorageApiProto {
           .put(TableFieldSchema.Type.DATE, Type.TYPE_INT32)
           .put(TableFieldSchema.Type.TIME, Type.TYPE_INT64)
           .put(TableFieldSchema.Type.DATETIME, Type.TYPE_INT64)
-          .put(TableFieldSchema.Type.TIMESTAMP, Type.TYPE_INT64)
           .put(TableFieldSchema.Type.JSON, Type.TYPE_STRING)
           .build();
 
@@ -957,10 +983,16 @@ public class TableRowToStorageApiProto {
 
     switch (fieldDescriptor.getType()) {
       case MESSAGE:
-        tableFieldSchemaBuilder = tableFieldSchemaBuilder.setType(TableFieldSchema.Type.STRUCT);
-        TableSchema nestedTableField = tableSchemaFromDescriptor(fieldDescriptor.getMessageType());
-        tableFieldSchemaBuilder =
-            tableFieldSchemaBuilder.addAllFields(nestedTableField.getFieldsList());
+        if (fieldDescriptor.getMessageType().getName().equals("TimestampPicos")) {
+          tableFieldSchemaBuilder.setType(TableFieldSchema.Type.TIMESTAMP);
+          tableFieldSchemaBuilder.setPrecision(PICOSECOND_PRECISION);
+        } else {
+          tableFieldSchemaBuilder = tableFieldSchemaBuilder.setType(TableFieldSchema.Type.STRUCT);
+          TableSchema nestedTableField =
+              tableSchemaFromDescriptor(fieldDescriptor.getMessageType());
+          tableFieldSchemaBuilder =
+              tableFieldSchemaBuilder.addAllFields(nestedTableField.getFieldsList());
+        }
         break;
       default:
        TableFieldSchema.Type type = PRIMITIVE_TYPES_PROTO_TO_BQ.get(fieldDescriptor.getType());
@@ -1060,6 +1092,25 @@ public class TableRowToStorageApiProto {
         fieldDescriptorBuilder =
             fieldDescriptorBuilder.setType(Type.TYPE_MESSAGE).setTypeName(nested.getName());
         break;
+      case TIMESTAMP:
+        if (fieldSchema.getTimestampPrecision().getValue() == PICOSECOND_PRECISION) {
+          boolean typeAlreadyExists =
+              descriptorBuilder.getNestedTypeList().stream()
+                  .anyMatch(d -> TIMESTAMP_PICOS_DESCRIPTOR_PROTO.getName().equals(d.getName()));
+
+          if (!typeAlreadyExists) {
+            descriptorBuilder.addNestedType(TIMESTAMP_PICOS_DESCRIPTOR_PROTO);
+          }
+          fieldDescriptorBuilder =
+              fieldDescriptorBuilder
+                  .setType(Type.TYPE_MESSAGE)
+                  .setTypeName(TIMESTAMP_PICOS_DESCRIPTOR_PROTO.getName());
+        } else {
+          // Microsecond precision - use simple INT64
+          fieldDescriptorBuilder = fieldDescriptorBuilder.setType(Type.TYPE_INT64);
+        }
+        break;
+
       default:
        @Nullable Type type = PRIMITIVE_TYPES_BQ_TO_PROTO.get(fieldSchema.getType());
         if (type == null) {
@@ -1313,6 +1364,36 @@ public class TableRowToStorageApiProto {
                 null,
                 null);
       }
+    } else if (schemaInformation.getType() == TableFieldSchema.Type.TIMESTAMP
+        && schemaInformation.getTimestampPrecision() == PICOSECOND_PRECISION) {
+
+      long seconds;
+      long picoseconds;
+
+      if (value instanceof String) {
+        TimestampPicos parsed = TimestampPicos.fromString((String) value);
+        seconds = parsed.seconds;
+        picoseconds = parsed.picoseconds;
+
+      } else if (value instanceof Instant || value instanceof org.joda.time.Instant) {
+        Instant timestamp =
+            value instanceof Instant
+                ? (Instant) value
+                : Instant.ofEpochMilli(((org.joda.time.Instant) value).getMillis());
+        seconds = timestamp.getEpochSecond();
+        picoseconds = timestamp.getNano() * 1000L;
+      } else {
+        throw new IllegalArgumentException(
+            "Unsupported timestamp value type: " + value.getClass().getName());
+      }
+
+      converted =
+          DynamicMessage.newBuilder(fieldDescriptor.getMessageType())
+              .setField(fieldDescriptor.getMessageType().findFieldByName("seconds"), seconds)
+              .setField(
+                  fieldDescriptor.getMessageType().findFieldByName("picoseconds"), picoseconds)
+              .build();
+
     } else {
       @Nullable
       ThrowingBiFunction<String, Object, @Nullable Object> converter =
@@ -1633,6 +1714,7 @@ public class TableRowToStorageApiProto {
          return LocalDateTime.ofInstant(instant, ZoneOffset.UTC).format(TIMESTAMP_FORMATTER);
        } else if (fieldDescriptor.getType().equals(FieldDescriptor.Type.MESSAGE)) {
           Message message = (Message) fieldValue;
+          String messageName = fieldDescriptor.getMessageType().getName();
           if (TIMESTAMP_VALUE_DESCRIPTOR_NAMES.contains(
               fieldDescriptor.getMessageType().getName())) {
             Descriptor descriptor = message.getDescriptorForType();
@@ -1640,6 +1722,20 @@ public class TableRowToStorageApiProto {
            int nanos = (int) message.getField(descriptor.findFieldByName("nanos"));
             Instant instant = Instant.ofEpochSecond(seconds, nanos);
            return LocalDateTime.ofInstant(instant, ZoneOffset.UTC).format(TIMESTAMP_FORMATTER);
+          } else if (messageName.equals("TimestampPicos")) {
+            Descriptor descriptor = message.getDescriptorForType();
+            long seconds = (long) message.getField(descriptor.findFieldByName("seconds"));
+            long picoseconds = (long) message.getField(descriptor.findFieldByName("picoseconds"));
+
+            // Convert to ISO timestamp string with picoseconds
+            Instant instant = Instant.ofEpochSecond(seconds);
+            String baseTimestamp = instant.toString(); // "2024-01-15T10:30:45Z"
+
+            // Format picoseconds as 12-digit string
+            String picosPart = String.format("%012d", picoseconds);
+
+            // Insert before 'Z': "2024-01-15T10:30:45Z" → "2024-01-15T10:30:45.123456789012Z"
+            return baseTimestamp.replace("Z", "." + picosPart + "Z");
           } else {
             throw new RuntimeException(
                 "Not implemented yet " + 
fieldDescriptor.getMessageType().getFullName());
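
On the wire, a picosecond TIMESTAMP is just the two-field nested message declared
by TIMESTAMP_PICOS_DESCRIPTOR_PROTO above. A self-contained sketch of building and
populating such a message with the public protobuf-java API (the patch goes through
Beam-internal descriptor helpers instead; the file and class names here are
illustrative):

    import com.google.protobuf.DescriptorProtos.DescriptorProto;
    import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
    import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Type;
    import com.google.protobuf.DescriptorProtos.FileDescriptorProto;
    import com.google.protobuf.Descriptors.Descriptor;
    import com.google.protobuf.Descriptors.FileDescriptor;
    import com.google.protobuf.DynamicMessage;

    public class TimestampPicosMessageSketch {
      public static void main(String[] args) throws Exception {
        DescriptorProto proto =
            DescriptorProto.newBuilder()
                .setName("TimestampPicos")
                .addField(FieldDescriptorProto.newBuilder()
                    .setName("seconds").setNumber(1).setType(Type.TYPE_INT64))
                .addField(FieldDescriptorProto.newBuilder()
                    .setName("picoseconds").setNumber(2).setType(Type.TYPE_INT64))
                .build();
        // A DescriptorProto must be hosted in a file before it can be instantiated.
        FileDescriptor file =
            FileDescriptor.buildFrom(
                FileDescriptorProto.newBuilder()
                    .setName("timestamp_picos.proto")
                    .addMessageType(proto)
                    .build(),
                new FileDescriptor[] {});
        Descriptor descriptor = file.getMessageTypes().get(0);
        DynamicMessage msg =
            DynamicMessage.newBuilder(descriptor)
                .setField(descriptor.findFieldByName("seconds"), 1705314645L)
                .setField(descriptor.findFieldByName("picoseconds"), 123456789012L)
                .build();
        System.out.println(msg); // prints the seconds and picoseconds fields
      }
    }
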
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimestampPicosIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimestampPicosIT.java
new file mode 100644
index 00000000000..5deffd4028f
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimestampPicosIT.java
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.cloud.bigquery.storage.v1.DataFormat;
+import java.security.SecureRandom;
+import java.util.List;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Integration tests for BigQuery TIMESTAMP with picosecond precision.
+ *
+ * <p>Tests write data via Storage Write API and read back using different precision settings. Each
+ * test clearly shows: WRITE DATA → READ SETTINGS → EXPECTED OUTPUT.
+ */
+@RunWith(JUnit4.class)
+public class BigQueryTimestampPicosIT {
+
+  private static final long PICOS_PRECISION = 12L;
+
+  private static String project;
+  private static final String DATASET_ID =
+      "bq_ts_picos_" + System.currentTimeMillis() + "_" + new 
SecureRandom().nextInt(32);
+  private static final BigqueryClient BQ_CLIENT = new 
BigqueryClient("BigQueryTimestampPicosIT");
+  private static TestBigQueryOptions bqOptions;
+  private static String nestedTableSpec;
+  private static String simpleTableSpec;
+
+  private static final TableSchema NESTED_SCHEMA =
+      new TableSchema()
+          .setFields(
+              ImmutableList.of(
+                  // Simple timestamp column
+                  new TableFieldSchema()
+                      .setName("ts_simple")
+                      .setType("TIMESTAMP")
+                      .setTimestampPrecision(PICOS_PRECISION),
+                  // Array of timestamps
+                  new TableFieldSchema()
+                      .setName("ts_array")
+                      .setType("TIMESTAMP")
+                      .setTimestampPrecision(PICOS_PRECISION)
+                      .setMode("REPEATED"),
+                  // Nested struct with timestamp
+                  new TableFieldSchema()
+                      .setName("event")
+                      .setType("STRUCT")
+                      .setFields(
+                          ImmutableList.of(
+                              new TableFieldSchema().setName("name").setType("STRING"),
+                              new TableFieldSchema()
+                                  .setName("ts")
+                                  .setType("TIMESTAMP")
+                                  .setTimestampPrecision(PICOS_PRECISION))),
+                  // Repeated struct with timestamp
+                  new TableFieldSchema()
+                      .setName("events")
+                      .setType("STRUCT")
+                      .setMode("REPEATED")
+                      .setFields(
+                          ImmutableList.of(
+                              new TableFieldSchema().setName("name").setType("STRING"),
+                              new TableFieldSchema()
+                                  .setName("ts")
+                                  .setType("TIMESTAMP")
+                                  .setTimestampPrecision(PICOS_PRECISION))),
+                  // Map-like: repeated struct with timestamp key and value
+                  new TableFieldSchema()
+                      .setName("ts_map")
+                      .setType("STRUCT")
+                      .setMode("REPEATED")
+                      .setFields(
+                          ImmutableList.of(
+                              new TableFieldSchema()
+                                  .setName("key")
+                                  .setType("TIMESTAMP")
+                                  .setTimestampPrecision(PICOS_PRECISION),
+                              new TableFieldSchema()
+                                  .setName("value")
+                                  .setType("TIMESTAMP")
+                                  .setTimestampPrecision(PICOS_PRECISION)))));
+
+  private static final TableSchema SIMPLE_SCHEMA =
+      new TableSchema()
+          .setFields(
+              ImmutableList.of(
+                  // Simple timestamp column
+                  new TableFieldSchema()
+                      .setName("ts_simple")
+                      .setType("TIMESTAMP")
+                      .setTimestampPrecision(PICOS_PRECISION)));
+
+  // ============================================================================
+  // TEST DATA - Written once, read with different precision settings
+  // ============================================================================
+  private static final List<TableRow> NESTED_WRITE_DATA =
+      ImmutableList.of(
+          new TableRow()
+              .set("ts_simple", "2024-01-15T10:30:45.123456789012Z")
+              .set(
+                  "ts_array",
+                  ImmutableList.of(
+                      "2024-01-15T10:30:45.111111111111Z", 
"2024-06-20T15:45:30.222222222222Z"))
+              .set(
+                  "event",
+                  new TableRow()
+                      .set("name", "login")
+                      .set("ts", "2024-01-15T10:30:45.333333333333Z"))
+              .set(
+                  "events",
+                  ImmutableList.of(
+                      new TableRow()
+                          .set("name", "click")
+                          .set("ts", "2024-01-15T10:30:45.444444444444Z"),
+                      new TableRow()
+                          .set("name", "scroll")
+                          .set("ts", "2024-01-15T10:30:45.555555555555Z")))
+              .set(
+                  "ts_map",
+                  ImmutableList.of(
+                      new TableRow()
+                          .set("key", "2024-01-15T10:30:45.666666666666Z")
+                          .set("value", "2024-01-15T10:30:45.777777777777Z"))),
+          new TableRow()
+              .set("ts_simple", "1890-01-01T00:00:00.123456789123Z")
+              .set("ts_array", 
ImmutableList.of("1970-01-01T00:00:00.000000000002Z"))
+              .set(
+                  "event",
+                  new TableRow()
+                      .set("name", "epoch")
+                      .set("ts", "1970-01-01T00:00:00.000000000003Z"))
+              .set(
+                  "events",
+                  ImmutableList.of(
+                      new TableRow()
+                          .set("name", "start")
+                          .set("ts", "1970-01-01T00:00:00.000000000004Z")))
+              .set(
+                  "ts_map",
+                  ImmutableList.of(
+                      new TableRow()
+                          .set("key", "1970-01-01T00:00:00.000000000005Z")
+                          .set("value", 
"1970-01-01T00:00:00.000000000006Z"))));
+
+  private static final List<TableRow> SIMPLE_WRITE_DATA =
+      ImmutableList.of(
+          new TableRow().set("ts_simple", "2024-01-15T10:30:45.123456789012Z"),
+          new TableRow().set("ts_simple", 
"1890-01-01T00:00:00.123456789123Z"));
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    bqOptions = TestPipeline.testingPipelineOptions().as(TestBigQueryOptions.class);
+    project = bqOptions.as(GcpOptions.class).getProject();
+    BQ_CLIENT.createNewDataset(project, DATASET_ID, null, "us-central1");
+    nestedTableSpec = String.format("%s:%s.%s", project, DATASET_ID, "nested_timestamp_picos_test");
+    simpleTableSpec = String.format("%s:%s.%s", project, DATASET_ID, "simple_timestamp_picos_test");
+
+    // Write test data
+    Pipeline writePipeline = Pipeline.create(bqOptions);
+    writePipeline
+        .apply("CreateNestedData", Create.of(NESTED_WRITE_DATA))
+        .apply(
+            "WriteNestedData",
+            BigQueryIO.writeTableRows()
+                .to(nestedTableSpec)
+                .withSchema(NESTED_SCHEMA)
+                .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
+                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
+                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
+    writePipeline
+        .apply("CreateSimpleData", Create.of(SIMPLE_WRITE_DATA))
+        .apply(
+            "WriteSimpleData",
+            BigQueryIO.writeTableRows()
+                .to(simpleTableSpec)
+                .withSchema(SIMPLE_SCHEMA)
+                .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
+                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
+                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
+    writePipeline.run().waitUntilFinish();
+  }
+
+  @AfterClass
+  public static void cleanup() {
+    BQ_CLIENT.deleteDataset(project, DATASET_ID);
+  }
+
+  @Test
+  public void testReadWithPicosPrecision_Avro() {
+
+    List<TableRow> expectedOutput =
+        ImmutableList.of(
+            new TableRow()
+                .set("ts_simple", "2024-01-15T10:30:45.123456789012Z")
+                .set(
+                    "ts_array",
+                    ImmutableList.of(
+                        "2024-01-15T10:30:45.111111111111Z", 
"2024-06-20T15:45:30.222222222222Z"))
+                .set(
+                    "event",
+                    new TableRow()
+                        .set("name", "login")
+                        .set("ts", "2024-01-15T10:30:45.333333333333Z"))
+                .set(
+                    "events",
+                    ImmutableList.of(
+                        new TableRow()
+                            .set("name", "click")
+                            .set("ts", "2024-01-15T10:30:45.444444444444Z"),
+                        new TableRow()
+                            .set("name", "scroll")
+                            .set("ts", "2024-01-15T10:30:45.555555555555Z")))
+                .set(
+                    "ts_map",
+                    ImmutableList.of(
+                        new TableRow()
+                            .set("key", "2024-01-15T10:30:45.666666666666Z")
+                            .set("value", 
"2024-01-15T10:30:45.777777777777Z"))),
+            new TableRow()
+                .set("ts_simple", "1890-01-01T00:00:00.123456789123Z")
+                .set("ts_array", 
ImmutableList.of("1970-01-01T00:00:00.000000000002Z"))
+                .set(
+                    "event",
+                    new TableRow()
+                        .set("name", "epoch")
+                        .set("ts", "1970-01-01T00:00:00.000000000003Z"))
+                .set(
+                    "events",
+                    ImmutableList.of(
+                        new TableRow()
+                            .set("name", "start")
+                            .set("ts", "1970-01-01T00:00:00.000000000004Z")))
+                .set(
+                    "ts_map",
+                    ImmutableList.of(
+                        new TableRow()
+                            .set("key", "1970-01-01T00:00:00.000000000005Z")
+                            .set("value", 
"1970-01-01T00:00:00.000000000006Z"))));
+
+    runReadTest(TimestampPrecision.PICOS, DataFormat.AVRO, expectedOutput, nestedTableSpec);
+  }
+
+  @Test
+  public void testReadWithNanosPrecision_Avro() {
+
+    List<TableRow> expectedOutput =
+        ImmutableList.of(
+            new TableRow()
+                .set("ts_simple", "2024-01-15 10:30:45.123456789 UTC")
+                .set(
+                    "ts_array",
+                    ImmutableList.of(
+                        "2024-01-15 10:30:45.111111111 UTC", "2024-06-20 
15:45:30.222222222 UTC"))
+                .set(
+                    "event",
+                    new TableRow()
+                        .set("name", "login")
+                        .set("ts", "2024-01-15 10:30:45.333333333 UTC"))
+                .set(
+                    "events",
+                    ImmutableList.of(
+                        new TableRow()
+                            .set("name", "click")
+                            .set("ts", "2024-01-15 10:30:45.444444444 UTC"),
+                        new TableRow()
+                            .set("name", "scroll")
+                            .set("ts", "2024-01-15 10:30:45.555555555 UTC")))
+                .set(
+                    "ts_map",
+                    ImmutableList.of(
+                        new TableRow()
+                            .set("key", "2024-01-15 10:30:45.666666666 UTC")
+                            .set("value", "2024-01-15 10:30:45.777777777 
UTC"))),
+            new TableRow()
+                .set("ts_simple", "1890-01-01 00:00:00.123456789 UTC")
+                .set("ts_array", ImmutableList.of("1970-01-01 00:00:00 UTC"))
+                .set(
+                    "event",
+                    new TableRow().set("name", "epoch").set("ts", "1970-01-01 
00:00:00 UTC"))
+                .set(
+                    "events",
+                    ImmutableList.of(
+                        new TableRow().set("name", "start").set("ts", 
"1970-01-01 00:00:00 UTC")))
+                .set(
+                    "ts_map",
+                    ImmutableList.of(
+                        new TableRow()
+                            .set("key", "1970-01-01 00:00:00 UTC")
+                            .set("value", "1970-01-01 00:00:00 UTC"))));
+
+    runReadTest(TimestampPrecision.NANOS, DataFormat.AVRO, expectedOutput, nestedTableSpec);
+  }
+
+  @Test
+  public void testReadWithMicrosPrecision_Avro() {
+
+    List<TableRow> expectedOutput =
+        ImmutableList.of(
+            new TableRow()
+                .set("ts_simple", "2024-01-15 10:30:45.123456 UTC")
+                .set(
+                    "ts_array",
+                    ImmutableList.of(
+                        "2024-01-15 10:30:45.111111 UTC", "2024-06-20 
15:45:30.222222 UTC"))
+                .set(
+                    "event",
+                    new TableRow().set("name", "login").set("ts", "2024-01-15 
10:30:45.333333 UTC"))
+                .set(
+                    "events",
+                    ImmutableList.of(
+                        new TableRow()
+                            .set("name", "click")
+                            .set("ts", "2024-01-15 10:30:45.444444 UTC"),
+                        new TableRow()
+                            .set("name", "scroll")
+                            .set("ts", "2024-01-15 10:30:45.555555 UTC")))
+                .set(
+                    "ts_map",
+                    ImmutableList.of(
+                        new TableRow()
+                            .set("key", "2024-01-15 10:30:45.666666 UTC")
+                            .set("value", "2024-01-15 10:30:45.777777 UTC"))),
+            new TableRow()
+                .set("ts_simple", "1890-01-01 00:00:00.123456 UTC")
+                .set("ts_array", ImmutableList.of("1970-01-01 00:00:00 UTC"))
+                .set(
+                    "event",
+                    new TableRow().set("name", "epoch").set("ts", "1970-01-01 
00:00:00 UTC"))
+                .set(
+                    "events",
+                    ImmutableList.of(
+                        new TableRow().set("name", "start").set("ts", 
"1970-01-01 00:00:00 UTC")))
+                .set(
+                    "ts_map",
+                    ImmutableList.of(
+                        new TableRow()
+                            .set("key", "1970-01-01 00:00:00 UTC")
+                            .set("value", "1970-01-01 00:00:00 UTC"))));
+
+    runReadTest(TimestampPrecision.MICROS, DataFormat.AVRO, expectedOutput, nestedTableSpec);
+  }
+
+  @Test
+  public void testReadWithPicosPrecision_Arrow() {
+
+    List<TableRow> expectedOutput =
+        ImmutableList.of(
+            new TableRow().set("ts_simple", 
"2024-01-15T10:30:45.123456789012Z"),
+            new TableRow().set("ts_simple", 
"1890-01-01T00:00:00.123456789123Z"));
+
+    runReadTest(TimestampPrecision.PICOS, DataFormat.ARROW, expectedOutput, simpleTableSpec);
+  }
+
+  @Test
+  public void testReadWithNanosPrecision_Arrow() {
+
+    List<TableRow> expectedOutput =
+        ImmutableList.of(
+            new TableRow().set("ts_simple", "2024-01-15 10:30:45.123456789 
UTC"),
+            new TableRow().set("ts_simple", "1890-01-01 00:00:00.123456789 
UTC"));
+
+    runReadTest(TimestampPrecision.NANOS, DataFormat.ARROW, expectedOutput, simpleTableSpec);
+  }
+
+  private void runReadTest(
+      TimestampPrecision precision,
+      DataFormat format,
+      List<TableRow> expectedOutput,
+      String tableSpec) {
+    Pipeline readPipeline = Pipeline.create(bqOptions);
+
+    PCollection<TableRow> result =
+        readPipeline.apply(
+            String.format("Read_%s_%s", precision, format),
+            BigQueryIO.readTableRows()
+                .withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ)
+                .withFormat(format)
+                .withDirectReadPicosTimestampPrecision(precision)
+                .from(tableSpec));
+
+    PAssert.that(result).containsInAnyOrder(expectedOutput);
+    readPipeline.run().waitUntilFinish();
+  }
+}
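
The 12-digit expected strings in the PICOS cases above have the same shape the
patch produces in tableRowFromMessage: format the whole seconds as an ISO
instant, then splice the zero-padded 12-digit fraction in before the trailing
'Z'. A standalone sketch of that step (values illustrative):

    import java.time.Instant;

    public class PicosFormatSketch {
      public static void main(String[] args) {
        long seconds = 1705314645L;        // 2024-01-15T10:30:45Z
        long picoseconds = 123456789012L;  // sub-second component
        String base = Instant.ofEpochSecond(seconds).toString();
        // Zero-pad to 12 digits and insert before the trailing 'Z'.
        String out = base.replace("Z", "." + String.format("%012d", picoseconds) + "Z");
        System.out.println(out); // 2024-01-15T10:30:45.123456789012Z
      }
    }
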
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
index 675885b4e94..76a492bebd2 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
@@ -1464,4 +1464,98 @@ public class BigQueryUtilsTest {
             schema, new TableRow().set("ts", "2024-08-10 16:52:07.123456789 UTC"));
     assertEquals(123456789, ((java.time.Instant) row9.getValue("ts")).getNano());
   }
+
+  /** Computes expected epoch seconds from an ISO-8601 timestamp. */
+  private static long expectedSeconds(String isoTimestamp) {
+    return java.time.Instant.parse(isoTimestamp).getEpochSecond();
+  }
+
+  @Test
+  public void testParseTimestampPicosFromString() {
+    // Format: {input, isoEquivalentForSeconds, expectedPicoseconds, description}
+    Object[][] testCases = {
+      // UTC format tests (space separator, "UTC" suffix)
+      {"2024-01-15 10:30:45 UTC", "2024-01-15T10:30:45Z", 0L, "UTC no 
fractional"},
+      {"2024-01-15 10:30:45.123 UTC", "2024-01-15T10:30:45Z", 
123_000_000_000L, "UTC 3 digits"},
+      {"2024-01-15 10:30:45.123456 UTC", "2024-01-15T10:30:45Z", 
123_456_000_000L, "UTC 6 digits"},
+      {
+        "2024-01-15 10:30:45.123456789 UTC",
+        "2024-01-15T10:30:45Z",
+        123_456_789_000L,
+        "UTC 9 digits"
+      },
+
+      // ISO format tests (T separator, "Z" suffix)
+      {"2024-01-15T10:30:45Z", "2024-01-15T10:30:45Z", 0L, "ISO no 
fractional"},
+      {"2024-01-15T10:30:45.123Z", "2024-01-15T10:30:45Z", 123_000_000_000L, 
"ISO 3 digits"},
+      {"2024-01-15T10:30:45.123456Z", "2024-01-15T10:30:45Z", 
123_456_000_000L, "ISO 6 digits"},
+      {"2024-01-15T10:30:45.123456789Z", "2024-01-15T10:30:45Z", 
123_456_789_000L, "ISO 9 digits"},
+      {
+        "2024-01-15T10:30:45.123456789012Z",
+        "2024-01-15T10:30:45Z",
+        123_456_789_012L,
+        "ISO 12 digits (picos)"
+      },
+
+      // Boundary: earliest date (0001-01-01)
+      {"0001-01-01 00:00:00.000000 UTC", "0001-01-01T00:00:00Z", 0L, "Earliest 
UTC"},
+      {"0001-01-01T00:00:00Z", "0001-01-01T00:00:00Z", 0L, "Earliest ISO"},
+      {"0001-01-01T00:00:00.000000000001Z", "0001-01-01T00:00:00Z", 1L, 
"Earliest ISO 1 pico"},
+
+      // Boundary: latest date (9999-12-31)
+      {"9999-12-31 23:59:59.999999 UTC", "9999-12-31T23:59:59Z", 
999_999_000_000L, "Latest UTC"},
+      {
+        "9999-12-31T23:59:59.999999999Z",
+        "9999-12-31T23:59:59Z",
+        999_999_999_000L,
+        "Latest ISO 9 digits"
+      },
+      {
+        "9999-12-31T23:59:59.999999999999Z",
+        "9999-12-31T23:59:59Z",
+        999_999_999_999L,
+        "Latest ISO max picos"
+      },
+
+      // Unix epoch (1970-01-01)
+      {"1970-01-01 00:00:00 UTC", "1970-01-01T00:00:00Z", 0L, "Epoch UTC"},
+      {"1970-01-01T00:00:00Z", "1970-01-01T00:00:00Z", 0L, "Epoch ISO"},
+      {"1970-01-01T00:00:00.000000000001Z", "1970-01-01T00:00:00Z", 1L, "Epoch 
+ 1 pico"},
+
+      // Fractional boundaries
+      {"2024-01-15T10:30:45.000000000000Z", "2024-01-15T10:30:45Z", 0L, "All 
zeros picos"},
+      {
+        "2024-01-15T10:30:45.999999999999Z",
+        "2024-01-15T10:30:45Z",
+        999_999_999_999L,
+        "All nines picos"
+      },
+      {
+        "2024-01-15T10:30:45.1Z",
+        "2024-01-15T10:30:45Z",
+        100_000_000_000L,
+        "Single digit fractional"
+      },
+    };
+
+    for (Object[] testCase : testCases) {
+      String input = (String) testCase[0];
+      String isoEquivalent = (String) testCase[1];
+      long expectedPicos = (Long) testCase[2];
+      String description = (String) testCase[3];
+
+      long expectedSecs = expectedSeconds(isoEquivalent);
+
+      BigQueryUtils.TimestampPicos result = BigQueryUtils.TimestampPicos.fromString(input);
+
+      assertEquals(
+          String.format("Seconds mismatch for '%s' (%s)", input, description),
+          expectedSecs,
+          result.seconds);
+      assertEquals(
+          String.format("Picoseconds mismatch for '%s' (%s)", input, 
description),
+          expectedPicos,
+          result.picoseconds);
+    }
+  }
 }
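
One subtlety the table above encodes: inputs with fewer than 12 fractional digits
go through java.time, which treats the fraction as a decimal (".1" is 100
milliseconds, not 1 nanosecond), and the patch then scales nanoseconds to
picoseconds by a factor of 1000. A short standalone sketch (illustrative):

    import java.time.Instant;

    public class FractionScalingSketch {
      public static void main(String[] args) {
        // ".1" parses to 100,000,000 nanoseconds ...
        Instant t = Instant.parse("2024-01-15T10:30:45.1Z");
        // ... which becomes 100,000,000,000 picoseconds, matching the
        // "Single digit fractional" row in the test table.
        long picos = t.getNano() * 1000L;
        System.out.println(picos); // 100000000000
      }
    }
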
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
index 05f0e9c993c..ea3bb29e081 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
@@ -42,6 +42,7 @@ import com.google.protobuf.Descriptors.Descriptor;
 import com.google.protobuf.Descriptors.DescriptorValidationException;
 import com.google.protobuf.Descriptors.FieldDescriptor;
 import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.Int64Value;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.charset.StandardCharsets;
@@ -52,6 +53,7 @@ import java.time.ZoneOffset;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -131,6 +133,11 @@ public class TableRowToStorageApiProtoTest {
                  .add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampvaluemaximum"))
                  .add(
                      new TableFieldSchema().setType("STRING").setName("123_illegalprotofieldname"))
+                  .add(
+                      new TableFieldSchema()
+                          .setType("TIMESTAMP")
+                          .setName("timestamppicosvalue")
+                          .setTimestampPrecision(12L))
                   .build());
 
   private static final TableSchema BASE_TABLE_SCHEMA_NO_F =
@@ -183,6 +190,11 @@ public class TableRowToStorageApiProtoTest {
                  .add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampvaluemaximum"))
                  .add(
                      new TableFieldSchema().setType("STRING").setName("123_illegalprotofieldname"))
+                  .add(
+                      new TableFieldSchema()
+                          .setType("TIMESTAMP")
+                          .setName("timestamppicosvalue")
+                          .setTimestampPrecision(12L))
                   .build());
 
   private static final DescriptorProto BASE_TABLE_SCHEMA_PROTO_DESCRIPTOR =
@@ -396,6 +408,14 @@ public class TableRowToStorageApiProtoTest {
                               AnnotationsProto.columnName.getDescriptor(),
                               "123_illegalprotofieldname"))
                   .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("timestamppicosvalue")
+                  .setNumber(30)
+                  .setType(Type.TYPE_MESSAGE)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .setTypeName("TimestampPicos")
+                  .build())
           .build();
 
   private static final com.google.cloud.bigquery.storage.v1.TableSchema BASE_TABLE_PROTO_SCHEMA =
@@ -545,6 +565,12 @@ public class TableRowToStorageApiProtoTest {
                   .setName("123_illegalprotofieldname")
                   
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.STRING)
                   .build())
+          .addFields(
+              com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+                  .setName("timestamppicosvalue")
+                  .setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.TIMESTAMP)
+                  .setTimestampPrecision(Int64Value.newBuilder().setValue(12L))
+                  .build())
           .build();
 
   private static final DescriptorProto BASE_TABLE_SCHEMA_NO_F_PROTO =
@@ -751,6 +777,14 @@ public class TableRowToStorageApiProtoTest {
                               AnnotationsProto.columnName.getDescriptor(),
                               "123_illegalprotofieldname"))
                   .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("timestamppicosvalue")
+                  .setNumber(29)
+                  .setType(Type.TYPE_MESSAGE)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .setTypeName("TimestampPicos")
+                  .build())
           .build();
 
   private static final com.google.cloud.bigquery.storage.v1.TableSchema
@@ -896,6 +930,12 @@ public class TableRowToStorageApiProtoTest {
                       .setName("123_illegalprotofieldname")
                       
.setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.STRING)
                       .build())
+              .addFields(
+                  com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder()
+                      .setName("timestamppicosvalue")
+                      .setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.TIMESTAMP)
+                      .setTimestampPrecision(Int64Value.newBuilder().setValue(12L))
+                      .build())
               .build();
   private static final TableSchema NESTED_TABLE_SCHEMA =
       new TableSchema()
@@ -1137,6 +1177,34 @@ public class TableRowToStorageApiProtoTest {
     assertEquals(roundTripExpectedBaseTypesNoF, nestedRoundTripTypes);
   }
 
+  private static final DescriptorProto TIMESTAMP_PICOS_PROTO =
+      DescriptorProto.newBuilder()
+          .setName("TimestampPicos")
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("seconds")
+                  .setNumber(1)
+                  .setType(Type.TYPE_INT64)
+                  .setLabel(Label.LABEL_OPTIONAL))
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("picoseconds")
+                  .setNumber(2)
+                  .setType(Type.TYPE_INT64)
+                  .setLabel(Label.LABEL_OPTIONAL))
+          .build();
+
+  private static final Descriptor TIMESTAMP_PICOS_DESCRIPTOR;
+
+  static {
+    try {
+      TIMESTAMP_PICOS_DESCRIPTOR =
+          TableRowToStorageApiProto.wrapDescriptorProto(TIMESTAMP_PICOS_PROTO);
+    } catch (DescriptorValidationException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   private static final List<Object> REPEATED_BYTES =
       ImmutableList.of(
           BaseEncoding.base64().encode("hello".getBytes(StandardCharsets.UTF_8)),
@@ -1183,7 +1251,8 @@ public class TableRowToStorageApiProtoTest {
                   new TableCell().setV("1970-01-01 00:00:00.1230"),
                   new TableCell().setV("2019-08-16 00:52:07.123456"),
                   new TableCell().setV("9999-12-31 23:59:59.999999Z"),
-                  new TableCell().setV("madeit")));
+                  new TableCell().setV("madeit"),
+                  new TableCell().setV("2024-01-15T10:30:45.123456789012Z")));
 
   private static final TableRow BASE_TABLE_ROW_NO_F =
       new TableRow()
@@ -1217,7 +1286,8 @@ public class TableRowToStorageApiProtoTest {
           .set("timestampvaluespacetrailingzero", "1970-01-01 00:00:00.1230")
           .set("datetimevaluespace", "2019-08-16 00:52:07.123456")
           .set("timestampvaluemaximum", "9999-12-31 23:59:59.999999Z")
-          .set("123_illegalprotofieldname", "madeit");
+          .set("123_illegalprotofieldname", "madeit")
+          .set("timestamppicosvalue", "2024-01-15T10:30:45.123456789012Z");
 
   private static final Map<String, Object> BASE_ROW_EXPECTED_PROTO_VALUES =
       ImmutableMap.<String, Object>builder()
@@ -1261,6 +1331,15 @@ public class TableRowToStorageApiProtoTest {
          .put(
              BigQuerySchemaUtil.generatePlaceholderFieldName("123_illegalprotofieldname"),
               "madeit")
+          .put(
+              "timestamppicosvalue",
+              DynamicMessage.newBuilder(TIMESTAMP_PICOS_DESCRIPTOR)
+                  .setField(
+                      TIMESTAMP_PICOS_DESCRIPTOR.findFieldByName("seconds"),
+                      Instant.parse("2024-01-15T10:30:45Z").getEpochSecond())
+                  .setField(
+                      TIMESTAMP_PICOS_DESCRIPTOR.findFieldByName("picoseconds"), 123456789012L)
+                  .build())
           .build();
 
   private static final Map<String, String> BASE_ROW_EXPECTED_NAME_OVERRIDES =
@@ -1309,6 +1388,15 @@ public class TableRowToStorageApiProtoTest {
          .put(
              BigQuerySchemaUtil.generatePlaceholderFieldName("123_illegalprotofieldname"),
               "madeit")
+          .put(
+              "timestamppicosvalue",
+              DynamicMessage.newBuilder(TIMESTAMP_PICOS_DESCRIPTOR)
+                  .setField(
+                      TIMESTAMP_PICOS_DESCRIPTOR.findFieldByName("seconds"),
+                      Instant.parse("2024-01-15T10:30:45Z").getEpochSecond())
+                  .setField(
+                      TIMESTAMP_PICOS_DESCRIPTOR.findFieldByName("picoseconds"), 123456789012L)
+                  .build())
           .build();
 
   private static final Map<String, String> BASE_ROW_NO_F_EXPECTED_NAME_OVERRIDES =
@@ -1394,6 +1482,16 @@ public class TableRowToStorageApiProtoTest {
         == com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.STRUCT) {
      return normalizeTableRow((TableRow) value, schemaInformation, outputUsingF);
     } else {
+      if (schemaInformation.getType()
+          == com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.TIMESTAMP) {
+        // Handle picosecond timestamp (12-digit precision)
+        if (schemaInformation.getTimestampPrecision() == 12) {
+          // Already a string, return as-is.
+          if (value instanceof String) {
+            return value;
+          }
+        }
+      }
      convertedValue = TYPE_MAP_PROTO_CONVERTERS.get(schemaInformation.getType()).apply("", value);
       switch (schemaInformation.getType()) {
         case BOOL:
@@ -1461,8 +1559,42 @@ public class TableRowToStorageApiProtoTest {
                    entry ->
                        entry.getKey().getOptions().getExtension(AnnotationsProto.columnName)));
 
-    assertEquals(
-        withF ? BASE_ROW_EXPECTED_PROTO_VALUES : BASE_ROW_NO_F_EXPECTED_PROTO_VALUES, recordFields);
+    // Get expected values
+    Map<String, Object> expectedValues =
+        withF ? BASE_ROW_EXPECTED_PROTO_VALUES : BASE_ROW_NO_F_EXPECTED_PROTO_VALUES;
+
+    // Compare timestamppicosvalue separately: DynamicMessage.equals() compares descriptors by
+    // identity, and the expected message was built from an independently wrapped descriptor.
+    Object actualPicos = recordFields.get("timestamppicosvalue");
+    Object expectedPicos = expectedValues.get("timestamppicosvalue");
+
+    if (actualPicos != null && expectedPicos != null) {
+      // Compare DynamicMessages by their field values
+      DynamicMessage actualPicosMsg = (DynamicMessage) actualPicos;
+      DynamicMessage expectedPicosMsg = (DynamicMessage) expectedPicos;
+
+      Descriptor actualDescriptor = actualPicosMsg.getDescriptorForType();
+
+      assertEquals(
+          "TimestampPicos seconds mismatch",
+          expectedPicosMsg.getField(
+              expectedPicosMsg.getDescriptorForType().findFieldByName("seconds")),
+          actualPicosMsg.getField(actualDescriptor.findFieldByName("seconds")));
+      assertEquals(
+          "TimestampPicos picoseconds mismatch",
+          expectedPicosMsg.getField(
+              expectedPicosMsg.getDescriptorForType().findFieldByName("picoseconds")),
+          actualPicosMsg.getField(actualDescriptor.findFieldByName("picoseconds")));
+    }
+
+    // Remove timestamppicosvalue from both maps for remaining comparison
+    Map<String, Object> recordFieldsWithoutPicos = new HashMap<>(recordFields);
+    Map<String, Object> expectedValuesWithoutPicos = new HashMap<>(expectedValues);
+    recordFieldsWithoutPicos.remove("timestamppicosvalue");
+    expectedValuesWithoutPicos.remove("timestamppicosvalue");
+
+    // Compare remaining fields
+    assertEquals(expectedValuesWithoutPicos, recordFieldsWithoutPicos);
+
     assertEquals(
        withF ? BASE_ROW_EXPECTED_NAME_OVERRIDES : BASE_ROW_NO_F_EXPECTED_NAME_OVERRIDES,
         overriddenNames);
@@ -1484,6 +1616,7 @@ public class TableRowToStorageApiProtoTest {
     DynamicMessage msg =
         TableRowToStorageApiProto.messageFromTableRow(
             schemaInformation, descriptor, tableRow, false, false, null, null, -1);
+
     assertEquals(4, msg.getAllFields().size());
 
     Map<String, FieldDescriptor> fieldDescriptors =
@@ -1511,6 +1644,7 @@ public class TableRowToStorageApiProtoTest {
     DynamicMessage msg =
         TableRowToStorageApiProto.messageFromTableRow(
             schemaInformation, descriptor, tableRow, false, false, null, null, -1);
+
     TableRow recovered =
         TableRowToStorageApiProto.tableRowFromMessage(
             schemaInformation, msg, true, Predicates.alwaysTrue());
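
A note on why the test compares the TimestampPicos fields one by one rather than
relying on equals(): protobuf message equality first checks that both messages
share the same Descriptor instance, and Descriptor does not override equals(), so
two structurally identical descriptors built independently never compare equal. A
small sketch demonstrating this (names illustrative):

    import com.google.protobuf.DescriptorProtos.DescriptorProto;
    import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
    import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Type;
    import com.google.protobuf.DescriptorProtos.FileDescriptorProto;
    import com.google.protobuf.Descriptors.Descriptor;
    import com.google.protobuf.Descriptors.FileDescriptor;
    import com.google.protobuf.DynamicMessage;

    public class DescriptorIdentitySketch {
      static Descriptor build() throws Exception {
        DescriptorProto proto =
            DescriptorProto.newBuilder()
                .setName("TimestampPicos")
                .addField(FieldDescriptorProto.newBuilder()
                    .setName("seconds").setNumber(1).setType(Type.TYPE_INT64))
                .build();
        return FileDescriptor.buildFrom(
                FileDescriptorProto.newBuilder()
                    .setName("p.proto").addMessageType(proto).build(),
                new FileDescriptor[] {})
            .getMessageTypes()
            .get(0);
      }

      public static void main(String[] args) throws Exception {
        Descriptor a = build();
        Descriptor b = build(); // structurally identical, distinct instance
        DynamicMessage m1 =
            DynamicMessage.newBuilder(a).setField(a.findFieldByName("seconds"), 1L).build();
        DynamicMessage m2 =
            DynamicMessage.newBuilder(b).setField(b.findFieldByName("seconds"), 1L).build();
        System.out.println(m1.equals(m2)); // false: descriptors differ by identity
      }
    }
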
