This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new ac5640a [FLINK-12253][table-common] Add a TIMESTAMP WITH LOCAL TIME ZONE type ac5640a is described below commit ac5640aa3309b1f6c3845ab2747ef26997dc49ab Author: Timo Walther <twal...@apache.org> AuthorDate: Thu May 2 10:12:39 2019 +0200 [FLINK-12253][table-common] Add a TIMESTAMP WITH LOCAL TIME ZONE type --- .../types/logical/LocalZonedTimestampType.java | 176 +++++++++++++++++++++ .../table/types/logical/LogicalTypeVisitor.java | 2 + .../apache/flink/table/types/LogicalTypesTest.java | 14 ++ 3 files changed, 192 insertions(+) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LocalZonedTimestampType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LocalZonedTimestampType.java new file mode 100644 index 0000000..43934d5 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LocalZonedTimestampType.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.types.logical; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.api.ValidationException; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Set; + +/** + * Logical type of a timestamp WITH LOCAL time zone consisting of {@code year-month-day hour:minute:second[.fractional] zone} + * with up to nanosecond precision and values ranging from {@code 0000-01-01 00:00:00.000000000 +14:59} to + * {@code 9999-12-31 23:59:59.999999999 -14:59}. Leap seconds (23:59:60 and 23:59:61) are not + * supported as the semantics are closer to {@link java.time.OffsetDateTime}. + * + * <p>The serialized string representation is {@code TIMESTAMP(p) WITH LOCAL TIME ZONE} where {@code p} is + * the number of digits of fractional seconds (=precision). {@code p} must have a value between 0 and + * 9 (both inclusive). If no precision is specified, {@code p} is equal to 6. + * + * <p>Compared to {@link ZonedTimestampType}, the time zone offset information is not stored physically + * in every datum. Instead, the type assumes {@link java.time.Instant} semantics in the UTC time zone at + * the edges of the table ecosystem. Every datum is interpreted in the local time zone configured in + * the current session for computation and visualization. + * + * <p>This type fills the gap between time-zone-free and time-zone-mandatory timestamp types by allowing + * the interpretation of UTC timestamps according to the configured session time zone. A conversion + * from and to {@code int} describes the number of seconds since the epoch. A conversion from and to {@code long} + * describes the number of milliseconds since the epoch. 
+ * + * @see TimestampType + * @see ZonedTimestampType + */ +@PublicEvolving +public final class LocalZonedTimestampType extends LogicalType { + + private static final int MIN_PRECISION = 0; + + private static final int MAX_PRECISION = 9; + + private static final int DEFAULT_PRECISION = 6; + + private static final String FORMAT = "TIMESTAMP(%d) WITH LOCAL TIME ZONE"; + + private static final Set<String> NULL_OUTPUT_CONVERSION = conversionSet( + java.time.Instant.class.getName()); + + private static final Set<String> NOT_NULL_INPUT_OUTPUT_CONVERSION = conversionSet( + java.time.Instant.class.getName(), + int.class.getName(), + long.class.getName()); + + private static final Class<?> DEFAULT_CONVERSION = java.time.Instant.class; + + private final TimestampKind kind; + + private final int precision; + + /** + * Internal constructor that allows attaching additional metadata about time attribute + * properties. The additional metadata does not affect equality or serializability. + * + * <p>Use {@link #getKind()} for comparing this metadata. 
+ */ + @Internal + public LocalZonedTimestampType(boolean isNullable, TimestampKind kind, int precision) { + super(isNullable, LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE); + if (precision < MIN_PRECISION || precision > MAX_PRECISION) { + throw new ValidationException( + String.format( + "Timestamp with local time zone precision must be between %d and %d (both inclusive).", + MIN_PRECISION, + MAX_PRECISION)); + } + this.kind = kind; + this.precision = precision; + } + + public LocalZonedTimestampType(boolean isNullable, int precision) { + this(isNullable, TimestampKind.REGULAR, precision); + } + + public LocalZonedTimestampType(int precision) { + this(true, precision); + } + + public LocalZonedTimestampType() { + this(DEFAULT_PRECISION); + } + + @Internal + public TimestampKind getKind() { + return kind; + } + + public int getPrecision() { + return precision; + } + + @Override + public LogicalType copy(boolean isNullable) { + return new LocalZonedTimestampType(isNullable, kind, precision); + } + + @Override + public String asSerializableString() { + return withNullability(FORMAT, precision); + } + + @Override + public boolean supportsInputConversion(Class<?> clazz) { + return NOT_NULL_INPUT_OUTPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public boolean supportsOutputConversion(Class<?> clazz) { + if (isNullable()) { + return NULL_OUTPUT_CONVERSION.contains(clazz.getName()); + } + return NOT_NULL_INPUT_OUTPUT_CONVERSION.contains(clazz.getName()); + } + + @Override + public Class<?> getDefaultConversion() { + return DEFAULT_CONVERSION; + } + + @Override + public List<LogicalType> getChildren() { + return Collections.emptyList(); + } + + @Override + public <R> R accept(LogicalTypeVisitor<R> visitor) { + return visitor.visit(this); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + 
LocalZonedTimestampType that = (LocalZonedTimestampType) o; + return precision == that.precision; + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), precision); + } +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java index 136d638..20b9673 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeVisitor.java @@ -61,5 +61,7 @@ public interface LogicalTypeVisitor<R> { R visit(ZonedTimestampType zonedTimestampType); + R visit(LocalZonedTimestampType localZonedTimestampType); + R visit(LogicalType other); } diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java index ff59488..53f35ea 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java @@ -27,6 +27,7 @@ import org.apache.flink.table.types.logical.DecimalType; import org.apache.flink.table.types.logical.DoubleType; import org.apache.flink.table.types.logical.FloatType; import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.SmallIntType; import org.apache.flink.table.types.logical.TimeType; @@ -261,6 +262,19 @@ public class LogicalTypesTest { ); } + @Test + public void testLocalZonedTimestampType() { + testAll( + new LocalZonedTimestampType(9), + "TIMESTAMP(9) WITH LOCAL TIME ZONE", + 
"TIMESTAMP(9) WITH LOCAL TIME ZONE", + new Class[]{java.time.Instant.class, long.class, int.class}, + new Class[]{java.time.Instant.class}, + new LogicalType[]{}, + new LocalZonedTimestampType(3) + ); + } + // -------------------------------------------------------------------------------------------- private static void testAll(