This is an automated email from the ASF dual-hosted git repository.
cvandermerwe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new cc8dc2fe4f2 Add timestamp precision option to bigquery storage read for TIMESTAMP(12) columns. (#37079)
cc8dc2fe4f2 is described below
commit cc8dc2fe4f22019c609f4af07d62743380977b07
Author: claudevdm <[email protected]>
AuthorDate: Mon Dec 22 07:36:41 2025 +0200
Add timestamp precision option to bigquery storage read for TIMESTAMP(12) columns. (#37079)
* initial
* fix tests.
* Fix test.
* Make setting name more descriptive.
* Comments.
---------
Co-authored-by: Claude <[email protected]>
---
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 32 +-
.../sdk/io/gcp/bigquery/BigQueryIOTranslation.java | 13 +
.../gcp/bigquery/BigQueryStorageQuerySource.java | 43 +-
.../io/gcp/bigquery/BigQueryStorageSourceBase.java | 67 ++-
.../gcp/bigquery/BigQueryStorageTableSource.java | 40 +-
.../io/gcp/bigquery/BigQueryIOStorageReadTest.java | 644 +++++++++++++++++++++
.../io/gcp/bigquery/BigQueryIOTranslationTest.java | 22 +
7 files changed, 847 insertions(+), 14 deletions(-)
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 69b9c62ceea..7c0ab785ae7 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -1251,6 +1251,8 @@ public class BigQueryIO {
abstract Builder<T> setBadRecordRouter(BadRecordRouter badRecordRouter);
abstract Builder<T> setProjectionPushdownApplied(boolean projectionPushdownApplied);
+
+ abstract Builder<T> setDirectReadPicosTimestampPrecision(TimestampPrecision precision);
}
abstract @Nullable ValueProvider<String> getJsonTableRef();
@@ -1306,6 +1308,8 @@ public class BigQueryIO {
abstract boolean getProjectionPushdownApplied();
+ abstract @Nullable TimestampPrecision getDirectReadPicosTimestampPrecision();
+
/**
* An enumeration type for the priority of a query.
*
@@ -1381,7 +1385,8 @@ public class BigQueryIO {
getFormat(),
getParseFn(),
outputCoder,
- getBigQueryServices());
+ getBigQueryServices(),
+ getDirectReadPicosTimestampPrecision());
}
private static final String QUERY_VALIDATION_FAILURE_ERROR =
@@ -1525,7 +1530,12 @@ public class BigQueryIO {
if (selectedFields != null && selectedFields.isAccessible()) {
tableSchema = BigQueryUtils.trimSchema(tableSchema, selectedFields.get());
}
- beamSchema = BigQueryUtils.fromTableSchema(tableSchema);
+ BigQueryUtils.SchemaConversionOptions.Builder builder =
+ BigQueryUtils.SchemaConversionOptions.builder();
+ if (getDirectReadPicosTimestampPrecision() != null) {
+ builder.setPicosecondTimestampMapping(getDirectReadPicosTimestampPrecision());
+ }
+ beamSchema = BigQueryUtils.fromTableSchema(tableSchema, builder.build());
}
final Coder<T> coder = inferCoder(p.getCoderRegistry());
@@ -1710,7 +1720,8 @@ public class BigQueryIO {
getParseFn(),
outputCoder,
getBigQueryServices(),
- getProjectionPushdownApplied())));
+ getProjectionPushdownApplied(),
+ getDirectReadPicosTimestampPrecision())));
if (beamSchema != null) {
rows.setSchema(
beamSchema,
@@ -1731,7 +1742,8 @@ public class BigQueryIO {
getParseFn(),
outputCoder,
getBigQueryServices(),
- getProjectionPushdownApplied());
+ getProjectionPushdownApplied(),
+ getDirectReadPicosTimestampPrecision());
List<? extends BoundedSource<T>> sources;
try {
// This splitting logic taken from the SDF implementation of Read
@@ -2293,6 +2305,18 @@ public class BigQueryIO {
return toBuilder().setMethod(method).build();
}
+ /**
+ * Sets the timestamp precision to request for TIMESTAMP(12) BigQuery columns when reading via
+ * the Storage Read API.
+ *
+ * <p>This option only affects the precision of TIMESTAMP(12) column reads using {@link
+ * Method#DIRECT_READ}. If not set, the BigQuery client returns microsecond precision by default.
+ */
+ public TypedRead<T> withDirectReadPicosTimestampPrecision(
+ TimestampPrecision timestampPrecision) {
+ return toBuilder().setDirectReadPicosTimestampPrecision(timestampPrecision).build();
+ }
+
/** See {@link DataFormat}. */
public TypedRead<T> withFormat(DataFormat format) {
return toBuilder().setFormat(format).build();
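A minimal usage sketch of the option added above (project, dataset, and table names are placeholders; the calls themselves are the ones introduced in this commit and exercised in the tests further down):

    PCollection<TableRow> rows =
        pipeline.apply(
            BigQueryIO.readTableRows()
                .from("my-project:my_dataset.my_table")
                .withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ)
                .withFormat(DataFormat.AVRO)
                .withDirectReadPicosTimestampPrecision(TimestampPrecision.NANOS));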
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java
index d519ea4016f..c2e891145ac 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java
@@ -109,6 +109,7 @@ public class BigQueryIOTranslation {
.addNullableBooleanField("projection_pushdown_applied")
.addNullableByteArrayField("bad_record_router")
.addNullableByteArrayField("bad_record_error_handler")
+ .addNullableByteArrayField("direct_read_picos_timestamp_precision")
.build();
public static final String BIGQUERY_READ_TRANSFORM_URN =
@@ -195,6 +196,11 @@ public class BigQueryIOTranslation {
if (transform.getUseAvroLogicalTypes() != null) {
fieldValues.put("use_avro_logical_types",
transform.getUseAvroLogicalTypes());
}
+ if (transform.getDirectReadPicosTimestampPrecision() != null) {
+ fieldValues.put(
+ "direct_read_picos_timestamp_precision",
+ toByteArray(transform.getDirectReadPicosTimestampPrecision()));
+ }
fieldValues.put("projection_pushdown_applied",
transform.getProjectionPushdownApplied());
fieldValues.put("bad_record_router",
toByteArray(transform.getBadRecordRouter()));
fieldValues.put(
@@ -293,6 +299,13 @@ public class BigQueryIOTranslation {
if (formatBytes != null) {
builder = builder.setFormat((DataFormat) fromByteArray(formatBytes));
}
+ byte[] timestampPrecisionBytes =
+ configRow.getBytes("direct_read_picos_timestamp_precision");
+ if (timestampPrecisionBytes != null) {
+ builder =
+ builder.setDirectReadPicosTimestampPrecision(
+ (TimestampPrecision) fromByteArray(timestampPrecisionBytes));
+ }
Collection<String> selectedFields = configRow.getArray("selected_fields");
if (selectedFields != null && !selectedFields.isEmpty()) {
builder.setSelectedFields(StaticValueProvider.of(ImmutableList.of(selectedFields)));
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java
index 07c3273c293..064b9bebaf1 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java
@@ -40,6 +40,38 @@ import org.checkerframework.checker.nullness.qual.Nullable;
/** A {@link org.apache.beam.sdk.io.Source} representing reading the results of a query. */
class BigQueryStorageQuerySource<T> extends BigQueryStorageSourceBase<T> {
+ public static <T> BigQueryStorageQuerySource<T> create(
+ String stepUuid,
+ ValueProvider<String> queryProvider,
+ Boolean flattenResults,
+ Boolean useLegacySql,
+ QueryPriority priority,
+ @Nullable String location,
+ @Nullable String queryTempDataset,
+ @Nullable String queryTempProject,
+ @Nullable String kmsKey,
+ @Nullable DataFormat format,
+ SerializableFunction<SchemaAndRecord, T> parseFn,
+ Coder<T> outputCoder,
+ BigQueryServices bqServices,
+ @Nullable TimestampPrecision picosTimestampPrecision) {
+ return new BigQueryStorageQuerySource<>(
+ stepUuid,
+ queryProvider,
+ flattenResults,
+ useLegacySql,
+ priority,
+ location,
+ queryTempDataset,
+ queryTempProject,
+ kmsKey,
+ format,
+ parseFn,
+ outputCoder,
+ bqServices,
+ picosTimestampPrecision);
+ }
+
public static <T> BigQueryStorageQuerySource<T> create(
String stepUuid,
ValueProvider<String> queryProvider,
@@ -67,7 +99,8 @@ class BigQueryStorageQuerySource<T> extends BigQueryStorageSourceBase<T> {
format,
parseFn,
outputCoder,
- bqServices);
+ bqServices,
+ /*picosTimestampPrecision=*/ null);
}
public static <T> BigQueryStorageQuerySource<T> create(
@@ -94,7 +127,8 @@ class BigQueryStorageQuerySource<T> extends BigQueryStorageSourceBase<T> {
null,
parseFn,
outputCoder,
- bqServices);
+ bqServices,
+ /*picosTimestampPrecision=*/ null);
}
private final String stepUuid;
@@ -123,8 +157,9 @@ class BigQueryStorageQuerySource<T> extends BigQueryStorageSourceBase<T> {
@Nullable DataFormat format,
SerializableFunction<SchemaAndRecord, T> parseFn,
Coder<T> outputCoder,
- BigQueryServices bqServices) {
- super(format, null, null, parseFn, outputCoder, bqServices);
+ BigQueryServices bqServices,
+ @Nullable TimestampPrecision picosTimestampPrecision) {
+ super(format, null, null, parseFn, outputCoder, bqServices, picosTimestampPrecision);
this.stepUuid = checkNotNull(stepUuid, "stepUuid");
this.queryProvider = checkNotNull(queryProvider, "queryProvider");
this.flattenResults = checkNotNull(flattenResults, "flattenResults");
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java
index d0bc655b311..45763c6ac14 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java
@@ -22,6 +22,8 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Pr
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableSchema;
+import com.google.cloud.bigquery.storage.v1.ArrowSerializationOptions;
+import com.google.cloud.bigquery.storage.v1.AvroSerializationOptions;
import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
import com.google.cloud.bigquery.storage.v1.DataFormat;
import com.google.cloud.bigquery.storage.v1.ReadSession;
@@ -69,6 +71,7 @@ abstract class BigQueryStorageSourceBase<T> extends BoundedSource<T> {
protected final SerializableFunction<SchemaAndRecord, T> parseFn;
protected final Coder<T> outputCoder;
protected final BigQueryServices bqServices;
+ private final @Nullable TimestampPrecision picosTimestampPrecision;
BigQueryStorageSourceBase(
@Nullable DataFormat format,
@@ -76,13 +79,15 @@ abstract class BigQueryStorageSourceBase<T> extends BoundedSource<T> {
@Nullable ValueProvider<String> rowRestrictionProvider,
SerializableFunction<SchemaAndRecord, T> parseFn,
Coder<T> outputCoder,
- BigQueryServices bqServices) {
+ BigQueryServices bqServices,
+ @Nullable TimestampPrecision picosTimestampPrecision) {
this.format = format;
this.selectedFieldsProvider = selectedFieldsProvider;
this.rowRestrictionProvider = rowRestrictionProvider;
this.parseFn = checkNotNull(parseFn, "parseFn");
this.outputCoder = checkNotNull(outputCoder, "outputCoder");
this.bqServices = checkNotNull(bqServices, "bqServices");
+ this.picosTimestampPrecision = picosTimestampPrecision;
}
/**
@@ -131,11 +136,12 @@ abstract class BigQueryStorageSourceBase<T> extends BoundedSource<T> {
if (rowRestrictionProvider != null && rowRestrictionProvider.isAccessible()) {
tableReadOptionsBuilder.setRowRestriction(rowRestrictionProvider.get());
}
- readSessionBuilder.setReadOptions(tableReadOptionsBuilder);
if (format != null) {
readSessionBuilder.setDataFormat(format);
+ setPicosTimestampPrecision(tableReadOptionsBuilder, format);
}
+ readSessionBuilder.setReadOptions(tableReadOptionsBuilder);
// Setting the requested max stream count to 0, implies that the Read API backend will select
// an appropriate number of streams for the Session to produce reasonable throughput.
@@ -199,4 +205,61 @@ abstract class BigQueryStorageSourceBase<T> extends BoundedSource<T> {
public BoundedReader<T> createReader(PipelineOptions options) throws IOException {
throw new UnsupportedOperationException("BigQuery storage source must be split before reading");
}
+
+ private void setPicosTimestampPrecision(
+ ReadSession.TableReadOptions.Builder tableReadOptionsBuilder, DataFormat dataFormat) {
+ if (picosTimestampPrecision == null) {
+ return;
+ }
+
+ if (dataFormat == DataFormat.ARROW) {
+ setArrowTimestampPrecision(tableReadOptionsBuilder, picosTimestampPrecision);
+ } else if (dataFormat == DataFormat.AVRO) {
+ setAvroTimestampPrecision(tableReadOptionsBuilder, picosTimestampPrecision);
+ }
+ }
+
+ private static void setArrowTimestampPrecision(
+ ReadSession.TableReadOptions.Builder tableReadOptionsBuilder,
+ TimestampPrecision timestampPrecision) {
+ ArrowSerializationOptions.PicosTimestampPrecision precision;
+ switch (timestampPrecision) {
+ case MICROS:
+ precision = ArrowSerializationOptions.PicosTimestampPrecision.TIMESTAMP_PRECISION_MICROS;
+ break;
+ case NANOS:
+ precision = ArrowSerializationOptions.PicosTimestampPrecision.TIMESTAMP_PRECISION_NANOS;
+ break;
+ case PICOS:
+ precision = ArrowSerializationOptions.PicosTimestampPrecision.TIMESTAMP_PRECISION_PICOS;
+ break;
+ default:
+ throw new IllegalArgumentException(
+ "Unsupported timestamp precision for Storage Read API: " + timestampPrecision);
+ }
+ tableReadOptionsBuilder.setArrowSerializationOptions(
+ ArrowSerializationOptions.newBuilder().setPicosTimestampPrecision(precision));
+ }
+
+ private static void setAvroTimestampPrecision(
+ ReadSession.TableReadOptions.Builder tableReadOptionsBuilder,
+ TimestampPrecision timestampPrecision) {
+ AvroSerializationOptions.PicosTimestampPrecision precision;
+ switch (timestampPrecision) {
+ case MICROS:
+ precision = AvroSerializationOptions.PicosTimestampPrecision.TIMESTAMP_PRECISION_MICROS;
+ break;
+ case NANOS:
+ precision = AvroSerializationOptions.PicosTimestampPrecision.TIMESTAMP_PRECISION_NANOS;
+ break;
+ case PICOS:
+ precision = AvroSerializationOptions.PicosTimestampPrecision.TIMESTAMP_PRECISION_PICOS;
+ break;
+ default:
+ throw new IllegalArgumentException(
+ "Unsupported timestamp precision for Storage Read API: " + timestampPrecision);
+ }
+ tableReadOptionsBuilder.setAvroSerializationOptions(
+ AvroSerializationOptions.newBuilder().setPicosTimestampPrecision(precision));
+ }
}
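For reference, a sketch of the read-session request these helpers produce for the Arrow path with TimestampPrecision.NANOS (table path and stream count are placeholders; the option wiring mirrors the test expectations later in this commit):

    CreateReadSessionRequest request =
        CreateReadSessionRequest.newBuilder()
            .setParent("projects/my-project")
            .setReadSession(
                ReadSession.newBuilder()
                    .setTable("projects/my-project/datasets/my_dataset/tables/my_table")
                    .setDataFormat(DataFormat.ARROW)
                    .setReadOptions(
                        ReadSession.TableReadOptions.newBuilder()
                            .setArrowSerializationOptions(
                                ArrowSerializationOptions.newBuilder()
                                    .setPicosTimestampPrecision(
                                        ArrowSerializationOptions.PicosTimestampPrecision
                                            .TIMESTAMP_PRECISION_NANOS))))
            .setMaxStreamCount(10)
            .build();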
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageTableSource.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageTableSource.java
index 909a2551b29..8b7240158dc 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageTableSource.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageTableSource.java
@@ -65,7 +65,8 @@ public class BigQueryStorageTableSource<T> extends BigQueryStorageSourceBase<T>
parseFn,
outputCoder,
bqServices,
- projectionPushdownApplied);
+ projectionPushdownApplied,
+ /*picosTimestampPrecision=*/ null);
}
public static <T> BigQueryStorageTableSource<T> create(
@@ -83,7 +84,30 @@ public class BigQueryStorageTableSource<T> extends BigQueryStorageSourceBase<T>
parseFn,
outputCoder,
bqServices,
- false);
+ /*projectionPushdownApplied=*/ false,
+ /*picosTimestampPrecision=*/ null);
+ }
+
+ public static <T> BigQueryStorageTableSource<T> create(
+ ValueProvider<TableReference> tableRefProvider,
+ DataFormat format,
+ @Nullable ValueProvider<List<String>> selectedFields,
+ @Nullable ValueProvider<String> rowRestriction,
+ SerializableFunction<SchemaAndRecord, T> parseFn,
+ Coder<T> outputCoder,
+ BigQueryServices bqServices,
+ boolean projectionPushdownApplied,
+ @Nullable TimestampPrecision picosTimestampPrecision) {
+ return new BigQueryStorageTableSource<>(
+ tableRefProvider,
+ format,
+ selectedFields,
+ rowRestriction,
+ parseFn,
+ outputCoder,
+ bqServices,
+ projectionPushdownApplied,
+ picosTimestampPrecision);
}
private BigQueryStorageTableSource(
@@ -94,8 +118,16 @@ public class BigQueryStorageTableSource<T> extends BigQueryStorageSourceBase<T>
SerializableFunction<SchemaAndRecord, T> parseFn,
Coder<T> outputCoder,
BigQueryServices bqServices,
- boolean projectionPushdownApplied) {
- super(format, selectedFields, rowRestriction, parseFn, outputCoder, bqServices);
+ boolean projectionPushdownApplied,
+ @Nullable TimestampPrecision picosTimestampPrecision) {
+ super(
+ format,
+ selectedFields,
+ rowRestriction,
+ parseFn,
+ outputCoder,
+ bqServices,
+ picosTimestampPrecision);
this.tableReferenceProvider = checkNotNull(tableRefProvider, "tableRefProvider");
this.projectionPushdownApplied = projectionPushdownApplied;
cachedTable = new AtomicReference<>();
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
index 5b9e15f22b9..95f472f5c61 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
@@ -27,6 +27,8 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -69,14 +71,18 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
+import java.util.stream.Collectors;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.TimeStampMicroTZVector;
+import org.apache.arrow.vector.TimeStampNanoTZVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.ipc.WriteChannel;
import org.apache.arrow.vector.ipc.message.MessageSerializer;
+import org.apache.arrow.vector.types.TimeUnit;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.util.Text;
import org.apache.avro.Schema;
@@ -444,6 +450,63 @@ public class BigQueryIOStorageReadTest {
asList(
field("name", new ArrowType.Utf8()), field("number", new
ArrowType.Int(64, true))));
+ // --- MICROS ---
+ private static final TableSchema TABLE_SCHEMA_TIMESTAMP =
+ new TableSchema()
+ .setFields(
+ ImmutableList.of(
+ new TableFieldSchema()
+ .setName("ts")
+ .setType("TIMESTAMP")
+ .setMode("REQUIRED")
+ .setTimestampPrecision(12L)));
+
+ private static final org.apache.arrow.vector.types.pojo.Schema ARROW_SCHEMA_TS_MICROS =
+ new org.apache.arrow.vector.types.pojo.Schema(
+ asList(field("ts", new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC"))));
+
+ private static final String AVRO_SCHEMA_TS_MICROS_STRING =
+ "{\"namespace\": \"example.avro\","
+ + " \"type\": \"record\","
+ + " \"name\": \"RowRecord\","
+ + " \"fields\": ["
+ + " {\"name\": \"ts\", \"type\": {\"type\": \"long\", \"logicalType\": \"timestamp-micros\"}}"
+ + " ]}";
+
+ private static final Schema AVRO_SCHEMA_TS_MICROS =
+ new Schema.Parser().parse(AVRO_SCHEMA_TS_MICROS_STRING);
+
+ // --- NANOS ---
+ private static final org.apache.arrow.vector.types.pojo.Schema ARROW_SCHEMA_TS_NANOS =
+ new org.apache.arrow.vector.types.pojo.Schema(
+ asList(field("ts", new ArrowType.Timestamp(TimeUnit.NANOSECOND, "UTC"))));
+
+ private static final String AVRO_SCHEMA_TS_NANOS_STRING =
+ "{\"namespace\": \"example.avro\","
+ + " \"type\": \"record\","
+ + " \"name\": \"RowRecord\","
+ + " \"fields\": ["
+ + " {\"name\": \"ts\", \"type\": {\"type\": \"long\", \"logicalType\": \"timestamp-nanos\"}}"
+ + " ]}";
+
+ private static final Schema AVRO_SCHEMA_TS_NANOS =
+ new Schema.Parser().parse(AVRO_SCHEMA_TS_NANOS_STRING);
+
+ // --- PICOS (string) ---
+ private static final org.apache.arrow.vector.types.pojo.Schema ARROW_SCHEMA_TS_PICOS =
+ new org.apache.arrow.vector.types.pojo.Schema(asList(field("ts", new ArrowType.Utf8())));
+
+ private static final String AVRO_SCHEMA_TS_PICOS_STRING =
+ "{\"namespace\": \"example.avro\","
+ + " \"type\": \"record\","
+ + " \"name\": \"RowRecord\","
+ + " \"fields\": ["
+ + " {\"name\": \"ts\", \"type\": \"string\"}"
+ + " ]}";
+
+ private static final Schema AVRO_SCHEMA_TS_PICOS =
+ new Schema.Parser().parse(AVRO_SCHEMA_TS_PICOS_STRING);
+
private void doTableSourceInitialSplitTest(long bundleSize, int streamCount)
throws Exception {
fakeDatasetService.createDataset("foo.com:project", "dataset", "", "",
null);
TableReference tableRef =
BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table");
@@ -2381,6 +2444,587 @@ public class BigQueryIOStorageReadTest {
assertEquals(new Utf8("A"), rowA.get("name"));
}
+ @Test
+ public void testTimestampPrecisionDefaultValue() {
+ BigQueryIO.TypedRead<TableRow> typedRead =
+ BigQueryIO.read(new TableRowParser())
+ .withCoder(TableRowJsonCoder.of())
+ .withMethod(Method.DIRECT_READ)
+ .from("foo.com:project:dataset.table");
+
+ assertNull(typedRead.getDirectReadPicosTimestampPrecision());
+ }
+
+ @Test
+ public void testwithDirectReadPicosTimestampPrecisionNanos() {
+ BigQueryIO.TypedRead<TableRow> typedRead =
+ BigQueryIO.read(new TableRowParser())
+ .withCoder(TableRowJsonCoder.of())
+ .withMethod(Method.DIRECT_READ)
+ .from("foo.com:project:dataset.table")
+ .withDirectReadPicosTimestampPrecision(TimestampPrecision.NANOS);
+
+ assertEquals(TimestampPrecision.NANOS, typedRead.getDirectReadPicosTimestampPrecision());
+ }
+
+ @Test
+ public void testwithDirectReadPicosTimestampPrecisionPicos() {
+ BigQueryIO.TypedRead<TableRow> typedRead =
+ BigQueryIO.read(new TableRowParser())
+ .withCoder(TableRowJsonCoder.of())
+ .withMethod(Method.DIRECT_READ)
+ .from("foo.com:project:dataset.table")
+ .withDirectReadPicosTimestampPrecision(TimestampPrecision.PICOS);
+
+ assertEquals(TimestampPrecision.PICOS, typedRead.getDirectReadPicosTimestampPrecision());
+ }
+
+ @Test
+ public void testTableSourceInitialSplit_withDirectReadPicosTimestampPrecisionNanos_Arrow()
+ throws Exception {
+ fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null);
+ TableReference tableRef = BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table");
+ Table table = new Table().setTableReference(tableRef).setNumBytes(100L).setSchema(TABLE_SCHEMA);
+ fakeDatasetService.createTable(table);
+
+ CreateReadSessionRequest expectedRequest =
+ CreateReadSessionRequest.newBuilder()
+ .setParent("projects/project-id")
+ .setReadSession(
+ ReadSession.newBuilder()
+ .setTable("projects/foo.com:project/datasets/dataset/tables/table")
+ .setDataFormat(DataFormat.ARROW)
+ .setReadOptions(
+ ReadSession.TableReadOptions.newBuilder()
+ .setArrowSerializationOptions(
+ com.google.cloud.bigquery.storage.v1.ArrowSerializationOptions
+ .newBuilder()
+ .setPicosTimestampPrecision(
+ com.google.cloud.bigquery.storage.v1
+ .ArrowSerializationOptions.PicosTimestampPrecision
+ .TIMESTAMP_PRECISION_NANOS))))
+ .setMaxStreamCount(10)
+ .build();
+
+ ReadSession.Builder builder =
+ ReadSession.newBuilder()
+ .setArrowSchema(
+ ArrowSchema.newBuilder().setSerializedSchema(serializeArrowSchema(ARROW_SCHEMA)))
+ .setDataFormat(DataFormat.ARROW);
+ for (int i = 0; i < 10; i++) {
+ builder.addStreams(ReadStream.newBuilder().setName("stream-" + i));
+ }
+
+ StorageClient fakeStorageClient = mock(StorageClient.class);
+ when(fakeStorageClient.createReadSession(expectedRequest)).thenReturn(builder.build());
+
+ BigQueryStorageTableSource<TableRow> tableSource =
+ BigQueryStorageTableSource.create(
+ ValueProvider.StaticValueProvider.of(tableRef),
+ DataFormat.ARROW,
+ null, /* selectedFields */
+ null, /* rowRestriction */
+ new TableRowParser(),
+ TableRowJsonCoder.of(),
+ new FakeBigQueryServices()
+ .withDatasetService(fakeDatasetService)
+ .withStorageClient(fakeStorageClient),
+ false, /* projectionPushdownApplied */
+ TimestampPrecision.NANOS);
+
+ List<? extends BoundedSource<TableRow>> sources = tableSource.split(10L, options);
+ assertEquals(10L, sources.size());
+ }
+
+ private org.apache.arrow.vector.types.pojo.Schema getArrowSchemaTs(TimestampPrecision precision) {
+ switch (precision) {
+ case NANOS:
+ return ARROW_SCHEMA_TS_NANOS;
+ case PICOS:
+ return ARROW_SCHEMA_TS_PICOS;
+ case MICROS:
+ default:
+ return ARROW_SCHEMA_TS_MICROS;
+ }
+ }
+
+ private Schema getAvroSchemaTs(TimestampPrecision precision) {
+ switch (precision) {
+ case NANOS:
+ return AVRO_SCHEMA_TS_NANOS;
+ case PICOS:
+ return AVRO_SCHEMA_TS_PICOS;
+ case MICROS:
+ default:
+ return AVRO_SCHEMA_TS_MICROS;
+ }
+ }
+
+ private String getAvroSchemaStringTs(TimestampPrecision precision) {
+ switch (precision) {
+ case NANOS:
+ return AVRO_SCHEMA_TS_NANOS_STRING;
+ case PICOS:
+ return AVRO_SCHEMA_TS_PICOS_STRING;
+ case MICROS:
+ default:
+ return AVRO_SCHEMA_TS_MICROS_STRING;
+ }
+ }
+
+ /**
+ * Converts ISO timestamp strings to the appropriate input value for the precision:
+ * MICROS: Long (epoch microseconds); NANOS: Long (epoch nanoseconds); PICOS: String
+ * (formatted as "yyyy-MM-dd HH:mm:ss.SSSSSSSSSSSS UTC").
+ */
+ private List<Object> convertInputsForPrecision(
+ List<String> isoTimestamps, TimestampPrecision precision) {
+ return isoTimestamps.stream()
+ .map(
+ iso -> {
+ if (precision == TimestampPrecision.PICOS) {
+ // For PICOS, input IS the string (already formatted)
+ return iso;
+ }
+ java.time.Instant instant = java.time.Instant.parse(iso);
+ if (precision == TimestampPrecision.NANOS) {
+ return instant.getEpochSecond() * 1_000_000_000L + instant.getNano();
+ } else {
+ // MICROS (default)
+ return instant.getEpochSecond() * 1_000_000L + instant.getNano() / 1000;
+ }
+ })
+ .collect(Collectors.toList());
+ }
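As a worked example of this conversion, the first input used in the MICROS tests maps as follows (values computed here for illustration; they are not asserted anywhere in the commit):

    // "2024-01-01T00:00:00.123456Z" -> epoch second 1_704_067_200, nano 123_456_000.
    java.time.Instant t = java.time.Instant.parse("2024-01-01T00:00:00.123456Z");
    long micros = t.getEpochSecond() * 1_000_000L + t.getNano() / 1000;  // 1_704_067_200_123_456L
    long nanos = t.getEpochSecond() * 1_000_000_000L + t.getNano();      // 1_704_067_200_123_456_000L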
+
+ private ReadSession createTsReadSession(
+ DataFormat dataFormat,
+ org.apache.arrow.vector.types.pojo.Schema arrowSchema,
+ String avroSchemaString) {
+ ReadSession.Builder builder =
+ ReadSession.newBuilder()
+ .setName("readSessionName")
+ .addStreams(ReadStream.newBuilder().setName("streamName"))
+ .setDataFormat(dataFormat);
+
+ if (dataFormat == DataFormat.ARROW) {
+ builder.setArrowSchema(
+
ArrowSchema.newBuilder().setSerializedSchema(serializeArrowSchema(arrowSchema)).build());
+ } else {
+
builder.setAvroSchema(AvroSchema.newBuilder().setSchema(avroSchemaString).build());
+ }
+ return builder.build();
+ }
+
+ private ReadRowsResponse createArrowTsResponse(
+ org.apache.arrow.vector.types.pojo.Schema arrowSchema,
+ TimestampPrecision precision,
+ List<Object> inputValues) {
+ ArrowRecordBatch serializedRecord;
+ try (VectorSchemaRoot schemaRoot = VectorSchemaRoot.create(arrowSchema,
allocator)) {
+ schemaRoot.allocateNew();
+ schemaRoot.setRowCount(inputValues.size());
+
+ switch (precision) {
+ case NANOS:
+ TimeStampNanoTZVector nanoVector =
+ (TimeStampNanoTZVector) schemaRoot.getFieldVectors().get(0);
+ for (int i = 0; i < inputValues.size(); i++) {
+ nanoVector.set(i, (Long) inputValues.get(i));
+ }
+ break;
+ case PICOS:
+ VarCharVector stringVector = (VarCharVector)
schemaRoot.getFieldVectors().get(0);
+ for (int i = 0; i < inputValues.size(); i++) {
+ stringVector.set(i, new Text((String) inputValues.get(i)));
+ }
+ break;
+ case MICROS:
+ default:
+ TimeStampMicroTZVector microVector =
+ (TimeStampMicroTZVector) schemaRoot.getFieldVectors().get(0);
+ for (int i = 0; i < inputValues.size(); i++) {
+ microVector.set(i, (Long) inputValues.get(i));
+ }
+ break;
+ }
+
+ VectorUnloader unLoader = new VectorUnloader(schemaRoot);
+ try (org.apache.arrow.vector.ipc.message.ArrowRecordBatch records =
+ unLoader.getRecordBatch()) {
+ try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
+ MessageSerializer.serialize(new
WriteChannel(Channels.newChannel(os)), records);
+ serializedRecord =
+ ArrowRecordBatch.newBuilder()
+ .setRowCount(records.getLength())
+
.setSerializedRecordBatch(ByteString.copyFrom(os.toByteArray()))
+ .build();
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ return ReadRowsResponse.newBuilder()
+ .setArrowRecordBatch(serializedRecord)
+ .setRowCount(inputValues.size())
+ .setStats(
+ StreamStats.newBuilder()
+
.setProgress(Progress.newBuilder().setAtResponseStart(0.0).setAtResponseEnd(1.0)))
+ .build();
+ }
+
+ private ReadRowsResponse createAvroTsResponse(
+ Schema avroSchema, TimestampPrecision precision, List<Object>
inputValues) throws Exception {
+ List<GenericRecord> records = new ArrayList<>();
+ for (Object value : inputValues) {
+ GenericRecord record = new Record(avroSchema);
+ record.put("ts", value);
+ records.add(record);
+ }
+
+ GenericDatumWriter<GenericRecord> writer = new
GenericDatumWriter<>(avroSchema);
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ Encoder binaryEncoder = EncoderFactory.get().binaryEncoder(outputStream,
null);
+ for (GenericRecord record : records) {
+ writer.write(record, binaryEncoder);
+ }
+ binaryEncoder.flush();
+
+ return ReadRowsResponse.newBuilder()
+ .setAvroRows(
+ AvroRows.newBuilder()
+
.setSerializedBinaryRows(ByteString.copyFrom(outputStream.toByteArray()))
+ .setRowCount(records.size()))
+ .setRowCount(records.size())
+ .setStats(
+ StreamStats.newBuilder()
+
.setProgress(Progress.newBuilder().setAtResponseStart(0.0).setAtResponseEnd(1.0)))
+ .build();
+ }
+
+ private void runTimestampTest(
+ DataFormat dataFormat,
+ TimestampPrecision precision,
+ boolean useSchema,
+ List<String> inputTimestamps,
+ List<String> expectedOutputs)
+ throws Exception {
+
+ TimestampPrecision effectivePrecision =
+ (precision != null) ? precision : TimestampPrecision.MICROS;
+
+ fakeDatasetService.createDataset("foo.com:project", "dataset", "", "",
null);
+ TableReference tableRef =
BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table");
+ Table table =
+ new
Table().setTableReference(tableRef).setNumBytes(10L).setSchema(TABLE_SCHEMA_TIMESTAMP);
+ fakeDatasetService.createTable(table);
+
+ org.apache.arrow.vector.types.pojo.Schema arrowSchema =
getArrowSchemaTs(effectivePrecision);
+ Schema avroSchema = getAvroSchemaTs(effectivePrecision);
+ String avroSchemaString = getAvroSchemaStringTs(effectivePrecision);
+
+ List<Object> inputValues = convertInputsForPrecision(inputTimestamps,
effectivePrecision);
+
+ ReadSession readSession = createTsReadSession(dataFormat, arrowSchema,
avroSchemaString);
+
+ List<ReadRowsResponse> readRowsResponses;
+ if (dataFormat == DataFormat.ARROW) {
+ readRowsResponses =
+ Lists.newArrayList(createArrowTsResponse(arrowSchema,
effectivePrecision, inputValues));
+ } else {
+ readRowsResponses =
+ Lists.newArrayList(createAvroTsResponse(avroSchema,
effectivePrecision, inputValues));
+ }
+
+ StorageClient fakeStorageClient = mock(StorageClient.class,
withSettings().serializable());
+
when(fakeStorageClient.createReadSession(any(CreateReadSessionRequest.class)))
+ .thenReturn(readSession);
+ when(fakeStorageClient.readRows(any(ReadRowsRequest.class), eq("")))
+ .thenReturn(new FakeBigQueryServerStream<>(readRowsResponses));
+
+ TypedRead<TableRow> read =
+ useSchema ? BigQueryIO.readTableRowsWithSchema() :
BigQueryIO.readTableRows();
+
+ read =
+ read.from("foo.com:project:dataset.table")
+ .withMethod(Method.DIRECT_READ)
+ .withFormat(dataFormat)
+ .withTestServices(
+ new FakeBigQueryServices()
+ .withDatasetService(fakeDatasetService)
+ .withStorageClient(fakeStorageClient));
+
+ if (precision != null) {
+ read = read.withDirectReadPicosTimestampPrecision(precision);
+ }
+
+ PCollection<TableRow> output = p.apply(read);
+
+ PAssert.that(output)
+ .satisfies(
+ rows -> {
+ List<TableRow> rowList = Lists.newArrayList(rows);
+ assertEquals(expectedOutputs.size(), rowList.size());
+
+ List<String> actualTimestamps =
+ rowList.stream()
+ .map(r -> (String) r.get("ts"))
+ .sorted()
+ .collect(Collectors.toList());
+
+ List<String> sortedExpected =
+
expectedOutputs.stream().sorted().collect(Collectors.toList());
+
+ assertEquals(sortedExpected, actualTimestamps);
+ return null;
+ });
+
+ p.run();
+ }
+
+ // ===== Avro + readTableRows =====
+
+ @Test
+ public void testReadTableRows_Avro_DefaultPrecision() throws Exception {
+ runTimestampTest(
+ DataFormat.AVRO,
+ null,
+ false,
+ Arrays.asList("2024-01-01T00:00:00.123456Z",
"2024-06-15T12:30:45.987654Z"),
+ Arrays.asList("2024-01-01 00:00:00.123456 UTC", "2024-06-15
12:30:45.987654 UTC"));
+ }
+
+ @Test
+ public void testReadTableRows_Avro_MicrosPrecision() throws Exception {
+ runTimestampTest(
+ DataFormat.AVRO,
+ TimestampPrecision.MICROS,
+ false,
+ Arrays.asList("2024-01-01T00:00:00.123456Z",
"2024-06-15T12:30:45.987654Z"),
+ Arrays.asList("2024-01-01 00:00:00.123456 UTC", "2024-06-15
12:30:45.987654 UTC"));
+ }
+
+ @Test
+ public void testReadTableRows_Avro_NanosPrecision() throws Exception {
+ runTimestampTest(
+ DataFormat.AVRO,
+ TimestampPrecision.NANOS,
+ false,
+ Arrays.asList("2024-01-01T00:00:00.123456789Z",
"2024-06-15T12:30:45.987654321Z"),
+ Arrays.asList("2024-01-01 00:00:00.123456789 UTC", "2024-06-15
12:30:45.987654321 UTC"));
+ }
+
+ @Test
+ public void testReadTableRows_Avro_PicosPrecision() throws Exception {
+ runTimestampTest(
+ DataFormat.AVRO,
+ TimestampPrecision.PICOS,
+ false,
+ Arrays.asList(
+ "2024-01-01 00:00:00.123456789012 UTC", "2024-06-15
12:30:45.987654321098 UTC"),
+ Arrays.asList(
+ "2024-01-01 00:00:00.123456789012 UTC", "2024-06-15
12:30:45.987654321098 UTC"));
+ }
+
+ // ===== Avro + readTableRowsWithSchema =====
+
+ @Test
+ public void testReadTableRowsWithSchema_Avro_DefaultPrecision() throws
Exception {
+ runTimestampTest(
+ DataFormat.AVRO,
+ null,
+ true,
+ Arrays.asList("2024-01-01T00:00:00.123456Z",
"2024-06-15T12:30:45.987654Z"),
+ Arrays.asList("2024-01-01 00:00:00.123456 UTC", "2024-06-15
12:30:45.987654 UTC"));
+ }
+
+ @Test
+ public void testReadTableRowsWithSchema_Avro_MicrosPrecision() throws
Exception {
+ runTimestampTest(
+ DataFormat.AVRO,
+ TimestampPrecision.MICROS,
+ true,
+ Arrays.asList("2024-01-01T00:00:00.123456Z",
"2024-06-15T12:30:45.987654Z"),
+ Arrays.asList("2024-01-01 00:00:00.123456 UTC", "2024-06-15
12:30:45.987654 UTC"));
+ }
+
+ @Test
+ public void testReadTableRowsWithSchema_Avro_NanosPrecision() throws
Exception {
+ runTimestampTest(
+ DataFormat.AVRO,
+ TimestampPrecision.NANOS,
+ true,
+ Arrays.asList("2024-01-01T00:00:00.123456789Z",
"2024-06-15T12:30:45.987654321Z"),
+ Arrays.asList("2024-01-01 00:00:00.123456789 UTC", "2024-06-15
12:30:45.987654321 UTC"));
+ }
+
+ @Test
+ public void testReadTableRowsWithSchema_Avro_PicosPrecision() throws
Exception {
+ runTimestampTest(
+ DataFormat.AVRO,
+ TimestampPrecision.PICOS,
+ true,
+ Arrays.asList(
+ "2024-01-01 00:00:00.123456789012 UTC", "2024-06-15
12:30:45.987654321098 UTC"),
+ Arrays.asList(
+ "2024-01-01 00:00:00.123456789012 UTC", "2024-06-15
12:30:45.987654321098 UTC"));
+ }
+
+ // ===== Arrow + readTableRows =====
+
+ @Test
+ public void testReadTableRows_Arrow_DefaultPrecision() throws Exception {
+ // Arrow records are always converted to a Beam Row and then to a GenericRecord in
+ // ArrowConversion.java. ArrowConversion.java is a generic utility for converting Arrow
+ // records and does not take the BigQuery TableSchema into account when determining the
+ // appropriate Beam type. Historically, Arrow microsecond timestamps are converted to
+ // FieldType.DATETIME, which maps to Joda Instants that only support millisecond precision,
+ // hence precision is lost.
+ runTimestampTest(
+ DataFormat.ARROW,
+ null,
+ false,
+ Arrays.asList("2024-01-01T00:00:00.123456Z",
"2024-06-15T12:30:45.987654Z"),
+ Arrays.asList("2024-01-01 00:00:00.123 UTC", "2024-06-15 12:30:45.987
UTC"));
+ }
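A small illustration of the truncation described in the comment above (the epoch values and the divide-by-1000 step are assumptions worked out from the test inputs, not code from this commit):

    long epochMicros = 1_704_067_200_123_456L;  // 2024-01-01T00:00:00.123456Z
    org.joda.time.Instant joda = new org.joda.time.Instant(epochMicros / 1000L);
    // joda holds only 1_704_067_200_123 ms, which renders as "2024-01-01 00:00:00.123 UTC".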
+
+ @Test
+ public void testReadTableRows_Arrow_MicrosPrecision() throws Exception {
+ runTimestampTest(
+ DataFormat.ARROW,
+ TimestampPrecision.MICROS,
+ false,
+ Arrays.asList("2024-01-01T00:00:00.123456Z",
"2024-06-15T12:30:45.987654Z"),
+ Arrays.asList("2024-01-01 00:00:00.123 UTC", "2024-06-15 12:30:45.987
UTC"));
+ }
+
+ @Test
+ public void testReadTableRows_Arrow_NanosPrecision() throws Exception {
+ runTimestampTest(
+ DataFormat.ARROW,
+ TimestampPrecision.NANOS,
+ false,
+ Arrays.asList("2024-01-01T00:00:00.123456789Z",
"2024-06-15T12:30:45.987654321Z"),
+ Arrays.asList("2024-01-01 00:00:00.123456789 UTC", "2024-06-15
12:30:45.987654321 UTC"));
+ }
+
+ @Test
+ public void testReadTableRows_Arrow_PicosPrecision() throws Exception {
+ runTimestampTest(
+ DataFormat.ARROW,
+ TimestampPrecision.PICOS,
+ false,
+ Arrays.asList(
+ "2024-01-01 00:00:00.123456789012 UTC", "2024-06-15
12:30:45.987654321098 UTC"),
+ Arrays.asList(
+ "2024-01-01 00:00:00.123456789012 UTC", "2024-06-15
12:30:45.987654321098 UTC"));
+ }
+
+ // ===== Arrow + readTableRowsWithSchema =====
+
+ @Test
+ public void testReadTableRowsWithSchema_Arrow_DefaultPrecision() throws
Exception {
+ runTimestampTest(
+ DataFormat.ARROW,
+ null,
+ true,
+ Arrays.asList("2024-01-01T00:00:00.123456Z",
"2024-06-15T12:30:45.987654Z"),
+ Arrays.asList("2024-01-01 00:00:00.123 UTC", "2024-06-15 12:30:45.987
UTC"));
+ }
+
+ @Test
+ public void testReadTableRowsWithSchema_Arrow_MicrosPrecision() throws
Exception {
+ runTimestampTest(
+ DataFormat.ARROW,
+ TimestampPrecision.MICROS,
+ true,
+ Arrays.asList("2024-01-01T00:00:00.123456Z",
"2024-06-15T12:30:45.987654Z"),
+ Arrays.asList("2024-01-01 00:00:00.123 UTC", "2024-06-15 12:30:45.987
UTC"));
+ }
+
+ @Test
+ public void testReadTableRowsWithSchema_Arrow_NanosPrecision() throws
Exception {
+ runTimestampTest(
+ DataFormat.ARROW,
+ TimestampPrecision.NANOS,
+ true,
+ Arrays.asList("2024-01-01T00:00:00.123456789Z",
"2024-06-15T12:30:45.987654321Z"),
+ Arrays.asList("2024-01-01 00:00:00.123456789 UTC", "2024-06-15
12:30:45.987654321 UTC"));
+ }
+
+ @Test
+ public void testReadTableRowsWithSchema_Arrow_PicosPrecision() throws
Exception {
+ runTimestampTest(
+ DataFormat.ARROW,
+ TimestampPrecision.PICOS,
+ true,
+ Arrays.asList(
+ "2024-01-01 00:00:00.123456789012 UTC", "2024-06-15
12:30:45.987654321098 UTC"),
+ Arrays.asList(
+ "2024-01-01 00:00:00.123456789012 UTC", "2024-06-15
12:30:45.987654321098 UTC"));
+ }
+
+ @Test
+ public void testTableSourceInitialSplit_withDirectReadPicosTimestampPrecisionNanos_Avro()
+ throws Exception {
+ fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null);
+ TableReference tableRef = BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table");
+ Table table = new Table().setTableReference(tableRef).setNumBytes(100L).setSchema(TABLE_SCHEMA);
+ fakeDatasetService.createTable(table);
+
+ // Expected request should include AvroSerializationOptions with NANOS precision
+ CreateReadSessionRequest expectedRequest =
+ CreateReadSessionRequest.newBuilder()
+ .setParent("projects/project-id")
+ .setReadSession(
+ ReadSession.newBuilder()
+ .setTable("projects/foo.com:project/datasets/dataset/tables/table")
+ .setDataFormat(DataFormat.AVRO)
+ .setReadOptions(
+ ReadSession.TableReadOptions.newBuilder()
+ .setAvroSerializationOptions(
+ com.google.cloud.bigquery.storage.v1.AvroSerializationOptions
+ .newBuilder()
+ .setPicosTimestampPrecision(
+ com.google.cloud.bigquery.storage.v1
+ .AvroSerializationOptions.PicosTimestampPrecision
+ .TIMESTAMP_PRECISION_NANOS))))
+ .setMaxStreamCount(10)
+ .build();
+
+ ReadSession.Builder builder =
+ ReadSession.newBuilder()
+ .setAvroSchema(AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING))
+ .setDataFormat(DataFormat.AVRO);
+ for (int i = 0; i < 10; i++) {
+ builder.addStreams(ReadStream.newBuilder().setName("stream-" + i));
+ }
+
+ StorageClient fakeStorageClient = mock(StorageClient.class);
+ when(fakeStorageClient.createReadSession(expectedRequest)).thenReturn(builder.build());
+
+ BigQueryStorageTableSource<TableRow> tableSource =
+ BigQueryStorageTableSource.create(
+ ValueProvider.StaticValueProvider.of(tableRef),
+ DataFormat.AVRO,
+ null, /* selectedFields */
+ null, /* rowRestriction */
+ new TableRowParser(),
+ TableRowJsonCoder.of(),
+ new FakeBigQueryServices()
+ .withDatasetService(fakeDatasetService)
+ .withStorageClient(fakeStorageClient),
+ false, /* projectionPushdownApplied */
+ TimestampPrecision.NANOS);
+
+ List<? extends BoundedSource<TableRow>> sources = tableSource.split(10L, options);
+ assertEquals(10L, sources.size());
+ }
+
private static org.apache.arrow.vector.types.pojo.Field field(
String name,
boolean nullable,
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java
index 5b7b5d47319..de63120c93c 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java
@@ -63,6 +63,8 @@ public class BigQueryIOTranslationTest {
READ_TRANSFORM_SCHEMA_MAPPING.put("getQueryTempProject",
"query_temp_project");
READ_TRANSFORM_SCHEMA_MAPPING.put("getMethod", "method");
READ_TRANSFORM_SCHEMA_MAPPING.put("getFormat", "format");
+ READ_TRANSFORM_SCHEMA_MAPPING.put(
+ "getDirectReadPicosTimestampPrecision",
"direct_read_picos_timestamp_precision");
READ_TRANSFORM_SCHEMA_MAPPING.put("getSelectedFields", "selected_fields");
READ_TRANSFORM_SCHEMA_MAPPING.put("getRowRestriction", "row_restriction");
READ_TRANSFORM_SCHEMA_MAPPING.put("getCoder", "coder");
@@ -323,4 +325,24 @@ public class BigQueryIOTranslationTest {
.contains(fieldName));
});
}
+
+ @Test
+ public void testReCreateReadTransformFromRowWithDirectReadPicosTimestampPrecision() {
+ BigQueryIO.TypedRead<TableRow> readTransform =
+ BigQueryIO.readTableRows()
+ .from("dummyproject:dummydataset.dummytable")
+ .withMethod(TypedRead.Method.DIRECT_READ)
+ .withDirectReadPicosTimestampPrecision(TimestampPrecision.PICOS);
+
+ BigQueryIOTranslation.BigQueryIOReadTranslator translator =
+ new BigQueryIOTranslation.BigQueryIOReadTranslator();
+ Row row = translator.toConfigRow(readTransform);
+
+ BigQueryIO.TypedRead<TableRow> readTransformFromRow =
+ (BigQueryIO.TypedRead<TableRow>) translator.fromConfigRow(row, PipelineOptionsFactory.create());
+
+ assertEquals(
+ TimestampPrecision.PICOS, readTransformFromRow.getDirectReadPicosTimestampPrecision());
+ }
}