Abacn commented on code in PR #37091: URL: https://github.com/apache/beam/pull/37091#discussion_r2641011345
########## sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimestampPicosIT.java: ########## @@ -0,0 +1,414 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.bigquery; + +import com.google.api.services.bigquery.model.TableFieldSchema; +import com.google.api.services.bigquery.model.TableRow; +import com.google.api.services.bigquery.model.TableSchema; +import com.google.cloud.bigquery.storage.v1.DataFormat; +import java.security.SecureRandom; +import java.util.List; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; +import org.apache.beam.sdk.io.gcp.testing.BigqueryClient; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Integration tests for BigQuery TIMESTAMP with picosecond precision. 
 + * + * <p>Tests write data via Storage Write API and read back using different precision settings. Each + * test clearly shows: WRITE DATA → READ SETTINGS → EXPECTED OUTPUT. + */ +@RunWith(JUnit4.class) +public class BigQueryTimestampPicosIT { Review Comment: consider excluding these tests from the (already long-running) Dataflow suites here: https://github.com/apache/beam/blob/c511260907188aef46e9088ab4ce5eb5ee5b0adf/runners/google-cloud-dataflow-java/build.gradle#L645 https://github.com/apache/beam/blob/c511260907188aef46e9088ab4ce5eb5ee5b0adf/runners/google-cloud-dataflow-java/build.gradle#L700 as other BigQuery ITs do, if running on the direct runner (Java GCP IO PreCommit) is sufficient. ########## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java: ########## @@ -1313,6 +1363,34 @@ public static ByteString mergeNewFields( null, null); } + } else if (schemaInformation.getType() == TableFieldSchema.Type.TIMESTAMP + && schemaInformation.getTimestampPrecision() == PICOSECOND_PRECISION) { + + long seconds; + long picoseconds; + + if (value instanceof String) { + BigQueryUtils.TimestampPicos parsed = + BigQueryUtils.parseTimestampPicosFromString((String) value); + seconds = parsed.seconds; + picoseconds = parsed.picoseconds; + + } else if (value instanceof Instant) { + Instant timestamp = (Instant) value; + seconds = timestamp.getEpochSecond(); + picoseconds = timestamp.getNano() * 1000L; + } else { Review Comment: do we need to handle org.joda.time.Instant, as is done here? 
https://github.com/apache/beam/blob/0f89aa885de6e5df4fd1655e66528283504b520f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java#L316 ########## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java: ########## @@ -199,6 +216,8 @@ public interface ThrowingBiFunction<FirstInputT, SecondInputT, OutputT> { static final DecimalFormat DECIMAL_FORMAT = new DecimalFormat("0.0###############", DecimalFormatSymbols.getInstance(Locale.ROOT)); + private static final long PICOSECOND_PRECISION = 12L; Review Comment: sounds like a valid (minor) comment. We can make it class private (not public) and use it inside BigQueryIO ########## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java: ########## @@ -380,6 +382,67 @@ static StandardSQLTypeName toStandardSQLTypeName(FieldType fieldType) { return ret; } + /** + * Represents a timestamp with picosecond precision, split into seconds and picoseconds + * components. + */ + public static class TimestampPicos { + final long seconds; + final long picoseconds; + + TimestampPicos(long seconds, long picoseconds) { + this.seconds = seconds; + this.picoseconds = picoseconds; + } + } + + /** + * Parses a timestamp string into seconds and picoseconds components. + * + * <p>Handles two formats: + * + * <ul> + * <li>ISO format with exactly 12 fractional digits ending in Z (picosecond precision): e.g., + * "2024-01-15T10:30:45.123456789012Z" + * <li>UTC format with 0-9 fractional digits ending in "UTC" (up to nanosecond precision): e.g., + * "2024-01-15 10:30:45.123456789 UTC", "2024-01-15 10:30:45 UTC" + * </ul> + */ + public static TimestampPicos parseTimestampPicosFromString(String timestampString) { Review Comment: nit: consider putting it inside as a static member method TimestampPicos.fromString(...) 
########## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java: ########## @@ -1060,6 +1091,25 @@ private static void fieldDescriptorFromTableField( fieldDescriptorBuilder = fieldDescriptorBuilder.setType(Type.TYPE_MESSAGE).setTypeName(nested.getName()); break; + case TIMESTAMP: + if (fieldSchema.getTimestampPrecision().getValue() == PICOSECOND_PRECISION) { + boolean typeAlreadyExists = + descriptorBuilder.getNestedTypeList().stream() + .anyMatch(d -> TIMESTAMP_PICOS_DESCRIPTOR_PROTO.getName().equals(d.getName())); + + if (!typeAlreadyExists) { + descriptorBuilder.addNestedType(TIMESTAMP_PICOS_DESCRIPTOR_PROTO); + } + fieldDescriptorBuilder = + fieldDescriptorBuilder + .setType(Type.TYPE_MESSAGE) + .setTypeName(TIMESTAMP_PICOS_DESCRIPTOR_PROTO.getName()); + } else { + // Microsecond precision - use simple INT64 Review Comment: do we need to handle nanosecond precision here (fieldSchema.getTimestampPrecision().getValue() == 9) ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
