gemini-code-assist[bot] commented on code in PR #37079:
URL: https://github.com/apache/beam/pull/37079#discussion_r2620853608
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java:
##########
@@ -199,4 +205,57 @@ public List<BigQueryStorageStreamSource<T>> split(
   public BoundedReader<T> createReader(PipelineOptions options) throws IOException {
     throw new UnsupportedOperationException("BigQuery storage source must be split before reading");
   }
+
+  private void setPicosTimestampPrecision(
+      ReadSession.TableReadOptions.Builder tableReadOptionsBuilder, DataFormat dataFormat) {
+    if (picosTimestampPrecision == null) {
+      return;
+    }
+
+    if (dataFormat == DataFormat.ARROW) {
+      setArrowTimestampPrecision(tableReadOptionsBuilder);
+    } else if (dataFormat == DataFormat.AVRO) {
+      setAvroTimestampPrecision(tableReadOptionsBuilder);
+    }
+  }
+
+  private void setArrowTimestampPrecision(
+      ReadSession.TableReadOptions.Builder tableReadOptionsBuilder) {
+    ArrowSerializationOptions.PicosTimestampPrecision precision;
+    switch (checkNotNull(picosTimestampPrecision)) {
+      case MICROS:
+        precision =
+            ArrowSerializationOptions.PicosTimestampPrecision.TIMESTAMP_PRECISION_MICROS;
+        break;
+      case NANOS:
+        precision =
+            ArrowSerializationOptions.PicosTimestampPrecision.TIMESTAMP_PRECISION_NANOS;
+        break;
+      case PICOS:
+        precision =
+            ArrowSerializationOptions.PicosTimestampPrecision.TIMESTAMP_PRECISION_PICOS;
+        break;
+      default:
+        return;
+    }
+    tableReadOptionsBuilder.setArrowSerializationOptions(
+        ArrowSerializationOptions.newBuilder().setPicosTimestampPrecision(precision));
+  }
+
+  private void setAvroTimestampPrecision(
+      ReadSession.TableReadOptions.Builder tableReadOptionsBuilder) {
+    AvroSerializationOptions.PicosTimestampPrecision precision;
+    switch (checkNotNull(picosTimestampPrecision)) {
+      case MICROS:
+        precision =
+            AvroSerializationOptions.PicosTimestampPrecision.TIMESTAMP_PRECISION_MICROS;
+        break;
+      case NANOS:
+        precision =
+            AvroSerializationOptions.PicosTimestampPrecision.TIMESTAMP_PRECISION_NANOS;
+        break;
+      case PICOS:
+        precision =
+            AvroSerializationOptions.PicosTimestampPrecision.TIMESTAMP_PRECISION_PICOS;
+        break;
+      default:
+        return;
+    }
Review Comment:

Similar to `setArrowTimestampPrecision`, this method can be improved in two ways: remove the redundant `checkNotNull`, and make the `switch` statement robust against future changes to the `TimestampPrecision` enum by handling all cases explicitly and throwing an exception for any unhandled ones.
```java
    switch (picosTimestampPrecision) {
      case MICROS:
        precision =
            AvroSerializationOptions.PicosTimestampPrecision.TIMESTAMP_PRECISION_MICROS;
        break;
      case NANOS:
        precision =
            AvroSerializationOptions.PicosTimestampPrecision.TIMESTAMP_PRECISION_NANOS;
        break;
      case PICOS:
        precision =
            AvroSerializationOptions.PicosTimestampPrecision.TIMESTAMP_PRECISION_PICOS;
        break;
      case MILLIS:
        // The Storage Read API does not support millisecond precision for TIMESTAMP(12) columns.
        return;
      default:
        throw new IllegalArgumentException(
            "Unsupported timestamp precision for Storage Read API: " + picosTimestampPrecision);
    }
```
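
A possible way to keep the Arrow and Avro paths from drifting is a single exhaustive mapper shared by both. The sketch below is illustrative rather than part of this PR: it assumes the Beam-side `TimestampPrecision` enum has the values `MILLIS`, `MICROS`, `NANOS`, and `PICOS`, and it relies on the Arrow and Avro proto enums exposing identically named constants, as the quoted code shows they do.

```java
// Illustrative sketch only; `toProtoPrecision` is a hypothetical helper, not Beam API.
// The import for Beam's TimestampPrecision enum (introduced by this PR) is omitted.
import org.checkerframework.checker.nullness.qual.Nullable;

final class PrecisionMapper {
  private PrecisionMapper() {}

  // Resolves the proto constant by name, which works because the Arrow and Avro
  // PicosTimestampPrecision enums use the same constant names. Returns null for
  // MILLIS, which has no Storage Read API setting.
  static <E extends Enum<E>> @Nullable E toProtoPrecision(
      Class<E> protoEnum, TimestampPrecision precision) {
    switch (precision) {
      case MICROS:
        return Enum.valueOf(protoEnum, "TIMESTAMP_PRECISION_MICROS");
      case NANOS:
        return Enum.valueOf(protoEnum, "TIMESTAMP_PRECISION_NANOS");
      case PICOS:
        return Enum.valueOf(protoEnum, "TIMESTAMP_PRECISION_PICOS");
      case MILLIS:
        return null;
      default:
        throw new IllegalArgumentException(
            "Unsupported timestamp precision for Storage Read API: " + precision);
    }
  }
}
```

With such a helper, both `setArrowTimestampPrecision` and `setAvroTimestampPrecision` reduce to one lookup each, e.g. `toProtoPrecision(AvroSerializationOptions.PicosTimestampPrecision.class, picosTimestampPrecision)`.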
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java:
##########
@@ -195,6 +196,10 @@ public Row toConfigRow(TypedRead<?> transform) {
       if (transform.getUseAvroLogicalTypes() != null) {
         fieldValues.put("use_avro_logical_types", transform.getUseAvroLogicalTypes());
       }
+      if (transform.getDirectReadPicosTimestampPrecision() != null) {
+        fieldValues.put(
+            "timestamp_precision",
+            toByteArray(transform.getDirectReadPicosTimestampPrecision()));
Review Comment:

There's a mismatch between the field name used here for serialization
(`timestamp_precision`) and the name defined in the schema and used for
deserialization (`direct_read_picos_timestamp_precision`). This will cause
portability issues as the serialized value won't be correctly read back. Please
use the correct field name to ensure the transform can be correctly serialized
and deserialized across different contexts.
```suggestion
        fieldValues.put(
            "direct_read_picos_timestamp_precision",
            toByteArray(transform.getDirectReadPicosTimestampPrecision()));
```
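
A round-trip test would catch this class of mismatch automatically. The sketch below is hedged: the translator class and the `toConfigRow`/`fromConfigRow` shapes are assumed from Beam's usual transform-translator pattern (only `toConfigRow` is visible in the quoted hunk), so names may need adjusting to the actual API.

```java
// Hedged sketch of a round-trip check for the translation test suite; the
// translator class name and the fromConfigRow signature are assumptions.
@Test
public void testDirectReadPicosTimestampPrecisionRoundTripsThroughConfigRow() {
  BigQueryIO.TypedRead<TableRow> original =
      BigQueryIO.read(new TableRowParser())
          .withCoder(TableRowJsonCoder.of())
          .withMethod(Method.DIRECT_READ)
          .from("foo.com:project:dataset.table")
          .withDirectReadPicosTimestampPrecision(TimestampPrecision.PICOS);

  BigQueryIOTranslation.BigQueryIOReadTranslator translator =
      new BigQueryIOTranslation.BigQueryIOReadTranslator();
  Row configRow = translator.toConfigRow(original);
  BigQueryIO.TypedRead<?> roundTripped =
      translator.fromConfigRow(configRow, PipelineOptionsFactory.create());

  // Fails while the serialization key and the schema field name disagree.
  assertEquals(
      TimestampPrecision.PICOS, roundTripped.getDirectReadPicosTimestampPrecision());
}
```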
##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java:
##########
@@ -2381,6 +2444,587 @@ public void testReadFromBigQueryAvroObjectsMutation() throws Exception {
     assertEquals(new Utf8("A"), rowA.get("name"));
   }
+  @Test
+  public void testTimestampPrecisionDefaultValue() {
+    BigQueryIO.TypedRead<TableRow> typedRead =
+        BigQueryIO.read(new TableRowParser())
+            .withCoder(TableRowJsonCoder.of())
+            .withMethod(Method.DIRECT_READ)
+            .from("foo.com:project:dataset.table");
+
+    assertNull(typedRead.getDirectReadPicosTimestampPrecision());
+  }
+
+  @Test
+  public void testwithDirectReadPicosTimestampPrecisionNanos() {
+    BigQueryIO.TypedRead<TableRow> typedRead =
+        BigQueryIO.read(new TableRowParser())
+            .withCoder(TableRowJsonCoder.of())
+            .withMethod(Method.DIRECT_READ)
+            .from("foo.com:project:dataset.table")
+            .withDirectReadPicosTimestampPrecision(TimestampPrecision.NANOS);
+
+    assertEquals(TimestampPrecision.NANOS, typedRead.getDirectReadPicosTimestampPrecision());
+  }
+
+  @Test
+  public void testwithDirectReadPicosTimestampPrecisionPicos() {
+    BigQueryIO.TypedRead<TableRow> typedRead =
+        BigQueryIO.read(new TableRowParser())
+            .withCoder(TableRowJsonCoder.of())
+            .withMethod(Method.DIRECT_READ)
+            .from("foo.com:project:dataset.table")
+            .withDirectReadPicosTimestampPrecision(TimestampPrecision.PICOS);
+
+    assertEquals(TimestampPrecision.PICOS, typedRead.getDirectReadPicosTimestampPrecision());
+  }
+
+  @Test
+  public void testTableSourceInitialSplit_withDirectReadPicosTimestampPrecisionNanos_Arrow()
+      throws Exception {
+    fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null);
+    TableReference tableRef = BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table");
+    Table table =
+        new Table().setTableReference(tableRef).setNumBytes(100L).setSchema(TABLE_SCHEMA);
+    fakeDatasetService.createTable(table);
+
+    CreateReadSessionRequest expectedRequest =
+        CreateReadSessionRequest.newBuilder()
+            .setParent("projects/project-id")
+            .setReadSession(
+                ReadSession.newBuilder()
+                    .setTable("projects/foo.com:project/datasets/dataset/tables/table")
+                    .setDataFormat(DataFormat.ARROW)
+                    .setReadOptions(
+                        ReadSession.TableReadOptions.newBuilder()
+                            .setArrowSerializationOptions(
+                                com.google.cloud.bigquery.storage.v1.ArrowSerializationOptions
+                                    .newBuilder()
+                                    .setPicosTimestampPrecision(
+                                        com.google.cloud.bigquery.storage.v1
+                                            .ArrowSerializationOptions.PicosTimestampPrecision
+                                            .TIMESTAMP_PRECISION_NANOS))))
+            .setMaxStreamCount(10)
+            .build();
+
+    ReadSession.Builder builder =
+        ReadSession.newBuilder()
+            .setArrowSchema(
+                ArrowSchema.newBuilder().setSerializedSchema(serializeArrowSchema(ARROW_SCHEMA)))
+            .setDataFormat(DataFormat.ARROW);
+    for (int i = 0; i < 10; i++) {
+      builder.addStreams(ReadStream.newBuilder().setName("stream-" + i));
+    }
+
+    StorageClient fakeStorageClient = mock(StorageClient.class);
+    when(fakeStorageClient.createReadSession(expectedRequest)).thenReturn(builder.build());
+
+    BigQueryStorageTableSource<TableRow> tableSource =
+        BigQueryStorageTableSource.create(
+            ValueProvider.StaticValueProvider.of(tableRef),
+            DataFormat.ARROW,
+            null, /* selectedFields */
+            null, /* rowRestriction */
+            new TableRowParser(),
+            TableRowJsonCoder.of(),
+            new FakeBigQueryServices()
+                .withDatasetService(fakeDatasetService)
+                .withStorageClient(fakeStorageClient),
+            false, /* projectionPushdownApplied */
+            TimestampPrecision.NANOS);
+
+    List<? extends BoundedSource<TableRow>> sources = tableSource.split(10L, options);
+    assertEquals(10L, sources.size());
+  }
+
+  private org.apache.arrow.vector.types.pojo.Schema getArrowSchemaTs(
+      TimestampPrecision precision) {
+    switch (precision) {
+      case NANOS:
+        return ARROW_SCHEMA_TS_NANOS;
+      case PICOS:
+        return ARROW_SCHEMA_TS_PICOS;
+      case MICROS:
+      default:
+        return ARROW_SCHEMA_TS_MICROS;
+    }
Review Comment:

The `default` case here will treat any unhandled `TimestampPrecision` enum
value as `MICROS`. This could lead to subtle bugs if new precisions are added.
It would be safer to make the `switch` statement exhaustive or throw an
exception in the `default` case to ensure all enum values are explicitly
handled. This applies to other similar helper methods in this test file as well
(e.g., `getAvroSchemaTs`, `createArrowTsResponse`).
```java
    switch (precision) {
      case NANOS:
        return ARROW_SCHEMA_TS_NANOS;
      case PICOS:
        return ARROW_SCHEMA_TS_PICOS;
      case MICROS:
        return ARROW_SCHEMA_TS_MICROS;
      default:
        throw new IllegalArgumentException("Unsupported timestamp precision: " + precision);
    }
```
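
If the module's language level allows Java 14+ switch expressions, exhaustiveness can be enforced by the compiler instead of at runtime: a switch expression over an enum with no `default` must cover every constant, so adding a new `TimestampPrecision` value becomes a compile error rather than a silent `MICROS` fallback. A sketch, assuming the enum's values are `MILLIS`, `MICROS`, `NANOS`, and `PICOS`:

```java
// Sketch only: compiler-checked exhaustiveness via a switch expression.
private org.apache.arrow.vector.types.pojo.Schema getArrowSchemaTs(
    TimestampPrecision precision) {
  return switch (precision) {
    case MICROS -> ARROW_SCHEMA_TS_MICROS;
    case NANOS -> ARROW_SCHEMA_TS_NANOS;
    case PICOS -> ARROW_SCHEMA_TS_PICOS;
    case MILLIS ->
        throw new IllegalArgumentException("Unsupported timestamp precision: " + precision);
  };
}
```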
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java:
##########
@@ -199,4 +205,57 @@ public List<BigQueryStorageStreamSource<T>> split(
   public BoundedReader<T> createReader(PipelineOptions options) throws IOException {
     throw new UnsupportedOperationException("BigQuery storage source must be split before reading");
   }
+
+  private void setPicosTimestampPrecision(
+      ReadSession.TableReadOptions.Builder tableReadOptionsBuilder, DataFormat dataFormat) {
+    if (picosTimestampPrecision == null) {
+      return;
+    }
+
+    if (dataFormat == DataFormat.ARROW) {
+      setArrowTimestampPrecision(tableReadOptionsBuilder);
+    } else if (dataFormat == DataFormat.AVRO) {
+      setAvroTimestampPrecision(tableReadOptionsBuilder);
+    }
+  }
+
+  private void setArrowTimestampPrecision(
+      ReadSession.TableReadOptions.Builder tableReadOptionsBuilder) {
+    ArrowSerializationOptions.PicosTimestampPrecision precision;
+    switch (checkNotNull(picosTimestampPrecision)) {
+      case MICROS:
+        precision =
+            ArrowSerializationOptions.PicosTimestampPrecision.TIMESTAMP_PRECISION_MICROS;
+        break;
+      case NANOS:
+        precision =
+            ArrowSerializationOptions.PicosTimestampPrecision.TIMESTAMP_PRECISION_NANOS;
+        break;
+      case PICOS:
+        precision =
+            ArrowSerializationOptions.PicosTimestampPrecision.TIMESTAMP_PRECISION_PICOS;
+        break;
+      default:
+        return;
+    }
Review Comment:

This method can be slightly improved for clarity and robustness:
1. The `checkNotNull(picosTimestampPrecision)` is redundant because
`picosTimestampPrecision` is checked for null in the calling method
`setPicosTimestampPrecision`.
2. The `switch` statement is not exhaustive for the `TimestampPrecision`
enum (it's missing `MILLIS`). The `default` case currently returns silently,
which could hide bugs if new enum values are added in the future. It would be
safer to explicitly handle `MILLIS` (if it's intentionally unsupported) and
have a `default` case that throws an exception for any other unhandled values.
```java
    switch (picosTimestampPrecision) {
      case MICROS:
        precision =
            ArrowSerializationOptions.PicosTimestampPrecision.TIMESTAMP_PRECISION_MICROS;
        break;
      case NANOS:
        precision =
            ArrowSerializationOptions.PicosTimestampPrecision.TIMESTAMP_PRECISION_NANOS;
        break;
      case PICOS:
        precision =
            ArrowSerializationOptions.PicosTimestampPrecision.TIMESTAMP_PRECISION_PICOS;
        break;
      case MILLIS:
        // The Storage Read API does not support millisecond precision for TIMESTAMP(12) columns.
        return;
      default:
        throw new IllegalArgumentException(
            "Unsupported timestamp precision for Storage Read API: " + picosTimestampPrecision);
    }
```
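
Taken together with the shared mapper sketched under the first comment, the Arrow path could collapse further. Again illustrative only; `PrecisionMapper.toProtoPrecision` is the hypothetical helper from that sketch, not Beam or BigQuery API:

```java
// Illustrative rewrite; null signals MILLIS, for which no API setting exists.
private void setArrowTimestampPrecision(
    ReadSession.TableReadOptions.Builder tableReadOptionsBuilder) {
  ArrowSerializationOptions.PicosTimestampPrecision precision =
      PrecisionMapper.toProtoPrecision(
          ArrowSerializationOptions.PicosTimestampPrecision.class, picosTimestampPrecision);
  if (precision == null) {
    return;
  }
  tableReadOptionsBuilder.setArrowSerializationOptions(
      ArrowSerializationOptions.newBuilder().setPicosTimestampPrecision(precision));
}
```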
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]