This is an automated email from the ASF dual-hosted git repository.
ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new ce76f81fbbc Add timestamp-nanos avro logical type support in bigquery avro utils. (#36892)
ce76f81fbbc is described below
commit ce76f81fbbc7f58d20c44a1f36121ba591bed866
Author: claudevdm <[email protected]>
AuthorDate: Wed Dec 3 14:39:35 2025 -0500
Add timestamp-nanos avro logical type support in bigquery avro utils. (#36892)
* Add timestamp-nanos avro logical type support in bigquery utils.
* Lint.
* Avoid overflows for millis/micros.
* Remove println
* Comments.
* Comments
* Comments.
---------
Co-authored-by: Claude <[email protected]>
---
.../beam/sdk/schemas/logicaltypes/Timestamp.java | 1 -
.../extensions/avro/schemas/utils/AvroUtils.java | 46 ++++++
.../avro/schemas/utils/AvroUtilsTest.java | 83 ++++++++++
.../sdk/io/gcp/bigquery/BigQueryAvroUtils.java | 106 +++++++++----
.../sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java | 170 ++++++++++++++++++---
5 files changed, 357 insertions(+), 49 deletions(-)
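For review context: the heart of this change is a two-way mapping between Avro's timestamp-nanos representation (a long count of nanoseconds since the epoch) and java.time.Instant. Below is a minimal self-contained sketch of that conversion, assuming nothing beyond the JDK; the class and method names are illustrative and not part of this commit.

import java.math.BigInteger;
import java.time.Instant;

public class EpochNanosSketch {
  // Avro timestamp-nanos: a long holding nanoseconds since 1970-01-01T00:00:00Z.
  static Instant fromEpochNanos(long nanos) {
    // floorDiv/floorMod keep negative epochs correct: -1 nano is
    // 1969-12-31T23:59:59.999999999Z, i.e. seconds = -1, nanoAdjustment = 999_999_999.
    long seconds = Math.floorDiv(nanos, 1_000_000_000L);
    long nanoAdjustment = Math.floorMod(nanos, 1_000_000_000L);
    return Instant.ofEpochSecond(seconds, nanoAdjustment);
  }

  // Instant stores epochSecond plus a non-negative nano-of-second, so
  // epochSecond * 1_000_000_000 + nano can silently overflow a plain long at
  // the extremes (e.g. Long.MIN_VALUE); BigInteger makes the edge case exact.
  static long toEpochNanos(Instant instant) {
    return BigInteger.valueOf(instant.getEpochSecond())
        .multiply(BigInteger.valueOf(1_000_000_000L))
        .add(BigInteger.valueOf(instant.getNano()))
        .longValueExact();
  }

  public static void main(String[] args) {
    // Round-trips across the full int64 range, including both extremes.
    for (long nanos : new long[] {Long.MIN_VALUE, -1L, 0L, 123_456_789L, Long.MAX_VALUE}) {
      if (toEpochNanos(fromEpochNanos(nanos)) != nanos) {
        throw new AssertionError("round trip failed for " + nanos);
      }
    }
  }
}

The diffs below apply this same floorDiv/floorMod and BigInteger pattern inside AvroUtils and BigQueryAvroUtils.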
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/Timestamp.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/Timestamp.java
index 058331a44cf..87e47f5961e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/Timestamp.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/Timestamp.java
@@ -157,7 +157,6 @@ public class Timestamp implements Schema.LogicalType<Instant, Row> {
maxSubseconds,
precision,
subseconds);
-
return Instant.ofEpochSecond(
checkArgumentNotNull(
base.getInt64(0), "While trying to convert to Instant: Row missing seconds field"),
diff --git a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java
index 38621571ca1..882e46208a9 100644
--- a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java
+++ b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java
@@ -27,6 +27,7 @@ import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.math.BigDecimal;
+import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@@ -81,6 +82,7 @@ import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes;
import org.apache.beam.sdk.schemas.logicaltypes.FixedString;
import org.apache.beam.sdk.schemas.logicaltypes.OneOfType;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
+import org.apache.beam.sdk.schemas.logicaltypes.Timestamp;
import org.apache.beam.sdk.schemas.logicaltypes.VariableBytes;
import org.apache.beam.sdk.schemas.logicaltypes.VariableString;
import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertType;
@@ -137,6 +139,7 @@ import org.joda.time.ReadableInstant;
* LogicalTypes.TimestampMillis <-----> DATETIME
* LogicalTypes.TimestampMicros ------> Long
* LogicalTypes.TimestampMicros <------ LogicalType(urn="beam:logical_type:micros_instant:v1")
+ * LogicalTypes.TimestampNanos <------> LogicalType(TIMESTAMP(9))
* LogicalTypes.Decimal <-----> DECIMAL
* </pre>
*
@@ -164,6 +167,8 @@ public class AvroUtils {
private static final GenericData GENERIC_DATA_WITH_DEFAULT_CONVERSIONS;
+ private static final String TIMESTAMP_NANOS_LOGICAL_TYPE = "timestamp-nanos";
+
static {
GENERIC_DATA_WITH_DEFAULT_CONVERSIONS = new GenericData();
addLogicalTypeConversions(GENERIC_DATA_WITH_DEFAULT_CONVERSIONS);
@@ -1027,6 +1032,11 @@ public class AvroUtils {
fieldType = FieldType.DATETIME;
}
}
+ // TODO: Remove once Avro 1.12+ has timestamp-nanos
+ if (fieldType == null
+ && TIMESTAMP_NANOS_LOGICAL_TYPE.equals(avroSchema.getProp("logicalType"))) {
+ fieldType = FieldType.logicalType(Timestamp.NANOS);
+ }
if (fieldType == null) {
switch (type.type.getType()) {
@@ -1186,6 +1196,14 @@ public class AvroUtils {
} else if (SqlTypes.TIMESTAMP.getIdentifier().equals(identifier)) {
baseType =
LogicalTypes.timestampMicros().addToSchema(org.apache.avro.Schema.create(Type.LONG));
+ } else if (Timestamp.IDENTIFIER.equals(identifier)) {
+ int precision = checkNotNull(logicalType.getArgument());
+ if (precision != 9) {
+ throw new RuntimeException(
+ "Timestamp logical type precision not supported:" + precision);
+ }
+ baseType = org.apache.avro.Schema.create(Type.LONG);
+ baseType.addProp("logicalType", TIMESTAMP_NANOS_LOGICAL_TYPE);
} else {
throw new RuntimeException(
"Unhandled logical type " +
checkNotNull(fieldType.getLogicalType()).getIdentifier());
@@ -1340,6 +1358,16 @@ public class AvroUtils {
java.time.Instant instant = (java.time.Instant) value;
return TimeUnit.SECONDS.toMicros(instant.getEpochSecond())
+ TimeUnit.NANOSECONDS.toMicros(instant.getNano());
+ } else if (Timestamp.IDENTIFIER.equals(identifier)) {
+ java.time.Instant instant = (java.time.Instant) value;
+ // Use BigInteger to work around long overflows so that epochNanos = Long.MIN_VALUE can be
+ // supported. Instant always stores nanos as positive adjustment so the math will silently
+ // overflow with regular int64.
+ BigInteger epochSeconds = BigInteger.valueOf(instant.getEpochSecond());
+ BigInteger nanosOfSecond = BigInteger.valueOf(instant.getNano());
+ BigInteger epochNanos =
+ epochSeconds.multiply(BigInteger.valueOf(1_000_000_000L)).add(nanosOfSecond);
+ return epochNanos.longValueExact();
} else {
throw new RuntimeException("Unhandled logical type " + identifier);
}
@@ -1387,6 +1415,24 @@ public class AvroUtils {
@Nonnull FieldType fieldType,
@Nonnull GenericData genericData) {
TypeWithNullability type = new TypeWithNullability(avroSchema);
+
+ // TODO: Remove this workaround once Avro is upgraded to 1.12+ where timestamp-nanos is supported.
+ if (TIMESTAMP_NANOS_LOGICAL_TYPE.equals(type.type.getProp("logicalType"))) {
+ if (type.type.getType() == Type.LONG) {
+ Long nanos = (Long) value;
+ // Check if Beam expects Timestamp logical type
+ if (fieldType.getTypeName() == TypeName.LOGICAL_TYPE
+ && org.apache.beam.sdk.schemas.logicaltypes.Timestamp.IDENTIFIER.equals(
+ fieldType.getLogicalType().getIdentifier())) {
+ long seconds = Math.floorDiv(nanos, 1_000_000_000L);
+ long nanoAdjustment = Math.floorMod(nanos, 1_000_000_000L);
+ return java.time.Instant.ofEpochSecond(seconds, nanoAdjustment);
+ } else {
+ return nanos;
+ }
+ }
+ }
+
LogicalType logicalType = LogicalTypes.fromSchema(type.type);
if (logicalType == null) {
return null;
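The raw getProp("logicalType") matching above is needed because Avro releases before 1.12 do not register timestamp-nanos, so LogicalTypes.fromSchema cannot resolve it and the type only survives as a string property on the schema. A small probe sketch illustrating that assumption (the expected output is a hedged guess for pre-1.12 Avro, not verified against every release):

import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;

public class TimestampNanosProbe {
  public static void main(String[] args) {
    Schema schema =
        new Schema.Parser().parse("{\"type\": \"long\", \"logicalType\": \"timestamp-nanos\"}");
    // Unknown logical type names are expected to resolve to null on pre-1.12 Avro...
    LogicalType lt = LogicalTypes.fromSchema(schema);
    System.out.println(lt); // likely null before Avro 1.12
    // ...but the raw schema property is always readable, which is what the patch keys on.
    System.out.println(schema.getProp("logicalType")); // "timestamp-nanos"
  }
}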
diff --git a/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java b/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java
index 41a43ed850b..d087ed0a20b 100644
--- a/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java
+++ b/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java
@@ -54,6 +54,7 @@ import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
import org.apache.beam.sdk.schemas.logicaltypes.OneOfType;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
+import org.apache.beam.sdk.schemas.logicaltypes.Timestamp;
import org.apache.beam.sdk.testing.CoderProperties;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.SimpleFunction;
@@ -549,6 +550,88 @@ public class AvroUtilsTest {
assertEquals(getAvroSchema(), avroSchema);
}
+ @Test
+ public void testBeamTimestampNanosLogicalTypeToAvroSchema() {
+ Schema beamSchema =
+ Schema.builder().addLogicalTypeField("timestampNanos", Timestamp.NANOS).build();
+
+ // Expected Avro schema with timestamp-nanos
+ String expectedJson =
+ "{\"type\": \"record\", \"name\": \"topLevelRecord\", "
+ + "\"fields\": [{\"name\": \"timestampNanos\", "
+ + "\"type\": {\"type\": \"long\", \"logicalType\":
\"timestamp-nanos\"}}]}";
+
+ org.apache.avro.Schema expectedAvroSchema =
+ new org.apache.avro.Schema.Parser().parse(expectedJson);
+
+ assertEquals(expectedAvroSchema, AvroUtils.toAvroSchema(beamSchema));
+ }
+
+ @Test
+ public void testBeamTimestampNanosToGenericRecord() {
+ Schema beamSchema =
+ Schema.builder().addLogicalTypeField("timestampNanos", Timestamp.NANOS).build();
+
+ java.time.Instant instant = java.time.Instant.parse("2000-01-01T01:02:03.123456789Z");
+ Row beamRow = Row.withSchema(beamSchema).addValue(instant).build();
+
+ // Expected nanos since epoch
+ long expectedNanos = TimeUnit.SECONDS.toNanos(instant.getEpochSecond()) + instant.getNano();
+
+ org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(beamSchema);
+ GenericRecord avroRecord = AvroUtils.toGenericRecord(beamRow, avroSchema);
+
+ assertEquals(expectedNanos, avroRecord.get("timestampNanos"));
+ }
+
+ @Test
+ public void testTimestampNanosRoundTrip() {
+ Schema beamSchema =
+ Schema.builder().addLogicalTypeField("timestampNanos", Timestamp.NANOS).build();
+
+ // Test various nanosecond precisions
+ java.time.Instant[] testInstants = {
+ java.time.Instant.parse("2000-01-01T00:00:00.000000001Z"), // 1 nano
+ java.time.Instant.parse("2000-01-01T00:00:00.123456789Z"), // full nanos
+ java.time.Instant.parse("2000-01-01T00:00:00.999999999Z"), // max nanos
+ java.time.Instant.ofEpochSecond(0L, Long.MAX_VALUE), // max supported
+ java.time.Instant.parse("1677-09-21T00:12:43.145224192Z"), // min
supported by an int64
+ };
+
+ org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(beamSchema);
+
+ for (java.time.Instant instant : testInstants) {
+ Row originalRow = Row.withSchema(beamSchema).addValue(instant).build();
+ GenericRecord avroRecord = AvroUtils.toGenericRecord(originalRow, avroSchema);
+ Row roundTripRow = AvroUtils.toBeamRowStrict(avroRecord, beamSchema);
+
+ assertEquals(originalRow, roundTripRow);
+ java.time.Instant roundTripInstant =
+ (java.time.Instant) roundTripRow.getValue("timestampNanos");
+ assertEquals(instant, roundTripInstant);
+ }
+ }
+
+ @Test
+ public void testTimestampNanosAvroSchemaToBeamSchema() {
+ List<org.apache.avro.Schema.Field> fields = Lists.newArrayList();
+ fields.add(
+ new org.apache.avro.Schema.Field(
+ "timestampNanos",
+ new org.apache.avro.Schema.Parser()
+ .parse("{\"type\": \"long\", \"logicalType\":
\"timestamp-nanos\"}"),
+ "",
+ (Object) null));
+ org.apache.avro.Schema avroSchema =
+ org.apache.avro.Schema.createRecord("test", null, null, false, fields);
+
+ Schema beamSchema = AvroUtils.toBeamSchema(avroSchema);
+
+ Schema expected =
+ Schema.builder().addLogicalTypeField("timestampNanos", Timestamp.NANOS).build();
+ assertEquals(expected, beamSchema);
+ }
+
@Test
public void testAvroSchemaFromBeamSchemaCanBeParsed() {
org.apache.avro.Schema convertedSchema = AvroUtils.toAvroSchema(getBeamSchema());
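The 1677-09-21T00:12:43.145224192Z and Long.MAX_VALUE instants exercised above are the exact bounds of an int64 nanosecond epoch (roughly plus or minus 292 years around 1970). A quick sketch, not part of the commit, that re-derives them:

import java.time.Instant;

public class Int64NanosRange {
  public static void main(String[] args) {
    // Instant.ofEpochSecond accepts a signed nanosecond adjustment,
    // so the bounds fall out directly:
    System.out.println(Instant.ofEpochSecond(0, Long.MIN_VALUE)); // 1677-09-21T00:12:43.145224192Z
    System.out.println(Instant.ofEpochSecond(0, Long.MAX_VALUE)); // 2262-04-11T23:47:16.854775807Z
  }
}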
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
index c169a0571b7..b5243a8110b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
@@ -50,8 +50,6 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.BaseEncoding;
import org.checkerframework.checker.nullness.qual.Nullable;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
/** A set of utilities for working with Avro files. */
class BigQueryAvroUtils {
@@ -60,7 +58,7 @@ class BigQueryAvroUtils {
Optional.ofNullable(Schema.class.getPackage())
.map(Package::getImplementationVersion)
.orElse("");
-
+ private static final String TIMESTAMP_NANOS_LOGICAL_TYPE = "timestamp-nanos";
// org.apache.avro.LogicalType
static class DateTimeLogicalType extends LogicalType {
public DateTimeLogicalType() {
@@ -161,36 +159,73 @@ class BigQueryAvroUtils {
* Formats BigQuery seconds-since-epoch into String matching JSON export. Thread-safe and
* immutable.
*/
- private static final DateTimeFormatter DATE_AND_SECONDS_FORMATTER =
- DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").withZoneUTC();
-
- @VisibleForTesting
- static String formatTimestamp(Long timestampMicro) {
- String dateTime = formatDatetime(timestampMicro);
- return dateTime + " UTC";
+ private static final java.time.format.DateTimeFormatter DATE_TIME_FORMATTER =
+ java.time.format.DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
+ .withZone(java.time.ZoneOffset.UTC);
+
+ /** Enum to define the precision of a timestamp since the epoch. */
+ enum TimestampPrecision {
+ MILLISECONDS,
+ MICROSECONDS,
+ NANOSECONDS;
+
+ /** Converts an epoch value of this precision to an Instant. */
+ java.time.Instant toInstant(long epochValue) {
+ switch (this) {
+ case MILLISECONDS:
+ return java.time.Instant.ofEpochMilli(epochValue);
+ case MICROSECONDS:
+ {
+ long seconds = Math.floorDiv(epochValue, 1_000_000L);
+ long microsOfSecond = Math.floorMod(epochValue, 1_000_000L);
+ return java.time.Instant.ofEpochSecond(seconds, microsOfSecond * 1_000L);
+ }
+ case NANOSECONDS:
+ {
+ long seconds = Math.floorDiv(epochValue, 1_000_000_000L);
+ long nanosOfSecond = Math.floorMod(epochValue, 1_000_000_000L);
+ return java.time.Instant.ofEpochSecond(seconds, nanosOfSecond);
+ }
+ default:
+ throw new IllegalStateException("Unknown precision: " + this);
+ }
+ }
}
+ /**
+ * Formats an Instant with minimal fractional second precision. Shows 0, 3, 6, or 9 decimal
+ * places based on actual precision of the value.
+ */
@VisibleForTesting
- static String formatDatetime(Long timestampMicro) {
- // timestampMicro is in "microseconds since epoch" format,
- // e.g., 1452062291123456L means "2016-01-06 06:38:11.123456 UTC".
- // Separate into seconds and microseconds.
- long timestampSec = timestampMicro / 1_000_000;
- long micros = timestampMicro % 1_000_000;
- if (micros < 0) {
- micros += 1_000_000;
- timestampSec -= 1;
- }
- String dayAndTime = DATE_AND_SECONDS_FORMATTER.print(timestampSec * 1000);
- if (micros == 0) {
- return dayAndTime;
- } else if (micros % 1000 == 0) {
- return String.format("%s.%03d", dayAndTime, micros / 1000);
+ @SuppressWarnings("JavaInstantGetSecondsGetNano")
+ static String formatDatetime(java.time.Instant instant) {
+ String dateTime = DATE_TIME_FORMATTER.format(instant);
+ int nanos = instant.getNano();
+
+ if (nanos == 0) {
+ return dateTime;
+ } else if (nanos % 1_000_000 == 0) {
+ return dateTime + String.format(".%03d", nanos / 1_000_000);
+ } else if (nanos % 1_000 == 0) {
+ return dateTime + String.format(".%06d", nanos / 1_000);
} else {
- return String.format("%s.%06d", dayAndTime, micros);
+ return dateTime + String.format(".%09d", nanos);
}
}
+ @VisibleForTesting
+ static String formatDatetime(long epochValue, TimestampPrecision precision) {
+ return formatDatetime(precision.toInstant(epochValue));
+ }
+
+ static String formatTimestamp(java.time.Instant instant) {
+ return formatDatetime(instant) + " UTC";
+ }
+
+ static String formatTimestamp(long epochValue, TimestampPrecision precision) {
+ return formatTimestamp(precision.toInstant(epochValue));
+ }
+
/**
* This method formats a BigQuery DATE value into a String matching the format used by JSON
* export. Date records are stored in "days since epoch" format, and BigQuery uses the proleptic
@@ -335,7 +370,6 @@ class BigQueryAvroUtils {
// REQUIRED fields are represented as the corresponding Avro types. For example, a BigQuery
// INTEGER type maps to an Avro LONG type.
checkNotNull(v, "REQUIRED field %s should not be null", name);
-
Type type = schema.getType();
LogicalType logicalType = schema.getLogicalType();
switch (type) {
@@ -364,21 +398,26 @@ class BigQueryAvroUtils {
} else if (logicalType instanceof LogicalTypes.TimestampMillis) {
// Write only: SQL type TIMESTAMP
// ideally Instant but TableRowJsonCoder encodes as String
- return formatTimestamp((Long) v * 1000L);
+ return formatTimestamp((Long) v, TimestampPrecision.MILLISECONDS);
} else if (logicalType instanceof LogicalTypes.TimestampMicros) {
// SQL type TIMESTAMP
// ideally Instant but TableRowJsonCoder encodes as String
- return formatTimestamp((Long) v);
+ return formatTimestamp((Long) v, TimestampPrecision.MICROSECONDS);
+ // TODO: Use LogicalTypes.TimestampNanos once avro version is updated.
+ } else if (TIMESTAMP_NANOS_LOGICAL_TYPE.equals(schema.getProp("logicalType"))) {
+ // SQL type TIMESTAMP
+ // ideally Instant but TableRowJsonCoder encodes as String
+ return formatTimestamp((Long) v, TimestampPrecision.NANOSECONDS);
} else if (!(VERSION_AVRO.startsWith("1.8") ||
VERSION_AVRO.startsWith("1.9"))
&& logicalType instanceof LogicalTypes.LocalTimestampMillis) {
// Write only: SQL type DATETIME
// ideally LocalDateTime but TableRowJsonCoder encodes as String
- return formatDatetime(((Long) v) * 1000);
+ return formatDatetime(((Long) v), TimestampPrecision.MILLISECONDS);
} else if (!(VERSION_AVRO.startsWith("1.8") ||
VERSION_AVRO.startsWith("1.9"))
&& logicalType instanceof LogicalTypes.LocalTimestampMicros) {
// Write only: SQL type DATETIME
// ideally LocalDateTime but TableRowJsonCoder encodes as String
- return formatDatetime((Long) v);
+ return formatDatetime((Long) v, TimestampPrecision.MICROSECONDS);
} else {
// SQL type INT64 (INT, SMALLINT, INTEGER, BIGINT, TINYINT, BYTEINT)
// ideally Long if in [-2^53+1, 2^53-1] but keep consistency with BQ JSON export that uses
@@ -602,6 +641,11 @@ class BigQueryAvroUtils {
return fieldSchema.setType("INTEGER");
}
case LONG:
+ // TODO: Use LogicalTypes.TimestampNanos once avro version is updated.
+ if (useAvroLogicalTypes
+ && (TIMESTAMP_NANOS_LOGICAL_TYPE.equals(type.getProp("logicalType")))) {
+ return fieldSchema.setType("TIMESTAMP");
+ }
if (logicalType instanceof LogicalTypes.TimeMicros) {
return fieldSchema.setType("TIME");
} else if (!(VERSION_AVRO.startsWith("1.8") ||
VERSION_AVRO.startsWith("1.9"))
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java
index 9b752055d01..e95e1546596 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java
@@ -280,6 +280,30 @@ public class BigQueryAvroUtilsTest {
assertEquals(expected, row.clone());
}
+ {
+ // timestamp-nanos
+ // TODO: Use LogicalTypes.TimestampNanos once avro version is updated.
+ String timestampNanosJson = "{\"type\": \"long\", \"logicalType\": \"timestamp-nanos\"}";
+ Schema timestampType = new Schema.Parser().parse(timestampNanosJson);
+
+ // 2000-01-01 01:02:03.123456789 UTC
+ LocalDate date = LocalDate.of(2000, 1, 1);
+ LocalTime time = LocalTime.of(1, 2, 3, 123456789);
+ LocalDateTime ts = LocalDateTime.of(date, time);
+ long seconds = ts.toInstant(ZoneOffset.UTC).getEpochSecond();
+ int nanos = ts.toInstant(ZoneOffset.UTC).getNano();
+ long totalNanos = seconds * 1_000_000_000L + nanos;
+ GenericRecord record =
+ new GenericRecordBuilder(avroSchema(f -> f.type(timestampType).noDefault()))
+ .set("value", totalNanos)
+ .build();
+ TableRow expected = new TableRow().set("value", "2000-01-01
01:02:03.123456789 UTC");
+ TableRow row = BigQueryAvroUtils.convertGenericRecordToTableRow(record);
+
+ assertEquals(expected, row);
+ assertEquals(expected, row.clone());
+ }
+
{
// timestamp-micros
LogicalType lt = LogicalTypes.timestampMillis();
@@ -923,6 +947,19 @@ public class BigQueryAvroUtilsTest {
assertEquals(expectedRaw, BigQueryAvroUtils.fromGenericAvroSchema(avroSchema, false));
}
+ {
+ // timestamp-nanos
+ // TODO: Use LogicalTypes.TimestampNanos once avro version is updated.
+ String timestampNanosJson = "{\"type\": \"long\", \"logicalType\": \"timestamp-nanos\"}";
+ Schema timestampType = new Schema.Parser().parse(timestampNanosJson);
+ Schema avroSchema = avroSchema(f -> f.type(timestampType).noDefault());
+ TableSchema expected = tableSchema(f -> f.setType("TIMESTAMP").setMode("REQUIRED"));
+ TableSchema expectedRaw = tableSchema(f -> f.setType("INTEGER").setMode("REQUIRED"));
+
+ assertEquals(expected, BigQueryAvroUtils.fromGenericAvroSchema(avroSchema));
+ assertEquals(expectedRaw, BigQueryAvroUtils.fromGenericAvroSchema(avroSchema, false));
+ }
+
{
// string prop: sqlType=GEOGRAPHY
Schema avroSchema =
@@ -978,39 +1015,138 @@ public class BigQueryAvroUtilsTest {
}
@Test
- public void testFormatTimestamp() {
- long micros = 1452062291123456L;
+ public void testFormatTimestampInputMillis() {
+ // Min: Earliest timestamp supported by BQ
+ // https://docs.cloud.google.com/bigquery/docs/reference/standard-sql/data-types#timestamp_type
+ long minMillis = -62135596800000L;
+ String expectedMin = "0001-01-01 00:00:00";
+ assertThat(
+ BigQueryAvroUtils.formatDatetime(
+ minMillis, BigQueryAvroUtils.TimestampPrecision.MILLISECONDS),
+ equalTo(expectedMin));
+ assertThat(
+ BigQueryAvroUtils.formatTimestamp(
+ minMillis, BigQueryAvroUtils.TimestampPrecision.MILLISECONDS),
+ equalTo(expectedMin + " UTC"));
+
+ // Existing: Regular timestamp
+ long millis = 1452062291123L;
+ String expected = "2016-01-06 06:38:11.123";
+ assertThat(
+ BigQueryAvroUtils.formatDatetime(millis, BigQueryAvroUtils.TimestampPrecision.MILLISECONDS),
+ equalTo(expected));
+ assertThat(
+ BigQueryAvroUtils.formatTimestamp(
+ millis, BigQueryAvroUtils.TimestampPrecision.MILLISECONDS),
+ equalTo(expected + " UTC"));
+
+ // Max: Latest timestamp supported by BQ
+ long maxMillis = 253402300799999L;
+ String expectedMax = "9999-12-31 23:59:59.999";
+ assertThat(
+ BigQueryAvroUtils.formatDatetime(
+ maxMillis, BigQueryAvroUtils.TimestampPrecision.MILLISECONDS),
+ equalTo(expectedMax));
+ assertThat(
+ BigQueryAvroUtils.formatTimestamp(
+ maxMillis, BigQueryAvroUtils.TimestampPrecision.MILLISECONDS),
+ equalTo(expectedMax + " UTC"));
+ }
+
+ @Test
+ public void testFormatTimestampInputMicros() {
+ long minMicro = -62_135_596_800_000_000L;
+ String expectedMin = "0001-01-01 00:00:00";
+ assertThat(
+ BigQueryAvroUtils.formatDatetime(
+ minMicro, BigQueryAvroUtils.TimestampPrecision.MICROSECONDS),
+ equalTo(expectedMin));
+ assertThat(
+ BigQueryAvroUtils.formatTimestamp(
+ minMicro, BigQueryAvroUtils.TimestampPrecision.MICROSECONDS),
+ equalTo(expectedMin + " UTC"));
+
+ long micros = 1452_062_291_123_456L;
String expected = "2016-01-06 06:38:11.123456";
- assertThat(BigQueryAvroUtils.formatDatetime(micros), equalTo(expected));
- assertThat(BigQueryAvroUtils.formatTimestamp(micros), equalTo(expected + " UTC"));
+ assertThat(
+ BigQueryAvroUtils.formatDatetime(micros, BigQueryAvroUtils.TimestampPrecision.MICROSECONDS),
+ equalTo(expected));
+ assertThat(
+ BigQueryAvroUtils.formatTimestamp(
+ micros, BigQueryAvroUtils.TimestampPrecision.MICROSECONDS),
+ equalTo(expected + " UTC"));
+
+ // Max: Latest timestamp supported by BQ
+ long maxMicros = 253_402_300_799_999_000L;
+ String expectedMax = "9999-12-31 23:59:59.999";
+ assertThat(
+ BigQueryAvroUtils.formatDatetime(
+ maxMicros, BigQueryAvroUtils.TimestampPrecision.MICROSECONDS),
+ equalTo(expectedMax));
+ assertThat(
+ BigQueryAvroUtils.formatTimestamp(
+ maxMicros, BigQueryAvroUtils.TimestampPrecision.MICROSECONDS),
+ equalTo(expectedMax + " UTC"));
}
@Test
- public void testFormatTimestampMillis() {
- long millis = 1452062291123L;
- long micros = millis * 1000L;
- String expected = "2016-01-06 06:38:11.123";
- assertThat(BigQueryAvroUtils.formatDatetime(micros), equalTo(expected));
- assertThat(BigQueryAvroUtils.formatTimestamp(micros), equalTo(expected + " UTC"));
+ public void testFormatTimestampInputNanos() {
+ long minNanos = Long.MIN_VALUE; // -9223372036854775808L
+ String expectedMin = "1677-09-21 00:12:43.145224192";
+ assertThat(
+ BigQueryAvroUtils.formatDatetime(
+ minNanos, BigQueryAvroUtils.TimestampPrecision.NANOSECONDS),
+ equalTo(expectedMin));
+ assertThat(
+ BigQueryAvroUtils.formatTimestamp(
+ minNanos, BigQueryAvroUtils.TimestampPrecision.NANOSECONDS),
+ equalTo(expectedMin + " UTC"));
+
+ long nanos = 1452062291123456789L;
+ String expected = "2016-01-06 06:38:11.123456789";
+ assertThat(
+ BigQueryAvroUtils.formatDatetime(nanos, BigQueryAvroUtils.TimestampPrecision.NANOSECONDS),
+ equalTo(expected));
+ assertThat(
+ BigQueryAvroUtils.formatTimestamp(nanos, BigQueryAvroUtils.TimestampPrecision.NANOSECONDS),
+ equalTo(expected + " UTC"));
+
+ long maxNanos = Long.MAX_VALUE; // 9223372036854775807L
+ String expectedMax = "2262-04-11 23:47:16.854775807";
+ assertThat(
+ BigQueryAvroUtils.formatDatetime(
+ maxNanos, BigQueryAvroUtils.TimestampPrecision.NANOSECONDS),
+ equalTo(expectedMax));
+ assertThat(
+ BigQueryAvroUtils.formatTimestamp(
+ maxNanos, BigQueryAvroUtils.TimestampPrecision.NANOSECONDS),
+ equalTo(expectedMax + " UTC"));
}
@Test
- public void testFormatTimestampSeconds() {
+ public void testFormatTimestampInputMicrosOutputSecondsFormat() {
+ BigQueryAvroUtils.TimestampPrecision precision =
+ BigQueryAvroUtils.TimestampPrecision.MICROSECONDS;
long seconds = 1452062291L;
long micros = seconds * 1000L * 1000L;
String expected = "2016-01-06 06:38:11";
- assertThat(BigQueryAvroUtils.formatDatetime(micros), equalTo(expected));
- assertThat(BigQueryAvroUtils.formatTimestamp(micros), equalTo(expected + " UTC"));
+ assertThat(BigQueryAvroUtils.formatDatetime(micros, precision), equalTo(expected));
+ assertThat(BigQueryAvroUtils.formatTimestamp(micros, precision), equalTo(expected + " UTC"));
}
@Test
public void testFormatTimestampNegative() {
- assertThat(BigQueryAvroUtils.formatDatetime(-1L), equalTo("1969-12-31 23:59:59.999999"));
- assertThat(BigQueryAvroUtils.formatDatetime(-100_000L), equalTo("1969-12-31 23:59:59.900"));
- assertThat(BigQueryAvroUtils.formatDatetime(-1_000_000L), equalTo("1969-12-31 23:59:59"));
+ BigQueryAvroUtils.TimestampPrecision precision =
+ BigQueryAvroUtils.TimestampPrecision.MICROSECONDS;
+ assertThat(
+ BigQueryAvroUtils.formatDatetime(-1L, precision), equalTo("1969-12-31 23:59:59.999999"));
+ assertThat(
+ BigQueryAvroUtils.formatDatetime(-100_000L, precision), equalTo("1969-12-31 23:59:59.900"));
+ assertThat(
+ BigQueryAvroUtils.formatDatetime(-1_000_000L, precision), equalTo("1969-12-31 23:59:59"));
// No leap seconds before 1972. 477 leap years from 1 through 1969.
assertThat(
- BigQueryAvroUtils.formatDatetime(-(1969L * 365 + 477) * 86400 * 1_000_000),
+ BigQueryAvroUtils.formatDatetime(-(1969L * 365 + 477) * 86400 * 1_000_000, precision),
equalTo("0001-01-01 00:00:00"));
}