This is an automated email from the ASF dual-hosted git repository.

cvandermerwe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new cc8dc2fe4f2 Add timestamp precision option to bigquery storage read 
for TIMESTAMP(12) columns. (#37079)
cc8dc2fe4f2 is described below

commit cc8dc2fe4f22019c609f4af07d62743380977b07
Author: claudevdm <[email protected]>
AuthorDate: Mon Dec 22 07:36:41 2025 +0200

    Add timestamp precision option to bigquery storage read for TIMESTAMP(12) 
columns. (#37079)
    
    * initial
    
    * fix tests.
    
    * Fix test.
    
    * Make setting name more descriptive.
    
    * Comments.
    
    ---------
    
    Co-authored-by: Claude <[email protected]>
---
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java       |  32 +-
 .../sdk/io/gcp/bigquery/BigQueryIOTranslation.java |  13 +
 .../gcp/bigquery/BigQueryStorageQuerySource.java   |  43 +-
 .../io/gcp/bigquery/BigQueryStorageSourceBase.java |  67 ++-
 .../gcp/bigquery/BigQueryStorageTableSource.java   |  40 +-
 .../io/gcp/bigquery/BigQueryIOStorageReadTest.java | 644 +++++++++++++++++++++
 .../io/gcp/bigquery/BigQueryIOTranslationTest.java |  22 +
 7 files changed, 847 insertions(+), 14 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 69b9c62ceea..7c0ab785ae7 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -1251,6 +1251,8 @@ public class BigQueryIO {
       abstract Builder<T> setBadRecordRouter(BadRecordRouter badRecordRouter);
 
       abstract Builder<T> setProjectionPushdownApplied(boolean 
projectionPushdownApplied);
+
+      abstract Builder<T> 
setDirectReadPicosTimestampPrecision(TimestampPrecision precision);
     }
 
     abstract @Nullable ValueProvider<String> getJsonTableRef();
@@ -1306,6 +1308,8 @@ public class BigQueryIO {
 
     abstract boolean getProjectionPushdownApplied();
 
+    abstract @Nullable TimestampPrecision 
getDirectReadPicosTimestampPrecision();
+
     /**
      * An enumeration type for the priority of a query.
      *
@@ -1381,7 +1385,8 @@ public class BigQueryIO {
           getFormat(),
           getParseFn(),
           outputCoder,
-          getBigQueryServices());
+          getBigQueryServices(),
+          getDirectReadPicosTimestampPrecision());
     }
 
     private static final String QUERY_VALIDATION_FAILURE_ERROR =
@@ -1525,7 +1530,12 @@ public class BigQueryIO {
         if (selectedFields != null && selectedFields.isAccessible()) {
           tableSchema = BigQueryUtils.trimSchema(tableSchema, 
selectedFields.get());
         }
-        beamSchema = BigQueryUtils.fromTableSchema(tableSchema);
+        BigQueryUtils.SchemaConversionOptions.Builder builder =
+            BigQueryUtils.SchemaConversionOptions.builder();
+        if (getDirectReadPicosTimestampPrecision() != null) {
+          
builder.setPicosecondTimestampMapping(getDirectReadPicosTimestampPrecision());
+        }
+        beamSchema = BigQueryUtils.fromTableSchema(tableSchema, 
builder.build());
       }
 
       final Coder<T> coder = inferCoder(p.getCoderRegistry());
@@ -1710,7 +1720,8 @@ public class BigQueryIO {
                           getParseFn(),
                           outputCoder,
                           getBigQueryServices(),
-                          getProjectionPushdownApplied())));
+                          getProjectionPushdownApplied(),
+                          getDirectReadPicosTimestampPrecision())));
           if (beamSchema != null) {
             rows.setSchema(
                 beamSchema,
@@ -1731,7 +1742,8 @@ public class BigQueryIO {
                   getParseFn(),
                   outputCoder,
                   getBigQueryServices(),
-                  getProjectionPushdownApplied());
+                  getProjectionPushdownApplied(),
+                  getDirectReadPicosTimestampPrecision());
           List<? extends BoundedSource<T>> sources;
           try {
             // This splitting logic taken from the SDF implementation of Read
@@ -2293,6 +2305,18 @@ public class BigQueryIO {
       return toBuilder().setMethod(method).build();
     }
 
+    /**
+     * Sets the timestamp precision to request for TIMESTAMP(12) BigQuery 
columns when reading via
+     * the Storage Read API.
+     *
+     * <p>This option only affects precision of TIMESTAMP(12) column reads 
using {@link
+     * Method#DIRECT_READ}. If not set the BQ client will return microsecond 
precision by default.
+     */
+    public TypedRead<T> withDirectReadPicosTimestampPrecision(
+        TimestampPrecision timestampPrecision) {
+      return 
toBuilder().setDirectReadPicosTimestampPrecision(timestampPrecision).build();
+    }
+
     /** See {@link DataFormat}. */
     public TypedRead<T> withFormat(DataFormat format) {
       return toBuilder().setFormat(format).build();
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java
index d519ea4016f..c2e891145ac 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java
@@ -109,6 +109,7 @@ public class BigQueryIOTranslation {
             .addNullableBooleanField("projection_pushdown_applied")
             .addNullableByteArrayField("bad_record_router")
             .addNullableByteArrayField("bad_record_error_handler")
+            .addNullableByteArrayField("direct_read_picos_timestamp_precision")
             .build();
 
     public static final String BIGQUERY_READ_TRANSFORM_URN =
@@ -195,6 +196,11 @@ public class BigQueryIOTranslation {
       if (transform.getUseAvroLogicalTypes() != null) {
         fieldValues.put("use_avro_logical_types", 
transform.getUseAvroLogicalTypes());
       }
+      if (transform.getDirectReadPicosTimestampPrecision() != null) {
+        fieldValues.put(
+            "direct_read_picos_timestamp_precision",
+            toByteArray(transform.getDirectReadPicosTimestampPrecision()));
+      }
       fieldValues.put("projection_pushdown_applied", 
transform.getProjectionPushdownApplied());
       fieldValues.put("bad_record_router", 
toByteArray(transform.getBadRecordRouter()));
       fieldValues.put(
@@ -293,6 +299,13 @@ public class BigQueryIOTranslation {
         if (formatBytes != null) {
           builder = builder.setFormat((DataFormat) fromByteArray(formatBytes));
         }
+        byte[] timestampPrecisionBytes =
+            configRow.getBytes("direct_read_picos_timestamp_precision");
+        if (timestampPrecisionBytes != null) {
+          builder =
+              builder.setDirectReadPicosTimestampPrecision(
+                  (TimestampPrecision) fromByteArray(timestampPrecisionBytes));
+        }
         Collection<String> selectedFields = 
configRow.getArray("selected_fields");
         if (selectedFields != null && !selectedFields.isEmpty()) {
           
builder.setSelectedFields(StaticValueProvider.of(ImmutableList.of(selectedFields)));
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java
index 07c3273c293..064b9bebaf1 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java
@@ -40,6 +40,38 @@ import org.checkerframework.checker.nullness.qual.Nullable;
 /** A {@link org.apache.beam.sdk.io.Source} representing reading the results 
of a query. */
 class BigQueryStorageQuerySource<T> extends BigQueryStorageSourceBase<T> {
 
+  public static <T> BigQueryStorageQuerySource<T> create(
+      String stepUuid,
+      ValueProvider<String> queryProvider,
+      Boolean flattenResults,
+      Boolean useLegacySql,
+      QueryPriority priority,
+      @Nullable String location,
+      @Nullable String queryTempDataset,
+      @Nullable String queryTempProject,
+      @Nullable String kmsKey,
+      @Nullable DataFormat format,
+      SerializableFunction<SchemaAndRecord, T> parseFn,
+      Coder<T> outputCoder,
+      BigQueryServices bqServices,
+      @Nullable TimestampPrecision picosTimestampPrecision) {
+    return new BigQueryStorageQuerySource<>(
+        stepUuid,
+        queryProvider,
+        flattenResults,
+        useLegacySql,
+        priority,
+        location,
+        queryTempDataset,
+        queryTempProject,
+        kmsKey,
+        format,
+        parseFn,
+        outputCoder,
+        bqServices,
+        picosTimestampPrecision);
+  }
+
   public static <T> BigQueryStorageQuerySource<T> create(
       String stepUuid,
       ValueProvider<String> queryProvider,
@@ -67,7 +99,8 @@ class BigQueryStorageQuerySource<T> extends 
BigQueryStorageSourceBase<T> {
         format,
         parseFn,
         outputCoder,
-        bqServices);
+        bqServices,
+        /*picosTimestampPrecision=*/ null);
   }
 
   public static <T> BigQueryStorageQuerySource<T> create(
@@ -94,7 +127,8 @@ class BigQueryStorageQuerySource<T> extends 
BigQueryStorageSourceBase<T> {
         null,
         parseFn,
         outputCoder,
-        bqServices);
+        bqServices,
+        /*picosTimestampPrecision=*/ null);
   }
 
   private final String stepUuid;
@@ -123,8 +157,9 @@ class BigQueryStorageQuerySource<T> extends 
BigQueryStorageSourceBase<T> {
       @Nullable DataFormat format,
       SerializableFunction<SchemaAndRecord, T> parseFn,
       Coder<T> outputCoder,
-      BigQueryServices bqServices) {
-    super(format, null, null, parseFn, outputCoder, bqServices);
+      BigQueryServices bqServices,
+      @Nullable TimestampPrecision picosTimestampPrecision) {
+    super(format, null, null, parseFn, outputCoder, bqServices, 
picosTimestampPrecision);
     this.stepUuid = checkNotNull(stepUuid, "stepUuid");
     this.queryProvider = checkNotNull(queryProvider, "queryProvider");
     this.flattenResults = checkNotNull(flattenResults, "flattenResults");
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java
index d0bc655b311..45763c6ac14 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java
@@ -22,6 +22,8 @@ import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Pr
 import com.google.api.services.bigquery.model.Table;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableSchema;
+import com.google.cloud.bigquery.storage.v1.ArrowSerializationOptions;
+import com.google.cloud.bigquery.storage.v1.AvroSerializationOptions;
 import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
 import com.google.cloud.bigquery.storage.v1.DataFormat;
 import com.google.cloud.bigquery.storage.v1.ReadSession;
@@ -69,6 +71,7 @@ abstract class BigQueryStorageSourceBase<T> extends 
BoundedSource<T> {
   protected final SerializableFunction<SchemaAndRecord, T> parseFn;
   protected final Coder<T> outputCoder;
   protected final BigQueryServices bqServices;
+  private final @Nullable TimestampPrecision picosTimestampPrecision;
 
   BigQueryStorageSourceBase(
       @Nullable DataFormat format,
@@ -76,13 +79,15 @@ abstract class BigQueryStorageSourceBase<T> extends 
BoundedSource<T> {
       @Nullable ValueProvider<String> rowRestrictionProvider,
       SerializableFunction<SchemaAndRecord, T> parseFn,
       Coder<T> outputCoder,
-      BigQueryServices bqServices) {
+      BigQueryServices bqServices,
+      @Nullable TimestampPrecision picosTimestampPrecision) {
     this.format = format;
     this.selectedFieldsProvider = selectedFieldsProvider;
     this.rowRestrictionProvider = rowRestrictionProvider;
     this.parseFn = checkNotNull(parseFn, "parseFn");
     this.outputCoder = checkNotNull(outputCoder, "outputCoder");
     this.bqServices = checkNotNull(bqServices, "bqServices");
+    this.picosTimestampPrecision = picosTimestampPrecision;
   }
 
   /**
@@ -131,11 +136,12 @@ abstract class BigQueryStorageSourceBase<T> extends 
BoundedSource<T> {
     if (rowRestrictionProvider != null && 
rowRestrictionProvider.isAccessible()) {
       tableReadOptionsBuilder.setRowRestriction(rowRestrictionProvider.get());
     }
-    readSessionBuilder.setReadOptions(tableReadOptionsBuilder);
 
     if (format != null) {
       readSessionBuilder.setDataFormat(format);
+      setPicosTimestampPrecision(tableReadOptionsBuilder, format);
     }
+    readSessionBuilder.setReadOptions(tableReadOptionsBuilder);
 
     // Setting the  requested max stream count to 0, implies that the Read API 
backend will select
     // an appropriate number of streams for the Session to produce reasonable 
throughput.
@@ -199,4 +205,61 @@ abstract class BigQueryStorageSourceBase<T> extends 
BoundedSource<T> {
   public BoundedReader<T> createReader(PipelineOptions options) throws 
IOException {
     throw new UnsupportedOperationException("BigQuery storage source must be 
split before reading");
   }
+
+  private void setPicosTimestampPrecision(
+      ReadSession.TableReadOptions.Builder tableReadOptionsBuilder, DataFormat 
dataFormat) {
+    if (picosTimestampPrecision == null) {
+      return;
+    }
+
+    if (dataFormat == DataFormat.ARROW) {
+      setArrowTimestampPrecision(tableReadOptionsBuilder, 
picosTimestampPrecision);
+    } else if (dataFormat == DataFormat.AVRO) {
+      setAvroTimestampPrecision(tableReadOptionsBuilder, 
picosTimestampPrecision);
+    }
+  }
+
+  private static void setArrowTimestampPrecision(
+      ReadSession.TableReadOptions.Builder tableReadOptionsBuilder,
+      TimestampPrecision timestampPrecision) {
+    ArrowSerializationOptions.PicosTimestampPrecision precision;
+    switch (timestampPrecision) {
+      case MICROS:
+        precision = 
ArrowSerializationOptions.PicosTimestampPrecision.TIMESTAMP_PRECISION_MICROS;
+        break;
+      case NANOS:
+        precision = 
ArrowSerializationOptions.PicosTimestampPrecision.TIMESTAMP_PRECISION_NANOS;
+        break;
+      case PICOS:
+        precision = 
ArrowSerializationOptions.PicosTimestampPrecision.TIMESTAMP_PRECISION_PICOS;
+        break;
+      default:
+        throw new IllegalArgumentException(
+            "Unsupported timestamp precision for Storage Read API: " + 
timestampPrecision);
+    }
+    tableReadOptionsBuilder.setArrowSerializationOptions(
+        
ArrowSerializationOptions.newBuilder().setPicosTimestampPrecision(precision));
+  }
+
+  private static void setAvroTimestampPrecision(
+      ReadSession.TableReadOptions.Builder tableReadOptionsBuilder,
+      TimestampPrecision timestampPrecision) {
+    AvroSerializationOptions.PicosTimestampPrecision precision;
+    switch (timestampPrecision) {
+      case MICROS:
+        precision = 
AvroSerializationOptions.PicosTimestampPrecision.TIMESTAMP_PRECISION_MICROS;
+        break;
+      case NANOS:
+        precision = 
AvroSerializationOptions.PicosTimestampPrecision.TIMESTAMP_PRECISION_NANOS;
+        break;
+      case PICOS:
+        precision = 
AvroSerializationOptions.PicosTimestampPrecision.TIMESTAMP_PRECISION_PICOS;
+        break;
+      default:
+        throw new IllegalArgumentException(
+            "Unsupported timestamp precision for Storage Read API: " + 
timestampPrecision);
+    }
+    tableReadOptionsBuilder.setAvroSerializationOptions(
+        
AvroSerializationOptions.newBuilder().setPicosTimestampPrecision(precision));
+  }
 }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageTableSource.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageTableSource.java
index 909a2551b29..8b7240158dc 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageTableSource.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageTableSource.java
@@ -65,7 +65,8 @@ public class BigQueryStorageTableSource<T> extends 
BigQueryStorageSourceBase<T>
         parseFn,
         outputCoder,
         bqServices,
-        projectionPushdownApplied);
+        projectionPushdownApplied,
+        /*picosTimestampPrecision=*/ null);
   }
 
   public static <T> BigQueryStorageTableSource<T> create(
@@ -83,7 +84,30 @@ public class BigQueryStorageTableSource<T> extends 
BigQueryStorageSourceBase<T>
         parseFn,
         outputCoder,
         bqServices,
-        false);
+        /*projectionPushdownApplied=*/ false,
+        /*picosTimestampPrecision=*/ null);
+  }
+
+  public static <T> BigQueryStorageTableSource<T> create(
+      ValueProvider<TableReference> tableRefProvider,
+      DataFormat format,
+      @Nullable ValueProvider<List<String>> selectedFields,
+      @Nullable ValueProvider<String> rowRestriction,
+      SerializableFunction<SchemaAndRecord, T> parseFn,
+      Coder<T> outputCoder,
+      BigQueryServices bqServices,
+      boolean projectionPushdownApplied,
+      @Nullable TimestampPrecision picosTimestampPrecision) {
+    return new BigQueryStorageTableSource<>(
+        tableRefProvider,
+        format,
+        selectedFields,
+        rowRestriction,
+        parseFn,
+        outputCoder,
+        bqServices,
+        projectionPushdownApplied,
+        picosTimestampPrecision);
   }
 
   private BigQueryStorageTableSource(
@@ -94,8 +118,16 @@ public class BigQueryStorageTableSource<T> extends 
BigQueryStorageSourceBase<T>
       SerializableFunction<SchemaAndRecord, T> parseFn,
       Coder<T> outputCoder,
       BigQueryServices bqServices,
-      boolean projectionPushdownApplied) {
-    super(format, selectedFields, rowRestriction, parseFn, outputCoder, 
bqServices);
+      boolean projectionPushdownApplied,
+      @Nullable TimestampPrecision picosTimestampPrecision) {
+    super(
+        format,
+        selectedFields,
+        rowRestriction,
+        parseFn,
+        outputCoder,
+        bqServices,
+        picosTimestampPrecision);
     this.tableReferenceProvider = checkNotNull(tableRefProvider, 
"tableRefProvider");
     this.projectionPushdownApplied = projectionPushdownApplied;
     cachedTable = new AtomicReference<>();
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
index 5b9e15f22b9..95f472f5c61 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
@@ -27,6 +27,8 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -69,14 +71,18 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.stream.Collectors;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.TimeStampMicroTZVector;
+import org.apache.arrow.vector.TimeStampNanoTZVector;
 import org.apache.arrow.vector.VarCharVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.VectorUnloader;
 import org.apache.arrow.vector.ipc.WriteChannel;
 import org.apache.arrow.vector.ipc.message.MessageSerializer;
+import org.apache.arrow.vector.types.TimeUnit;
 import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.util.Text;
 import org.apache.avro.Schema;
@@ -444,6 +450,63 @@ public class BigQueryIOStorageReadTest {
           asList(
               field("name", new ArrowType.Utf8()), field("number", new 
ArrowType.Int(64, true))));
 
+  // --- MICROS ---
+  private static final TableSchema TABLE_SCHEMA_TIMESTAMP =
+      new TableSchema()
+          .setFields(
+              ImmutableList.of(
+                  new TableFieldSchema()
+                      .setName("ts")
+                      .setType("TIMESTAMP")
+                      .setMode("REQUIRED")
+                      .setTimestampPrecision(12L)));
+
+  private static final org.apache.arrow.vector.types.pojo.Schema 
ARROW_SCHEMA_TS_MICROS =
+      new org.apache.arrow.vector.types.pojo.Schema(
+          asList(field("ts", new ArrowType.Timestamp(TimeUnit.MICROSECOND, 
"UTC"))));
+
+  private static final String AVRO_SCHEMA_TS_MICROS_STRING =
+      "{\"namespace\": \"example.avro\","
+          + " \"type\": \"record\","
+          + " \"name\": \"RowRecord\","
+          + " \"fields\": ["
+          + "     {\"name\": \"ts\", \"type\": {\"type\": \"long\", 
\"logicalType\": \"timestamp-micros\"}}"
+          + " ]}";
+
+  private static final Schema AVRO_SCHEMA_TS_MICROS =
+      new Schema.Parser().parse(AVRO_SCHEMA_TS_MICROS_STRING);
+
+  // --- NANOS ---
+  private static final org.apache.arrow.vector.types.pojo.Schema 
ARROW_SCHEMA_TS_NANOS =
+      new org.apache.arrow.vector.types.pojo.Schema(
+          asList(field("ts", new ArrowType.Timestamp(TimeUnit.NANOSECOND, 
"UTC"))));
+
+  private static final String AVRO_SCHEMA_TS_NANOS_STRING =
+      "{\"namespace\": \"example.avro\","
+          + " \"type\": \"record\","
+          + " \"name\": \"RowRecord\","
+          + " \"fields\": ["
+          + "     {\"name\": \"ts\", \"type\": {\"type\": \"long\", 
\"logicalType\": \"timestamp-nanos\"}}"
+          + " ]}";
+
+  private static final Schema AVRO_SCHEMA_TS_NANOS =
+      new Schema.Parser().parse(AVRO_SCHEMA_TS_NANOS_STRING);
+
+  // --- PICOS (string) ---
+  private static final org.apache.arrow.vector.types.pojo.Schema 
ARROW_SCHEMA_TS_PICOS =
+      new org.apache.arrow.vector.types.pojo.Schema(asList(field("ts", new 
ArrowType.Utf8())));
+
+  private static final String AVRO_SCHEMA_TS_PICOS_STRING =
+      "{\"namespace\": \"example.avro\","
+          + " \"type\": \"record\","
+          + " \"name\": \"RowRecord\","
+          + " \"fields\": ["
+          + "     {\"name\": \"ts\", \"type\": \"string\"}"
+          + " ]}";
+
+  private static final Schema AVRO_SCHEMA_TS_PICOS =
+      new Schema.Parser().parse(AVRO_SCHEMA_TS_PICOS_STRING);
+
   private void doTableSourceInitialSplitTest(long bundleSize, int streamCount) 
throws Exception {
     fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", 
null);
     TableReference tableRef = 
BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table");
@@ -2381,6 +2444,587 @@ public class BigQueryIOStorageReadTest {
     assertEquals(new Utf8("A"), rowA.get("name"));
   }
 
+  @Test
+  public void testTimestampPrecisionDefaultValue() {
+    BigQueryIO.TypedRead<TableRow> typedRead =
+        BigQueryIO.read(new TableRowParser())
+            .withCoder(TableRowJsonCoder.of())
+            .withMethod(Method.DIRECT_READ)
+            .from("foo.com:project:dataset.table");
+
+    assertNull(typedRead.getDirectReadPicosTimestampPrecision());
+  }
+
+  @Test
+  public void testwithDirectReadPicosTimestampPrecisionNanos() {
+    BigQueryIO.TypedRead<TableRow> typedRead =
+        BigQueryIO.read(new TableRowParser())
+            .withCoder(TableRowJsonCoder.of())
+            .withMethod(Method.DIRECT_READ)
+            .from("foo.com:project:dataset.table")
+            .withDirectReadPicosTimestampPrecision(TimestampPrecision.NANOS);
+
+    assertEquals(TimestampPrecision.NANOS, 
typedRead.getDirectReadPicosTimestampPrecision());
+  }
+
+  @Test
+  public void testwithDirectReadPicosTimestampPrecisionPicos() {
+    BigQueryIO.TypedRead<TableRow> typedRead =
+        BigQueryIO.read(new TableRowParser())
+            .withCoder(TableRowJsonCoder.of())
+            .withMethod(Method.DIRECT_READ)
+            .from("foo.com:project:dataset.table")
+            .withDirectReadPicosTimestampPrecision(TimestampPrecision.PICOS);
+
+    assertEquals(TimestampPrecision.PICOS, 
typedRead.getDirectReadPicosTimestampPrecision());
+  }
+
+  @Test
+  public void 
testTableSourceInitialSplit_withDirectReadPicosTimestampPrecisionNanos_Arrow()
+      throws Exception {
+    fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", 
null);
+    TableReference tableRef = 
BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table");
+    Table table = new 
Table().setTableReference(tableRef).setNumBytes(100L).setSchema(TABLE_SCHEMA);
+    fakeDatasetService.createTable(table);
+
+    CreateReadSessionRequest expectedRequest =
+        CreateReadSessionRequest.newBuilder()
+            .setParent("projects/project-id")
+            .setReadSession(
+                ReadSession.newBuilder()
+                    
.setTable("projects/foo.com:project/datasets/dataset/tables/table")
+                    .setDataFormat(DataFormat.ARROW)
+                    .setReadOptions(
+                        ReadSession.TableReadOptions.newBuilder()
+                            .setArrowSerializationOptions(
+                                
com.google.cloud.bigquery.storage.v1.ArrowSerializationOptions
+                                    .newBuilder()
+                                    .setPicosTimestampPrecision(
+                                        com.google.cloud.bigquery.storage.v1
+                                            
.ArrowSerializationOptions.PicosTimestampPrecision
+                                            .TIMESTAMP_PRECISION_NANOS))))
+            .setMaxStreamCount(10)
+            .build();
+
+    ReadSession.Builder builder =
+        ReadSession.newBuilder()
+            .setArrowSchema(
+                
ArrowSchema.newBuilder().setSerializedSchema(serializeArrowSchema(ARROW_SCHEMA)))
+            .setDataFormat(DataFormat.ARROW);
+    for (int i = 0; i < 10; i++) {
+      builder.addStreams(ReadStream.newBuilder().setName("stream-" + i));
+    }
+
+    StorageClient fakeStorageClient = mock(StorageClient.class);
+    
when(fakeStorageClient.createReadSession(expectedRequest)).thenReturn(builder.build());
+
+    BigQueryStorageTableSource<TableRow> tableSource =
+        BigQueryStorageTableSource.create(
+            ValueProvider.StaticValueProvider.of(tableRef),
+            DataFormat.ARROW,
+            null, /* selectedFields */
+            null, /* rowRestriction */
+            new TableRowParser(),
+            TableRowJsonCoder.of(),
+            new FakeBigQueryServices()
+                .withDatasetService(fakeDatasetService)
+                .withStorageClient(fakeStorageClient),
+            false, /* projectionPushdownApplied */
+            TimestampPrecision.NANOS);
+
+    List<? extends BoundedSource<TableRow>> sources = tableSource.split(10L, 
options);
+    assertEquals(10L, sources.size());
+  }
+
+  private org.apache.arrow.vector.types.pojo.Schema 
getArrowSchemaTs(TimestampPrecision precision) {
+    switch (precision) {
+      case NANOS:
+        return ARROW_SCHEMA_TS_NANOS;
+      case PICOS:
+        return ARROW_SCHEMA_TS_PICOS;
+      case MICROS:
+      default:
+        return ARROW_SCHEMA_TS_MICROS;
+    }
+  }
+
+  private Schema getAvroSchemaTs(TimestampPrecision precision) {
+    switch (precision) {
+      case NANOS:
+        return AVRO_SCHEMA_TS_NANOS;
+      case PICOS:
+        return AVRO_SCHEMA_TS_PICOS;
+      case MICROS:
+      default:
+        return AVRO_SCHEMA_TS_MICROS;
+    }
+  }
+
+  private String getAvroSchemaStringTs(TimestampPrecision precision) {
+    switch (precision) {
+      case NANOS:
+        return AVRO_SCHEMA_TS_NANOS_STRING;
+      case PICOS:
+        return AVRO_SCHEMA_TS_PICOS_STRING;
+      case MICROS:
+      default:
+        return AVRO_SCHEMA_TS_MICROS_STRING;
+    }
+  }
+
+  /**
+   * Converts ISO timestamp strings to the appropriate format for the 
precision. - MICROS: Long
+   * (epoch microseconds) - NANOS: Long (epoch nanoseconds) - PICOS: String 
(formatted as
+   * "yyyy-MM-dd HH:mm:ss.SSSSSSSSSSSS UTC")
+   */
+  private List<Object> convertInputsForPrecision(
+      List<String> isoTimestamps, TimestampPrecision precision) {
+    return isoTimestamps.stream()
+        .map(
+            iso -> {
+              if (precision == TimestampPrecision.PICOS) {
+                // For PICOS, input IS the string (already formatted)
+                return iso;
+              }
+              java.time.Instant instant = java.time.Instant.parse(iso);
+              if (precision == TimestampPrecision.NANOS) {
+                return instant.getEpochSecond() * 1_000_000_000L + 
instant.getNano();
+              } else {
+                // MICROS (default)
+                return instant.getEpochSecond() * 1_000_000L + 
instant.getNano() / 1000;
+              }
+            })
+        .collect(Collectors.toList());
+  }
+
+  private ReadSession createTsReadSession(
+      DataFormat dataFormat,
+      org.apache.arrow.vector.types.pojo.Schema arrowSchema,
+      String avroSchemaString) {
+    ReadSession.Builder builder =
+        ReadSession.newBuilder()
+            .setName("readSessionName")
+            .addStreams(ReadStream.newBuilder().setName("streamName"))
+            .setDataFormat(dataFormat);
+
+    if (dataFormat == DataFormat.ARROW) {
+      builder.setArrowSchema(
+          
ArrowSchema.newBuilder().setSerializedSchema(serializeArrowSchema(arrowSchema)).build());
+    } else {
+      
builder.setAvroSchema(AvroSchema.newBuilder().setSchema(avroSchemaString).build());
+    }
+    return builder.build();
+  }
+
+  private ReadRowsResponse createArrowTsResponse(
+      org.apache.arrow.vector.types.pojo.Schema arrowSchema,
+      TimestampPrecision precision,
+      List<Object> inputValues) {
+    ArrowRecordBatch serializedRecord;
+    try (VectorSchemaRoot schemaRoot = VectorSchemaRoot.create(arrowSchema, 
allocator)) {
+      schemaRoot.allocateNew();
+      schemaRoot.setRowCount(inputValues.size());
+
+      switch (precision) {
+        case NANOS:
+          TimeStampNanoTZVector nanoVector =
+              (TimeStampNanoTZVector) schemaRoot.getFieldVectors().get(0);
+          for (int i = 0; i < inputValues.size(); i++) {
+            nanoVector.set(i, (Long) inputValues.get(i));
+          }
+          break;
+        case PICOS:
+          VarCharVector stringVector = (VarCharVector) 
schemaRoot.getFieldVectors().get(0);
+          for (int i = 0; i < inputValues.size(); i++) {
+            stringVector.set(i, new Text((String) inputValues.get(i)));
+          }
+          break;
+        case MICROS:
+        default:
+          TimeStampMicroTZVector microVector =
+              (TimeStampMicroTZVector) schemaRoot.getFieldVectors().get(0);
+          for (int i = 0; i < inputValues.size(); i++) {
+            microVector.set(i, (Long) inputValues.get(i));
+          }
+          break;
+      }
+
+      VectorUnloader unLoader = new VectorUnloader(schemaRoot);
+      try (org.apache.arrow.vector.ipc.message.ArrowRecordBatch records =
+          unLoader.getRecordBatch()) {
+        try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
+          MessageSerializer.serialize(new 
WriteChannel(Channels.newChannel(os)), records);
+          serializedRecord =
+              ArrowRecordBatch.newBuilder()
+                  .setRowCount(records.getLength())
+                  
.setSerializedRecordBatch(ByteString.copyFrom(os.toByteArray()))
+                  .build();
+        }
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    return ReadRowsResponse.newBuilder()
+        .setArrowRecordBatch(serializedRecord)
+        .setRowCount(inputValues.size())
+        .setStats(
+            StreamStats.newBuilder()
+                
.setProgress(Progress.newBuilder().setAtResponseStart(0.0).setAtResponseEnd(1.0)))
+        .build();
+  }
+
+  private ReadRowsResponse createAvroTsResponse(
+      Schema avroSchema, TimestampPrecision precision, List<Object> 
inputValues) throws Exception {
+    List<GenericRecord> records = new ArrayList<>();
+    for (Object value : inputValues) {
+      GenericRecord record = new Record(avroSchema);
+      record.put("ts", value);
+      records.add(record);
+    }
+
+    GenericDatumWriter<GenericRecord> writer = new 
GenericDatumWriter<>(avroSchema);
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    Encoder binaryEncoder = EncoderFactory.get().binaryEncoder(outputStream, 
null);
+    for (GenericRecord record : records) {
+      writer.write(record, binaryEncoder);
+    }
+    binaryEncoder.flush();
+
+    return ReadRowsResponse.newBuilder()
+        .setAvroRows(
+            AvroRows.newBuilder()
+                
.setSerializedBinaryRows(ByteString.copyFrom(outputStream.toByteArray()))
+                .setRowCount(records.size()))
+        .setRowCount(records.size())
+        .setStats(
+            StreamStats.newBuilder()
+                
.setProgress(Progress.newBuilder().setAtResponseStart(0.0).setAtResponseEnd(1.0)))
+        .build();
+  }
+
+  private void runTimestampTest(
+      DataFormat dataFormat,
+      TimestampPrecision precision,
+      boolean useSchema,
+      List<String> inputTimestamps,
+      List<String> expectedOutputs)
+      throws Exception {
+
+    TimestampPrecision effectivePrecision =
+        (precision != null) ? precision : TimestampPrecision.MICROS;
+
+    fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", 
null);
+    TableReference tableRef = 
BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table");
+    Table table =
+        new 
Table().setTableReference(tableRef).setNumBytes(10L).setSchema(TABLE_SCHEMA_TIMESTAMP);
+    fakeDatasetService.createTable(table);
+
+    org.apache.arrow.vector.types.pojo.Schema arrowSchema = 
getArrowSchemaTs(effectivePrecision);
+    Schema avroSchema = getAvroSchemaTs(effectivePrecision);
+    String avroSchemaString = getAvroSchemaStringTs(effectivePrecision);
+
+    List<Object> inputValues = convertInputsForPrecision(inputTimestamps, 
effectivePrecision);
+
+    ReadSession readSession = createTsReadSession(dataFormat, arrowSchema, 
avroSchemaString);
+
+    List<ReadRowsResponse> readRowsResponses;
+    if (dataFormat == DataFormat.ARROW) {
+      readRowsResponses =
+          Lists.newArrayList(createArrowTsResponse(arrowSchema, 
effectivePrecision, inputValues));
+    } else {
+      readRowsResponses =
+          Lists.newArrayList(createAvroTsResponse(avroSchema, 
effectivePrecision, inputValues));
+    }
+
+    StorageClient fakeStorageClient = mock(StorageClient.class, 
withSettings().serializable());
+    
when(fakeStorageClient.createReadSession(any(CreateReadSessionRequest.class)))
+        .thenReturn(readSession);
+    when(fakeStorageClient.readRows(any(ReadRowsRequest.class), eq("")))
+        .thenReturn(new FakeBigQueryServerStream<>(readRowsResponses));
+
+    TypedRead<TableRow> read =
+        useSchema ? BigQueryIO.readTableRowsWithSchema() : 
BigQueryIO.readTableRows();
+
+    read =
+        read.from("foo.com:project:dataset.table")
+            .withMethod(Method.DIRECT_READ)
+            .withFormat(dataFormat)
+            .withTestServices(
+                new FakeBigQueryServices()
+                    .withDatasetService(fakeDatasetService)
+                    .withStorageClient(fakeStorageClient));
+
+    if (precision != null) {
+      read = read.withDirectReadPicosTimestampPrecision(precision);
+    }
+
+    PCollection<TableRow> output = p.apply(read);
+
+    PAssert.that(output)
+        .satisfies(
+            rows -> {
+              List<TableRow> rowList = Lists.newArrayList(rows);
+              assertEquals(expectedOutputs.size(), rowList.size());
+
+              List<String> actualTimestamps =
+                  rowList.stream()
+                      .map(r -> (String) r.get("ts"))
+                      .sorted()
+                      .collect(Collectors.toList());
+
+              List<String> sortedExpected =
+                  
expectedOutputs.stream().sorted().collect(Collectors.toList());
+
+              assertEquals(sortedExpected, actualTimestamps);
+              return null;
+            });
+
+    p.run();
+  }
+
+  // ===== Avro + readTableRows =====
+
+  @Test
+  public void testReadTableRows_Avro_DefaultPrecision() throws Exception {
+    runTimestampTest(
+        DataFormat.AVRO,
+        null,
+        false,
+        Arrays.asList("2024-01-01T00:00:00.123456Z", 
"2024-06-15T12:30:45.987654Z"),
+        Arrays.asList("2024-01-01 00:00:00.123456 UTC", "2024-06-15 
12:30:45.987654 UTC"));
+  }
+
+  @Test
+  public void testReadTableRows_Avro_MicrosPrecision() throws Exception {
+    runTimestampTest(
+        DataFormat.AVRO,
+        TimestampPrecision.MICROS,
+        false,
+        Arrays.asList("2024-01-01T00:00:00.123456Z", 
"2024-06-15T12:30:45.987654Z"),
+        Arrays.asList("2024-01-01 00:00:00.123456 UTC", "2024-06-15 
12:30:45.987654 UTC"));
+  }
+
+  @Test
+  public void testReadTableRows_Avro_NanosPrecision() throws Exception {
+    runTimestampTest(
+        DataFormat.AVRO,
+        TimestampPrecision.NANOS,
+        false,
+        Arrays.asList("2024-01-01T00:00:00.123456789Z", 
"2024-06-15T12:30:45.987654321Z"),
+        Arrays.asList("2024-01-01 00:00:00.123456789 UTC", "2024-06-15 
12:30:45.987654321 UTC"));
+  }
+
+  @Test
+  public void testReadTableRows_Avro_PicosPrecision() throws Exception {
+    runTimestampTest(
+        DataFormat.AVRO,
+        TimestampPrecision.PICOS,
+        false,
+        Arrays.asList(
+            "2024-01-01 00:00:00.123456789012 UTC", "2024-06-15 
12:30:45.987654321098 UTC"),
+        Arrays.asList(
+            "2024-01-01 00:00:00.123456789012 UTC", "2024-06-15 
12:30:45.987654321098 UTC"));
+  }
+
+  // ===== Avro + readTableRowsWithSchema =====
+
+  @Test
+  public void testReadTableRowsWithSchema_Avro_DefaultPrecision() throws 
Exception {
+    runTimestampTest(
+        DataFormat.AVRO,
+        null,
+        true,
+        Arrays.asList("2024-01-01T00:00:00.123456Z", 
"2024-06-15T12:30:45.987654Z"),
+        Arrays.asList("2024-01-01 00:00:00.123456 UTC", "2024-06-15 
12:30:45.987654 UTC"));
+  }
+
+  @Test
+  public void testReadTableRowsWithSchema_Avro_MicrosPrecision() throws 
Exception {
+    runTimestampTest(
+        DataFormat.AVRO,
+        TimestampPrecision.MICROS,
+        true,
+        Arrays.asList("2024-01-01T00:00:00.123456Z", 
"2024-06-15T12:30:45.987654Z"),
+        Arrays.asList("2024-01-01 00:00:00.123456 UTC", "2024-06-15 
12:30:45.987654 UTC"));
+  }
+
+  @Test
+  public void testReadTableRowsWithSchema_Avro_NanosPrecision() throws 
Exception {
+    runTimestampTest(
+        DataFormat.AVRO,
+        TimestampPrecision.NANOS,
+        true,
+        Arrays.asList("2024-01-01T00:00:00.123456789Z", 
"2024-06-15T12:30:45.987654321Z"),
+        Arrays.asList("2024-01-01 00:00:00.123456789 UTC", "2024-06-15 
12:30:45.987654321 UTC"));
+  }
+
+  @Test
+  public void testReadTableRowsWithSchema_Avro_PicosPrecision() throws 
Exception {
+    runTimestampTest(
+        DataFormat.AVRO,
+        TimestampPrecision.PICOS,
+        true,
+        Arrays.asList(
+            "2024-01-01 00:00:00.123456789012 UTC", "2024-06-15 
12:30:45.987654321098 UTC"),
+        Arrays.asList(
+            "2024-01-01 00:00:00.123456789012 UTC", "2024-06-15 
12:30:45.987654321098 UTC"));
+  }
+
+  // ===== Arrow + readTableRows =====
+
+  @Test
+  public void testReadTableRows_Arrow_DefaultPrecision() throws Exception {
+    //    Avro records are always converted to beam Row and then to 
GenericRecord in
+    // ArrowConversion.java
+    //    ArrowConversion.java is a generic utility to convert Arrow records 
and it does not take
+    // into account
+    //    the BigQuery TableSchema to determine the appropriate beam type. 
Historically arrow
+    // microsecond timestamps
+    //    are converted to FieldType.DATETIME, which maps to joda Instants, 
which only supports up
+    // to millisecond precision
+    //    hence precision is lost.
+    runTimestampTest(
+        DataFormat.ARROW,
+        null,
+        false,
+        Arrays.asList("2024-01-01T00:00:00.123456Z", 
"2024-06-15T12:30:45.987654Z"),
+        Arrays.asList("2024-01-01 00:00:00.123 UTC", "2024-06-15 12:30:45.987 
UTC"));
+  }
+
+  @Test
+  public void testReadTableRows_Arrow_MicrosPrecision() throws Exception {
+    runTimestampTest(
+        DataFormat.ARROW,
+        TimestampPrecision.MICROS,
+        false,
+        Arrays.asList("2024-01-01T00:00:00.123456Z", 
"2024-06-15T12:30:45.987654Z"),
+        Arrays.asList("2024-01-01 00:00:00.123 UTC", "2024-06-15 12:30:45.987 
UTC"));
+  }
+
+  @Test
+  public void testReadTableRows_Arrow_NanosPrecision() throws Exception {
+    runTimestampTest(
+        DataFormat.ARROW,
+        TimestampPrecision.NANOS,
+        false,
+        Arrays.asList("2024-01-01T00:00:00.123456789Z", 
"2024-06-15T12:30:45.987654321Z"),
+        Arrays.asList("2024-01-01 00:00:00.123456789 UTC", "2024-06-15 
12:30:45.987654321 UTC"));
+  }
+
+  @Test
+  public void testReadTableRows_Arrow_PicosPrecision() throws Exception {
+    runTimestampTest(
+        DataFormat.ARROW,
+        TimestampPrecision.PICOS,
+        false,
+        Arrays.asList(
+            "2024-01-01 00:00:00.123456789012 UTC", "2024-06-15 
12:30:45.987654321098 UTC"),
+        Arrays.asList(
+            "2024-01-01 00:00:00.123456789012 UTC", "2024-06-15 
12:30:45.987654321098 UTC"));
+  }
+
+  // ===== Arrow + readTableRowsWithSchema =====
+
+  @Test
+  public void testReadTableRowsWithSchema_Arrow_DefaultPrecision() throws 
Exception {
+    runTimestampTest(
+        DataFormat.ARROW,
+        null,
+        true,
+        Arrays.asList("2024-01-01T00:00:00.123456Z", 
"2024-06-15T12:30:45.987654Z"),
+        Arrays.asList("2024-01-01 00:00:00.123 UTC", "2024-06-15 12:30:45.987 
UTC"));
+  }
+
+  @Test
+  public void testReadTableRowsWithSchema_Arrow_MicrosPrecision() throws 
Exception {
+    runTimestampTest(
+        DataFormat.ARROW,
+        TimestampPrecision.MICROS,
+        true,
+        Arrays.asList("2024-01-01T00:00:00.123456Z", 
"2024-06-15T12:30:45.987654Z"),
+        Arrays.asList("2024-01-01 00:00:00.123 UTC", "2024-06-15 12:30:45.987 
UTC"));
+  }
+
+  @Test
+  public void testReadTableRowsWithSchema_Arrow_NanosPrecision() throws 
Exception {
+    runTimestampTest(
+        DataFormat.ARROW,
+        TimestampPrecision.NANOS,
+        true,
+        Arrays.asList("2024-01-01T00:00:00.123456789Z", 
"2024-06-15T12:30:45.987654321Z"),
+        Arrays.asList("2024-01-01 00:00:00.123456789 UTC", "2024-06-15 
12:30:45.987654321 UTC"));
+  }
+
+  @Test
+  public void testReadTableRowsWithSchema_Arrow_PicosPrecision() throws 
Exception {
+    runTimestampTest(
+        DataFormat.ARROW,
+        TimestampPrecision.PICOS,
+        true,
+        Arrays.asList(
+            "2024-01-01 00:00:00.123456789012 UTC", "2024-06-15 
12:30:45.987654321098 UTC"),
+        Arrays.asList(
+            "2024-01-01 00:00:00.123456789012 UTC", "2024-06-15 
12:30:45.987654321098 UTC"));
+  }
+
+  @Test
+  public void 
testTableSourceInitialSplit_withDirectReadPicosTimestampPrecisionNanos_Avro()
+      throws Exception {
+    fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", 
null);
+    TableReference tableRef = 
BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table");
+    Table table = new 
Table().setTableReference(tableRef).setNumBytes(100L).setSchema(TABLE_SCHEMA);
+    fakeDatasetService.createTable(table);
+
+    // Expected request should include AvroSerializationOptions with NANOS 
precision
+    CreateReadSessionRequest expectedRequest =
+        CreateReadSessionRequest.newBuilder()
+            .setParent("projects/project-id")
+            .setReadSession(
+                ReadSession.newBuilder()
+                    
.setTable("projects/foo.com:project/datasets/dataset/tables/table")
+                    .setDataFormat(DataFormat.AVRO)
+                    .setReadOptions(
+                        ReadSession.TableReadOptions.newBuilder()
+                            .setAvroSerializationOptions(
+                                
com.google.cloud.bigquery.storage.v1.AvroSerializationOptions
+                                    .newBuilder()
+                                    .setPicosTimestampPrecision(
+                                        com.google.cloud.bigquery.storage.v1
+                                            
.AvroSerializationOptions.PicosTimestampPrecision
+                                            .TIMESTAMP_PRECISION_NANOS))))
+            .setMaxStreamCount(10)
+            .build();
+
+    ReadSession.Builder builder =
+        ReadSession.newBuilder()
+            
.setAvroSchema(AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING))
+            .setDataFormat(DataFormat.AVRO);
+    for (int i = 0; i < 10; i++) {
+      builder.addStreams(ReadStream.newBuilder().setName("stream-" + i));
+    }
+
+    StorageClient fakeStorageClient = mock(StorageClient.class);
+    
when(fakeStorageClient.createReadSession(expectedRequest)).thenReturn(builder.build());
+
+    BigQueryStorageTableSource<TableRow> tableSource =
+        BigQueryStorageTableSource.create(
+            ValueProvider.StaticValueProvider.of(tableRef),
+            DataFormat.AVRO,
+            null, /* selectedFields */
+            null, /* rowRestriction */
+            new TableRowParser(),
+            TableRowJsonCoder.of(),
+            new FakeBigQueryServices()
+                .withDatasetService(fakeDatasetService)
+                .withStorageClient(fakeStorageClient),
+            false, /* projectionPushdownApplied */
+            TimestampPrecision.NANOS);
+
+    List<? extends BoundedSource<TableRow>> sources = tableSource.split(10L, 
options);
+    assertEquals(10L, sources.size());
+  }
+
   private static org.apache.arrow.vector.types.pojo.Field field(
       String name,
       boolean nullable,
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java
index 5b7b5d47319..de63120c93c 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java
@@ -63,6 +63,8 @@ public class BigQueryIOTranslationTest {
     READ_TRANSFORM_SCHEMA_MAPPING.put("getQueryTempProject", 
"query_temp_project");
     READ_TRANSFORM_SCHEMA_MAPPING.put("getMethod", "method");
     READ_TRANSFORM_SCHEMA_MAPPING.put("getFormat", "format");
+    READ_TRANSFORM_SCHEMA_MAPPING.put(
+        "getDirectReadPicosTimestampPrecision", 
"direct_read_picos_timestamp_precision");
     READ_TRANSFORM_SCHEMA_MAPPING.put("getSelectedFields", "selected_fields");
     READ_TRANSFORM_SCHEMA_MAPPING.put("getRowRestriction", "row_restriction");
     READ_TRANSFORM_SCHEMA_MAPPING.put("getCoder", "coder");
@@ -323,4 +325,24 @@ public class BigQueryIOTranslationTest {
                       .contains(fieldName));
             });
   }
+
+  @Test
+  public void 
testReCreateReadTransformFromRowWithDirectReadPicosTimestampPrecision() {
+    BigQueryIO.TypedRead<TableRow> readTransform =
+        BigQueryIO.readTableRows()
+            .from("dummyproject:dummydataset.dummytable")
+            .withMethod(TypedRead.Method.DIRECT_READ)
+            .withDirectReadPicosTimestampPrecision(TimestampPrecision.PICOS);
+
+    BigQueryIOTranslation.BigQueryIOReadTranslator translator =
+        new BigQueryIOTranslation.BigQueryIOReadTranslator();
+    Row row = translator.toConfigRow(readTransform);
+
+    BigQueryIO.TypedRead<TableRow> readTransformFromRow =
+        (BigQueryIO.TypedRead<TableRow>)
+            translator.fromConfigRow(row, PipelineOptionsFactory.create());
+
+    assertEquals(
+        TimestampPrecision.PICOS, 
readTransformFromRow.getDirectReadPicosTimestampPrecision());
+  }
 }

Reply via email to