This is an automated email from the ASF dual-hosted git repository. zjureel pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new c33a6527d83 [FLINK-32396][jdbc-driver] Support timestamp and timestamp_ltz for jdbc driver and sql gateway (#22832) c33a6527d83 is described below commit c33a6527d8383dc571e0b648b8a29322416ab9d6 Author: Shammon FY <zjur...@gmail.com> AuthorDate: Mon Sep 11 20:34:34 2023 +0800 [FLINK-32396][jdbc-driver] Support timestamp and timestamp_ltz for jdbc driver and sql gateway (#22832) * [FLINK-32396][jdbc-driver] Support timestamp and timestamp_ltz for jdbc driver and sql gateway --- .../handler/statement/FetchResultsHandler.java | 19 ++- .../flink/table/gateway/rest/serde/ResultInfo.java | 14 +- .../rest/util/RowDataLocalTimeZoneConverter.java | 187 +++++++++++++++++++++ .../utils/RowDataLocalTimeZoneConverterTest.java | 172 +++++++++++++++++++ .../apache/flink/table/jdbc/FlinkResultSet.java | 43 +++-- .../flink/table/jdbc/utils/ArrayFieldGetter.java | 121 ------------- .../flink/table/jdbc/FlinkStatementTest.java | 52 ++++-- 7 files changed, 464 insertions(+), 144 deletions(-) diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/statement/FetchResultsHandler.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/statement/FetchResultsHandler.java index 8cb4ff80802..4303158ea4e 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/statement/FetchResultsHandler.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/statement/FetchResultsHandler.java @@ -18,6 +18,7 @@ package org.apache.flink.table.gateway.rest.handler.statement; +import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.RestHandlerException; import org.apache.flink.runtime.rest.handler.util.HandlerRequestUtils; @@ -39,13 +40,18 @@ import org.apache.flink.table.gateway.rest.message.statement.FetchResultsRowForm import org.apache.flink.table.gateway.rest.message.statement.FetchResultsTokenPathParameter; import org.apache.flink.table.gateway.rest.message.statement.NotReadyFetchResultResponse; import org.apache.flink.table.gateway.rest.serde.ResultInfo; +import org.apache.flink.table.gateway.rest.util.RowDataLocalTimeZoneConverter; import org.apache.flink.table.gateway.rest.util.RowFormat; import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; import javax.annotation.Nonnull; +import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; /** Handler to fetch results. */ public class FetchResultsHandler @@ -100,13 +106,24 @@ public class FetchResultsHandler return CompletableFuture.completedFuture( new NotReadyFetchResultResponse(nextResultUri)); } else { + RowDataLocalTimeZoneConverter timeZoneConverter = null; + if (rowFormat == RowFormat.JSON) { + List<LogicalType> logicalTypeList = + resultSet.getResultSchema().getColumnDataTypes().stream() + .map(DataType::getLogicalType) + .collect(Collectors.toList()); + timeZoneConverter = + new RowDataLocalTimeZoneConverter( + logicalTypeList, + Configuration.fromMap(service.getSessionConfig(sessionHandle))); + } return CompletableFuture.completedFuture( new FetchResultResponseBodyImpl( resultType, resultSet.isQueryResult(), resultSet.getJobID(), resultSet.getResultKind(), - ResultInfo.createResultInfo(resultSet, rowFormat), + ResultInfo.createResultInfo(resultSet, rowFormat, timeZoneConverter), nextResultUri)); } } diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfo.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfo.java index c34fb84dced..31957caca43 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfo.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfo.java @@ -26,6 +26,7 @@ import org.apache.flink.table.data.RowData.FieldGetter; import org.apache.flink.table.data.StringData; import org.apache.flink.table.gateway.api.results.ResultSet; import org.apache.flink.table.gateway.api.results.ResultSetImpl; +import org.apache.flink.table.gateway.rest.util.RowDataLocalTimeZoneConverter; import org.apache.flink.table.gateway.rest.util.RowFormat; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.utils.print.RowDataToStringConverter; @@ -34,6 +35,8 @@ import org.apache.flink.util.Preconditions; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize; +import javax.annotation.Nullable; + import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -64,12 +67,21 @@ public class ResultInfo { this.rowFormat = rowFormat; } - public static ResultInfo createResultInfo(ResultSet resultSet, RowFormat rowFormat) { + public static ResultInfo createResultInfo( + ResultSet resultSet, + RowFormat rowFormat, + @Nullable RowDataLocalTimeZoneConverter timeZoneConverter) { Preconditions.checkArgument(resultSet.getResultType() != ResultSet.ResultType.NOT_READY); List<RowData> data = resultSet.getData(); switch (rowFormat) { case JSON: + if (timeZoneConverter != null && timeZoneConverter.hasTimeZoneData()) { + data = + data.stream() + .map(timeZoneConverter::convertTimeZoneRowData) + .collect(Collectors.toList()); + } break; case PLAIN_TEXT: RowDataToStringConverter converter = ((ResultSetImpl) resultSet).getConverter(); diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/util/RowDataLocalTimeZoneConverter.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/util/RowDataLocalTimeZoneConverter.java new file mode 100644 index 00000000000..efb5ea77d9b --- /dev/null +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/util/RowDataLocalTimeZoneConverter.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.rest.util; + +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.api.config.TableConfigOptions; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.GenericMapData; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.MultisetType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.utils.DateTimeUtils; + +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TimeZone; +import java.util.stream.Collectors; + +/** + * Convert {@link LocalZonedTimestampType} data in {@link RowData} to {@link TimestampData} based on + * the local time zone. + */ +public class RowDataLocalTimeZoneConverter { + private final List<RowData.FieldGetter> fieldGetterList; + private final List<LogicalType> logicalTypeList; + private final boolean hasTimeZoneData; + private final TimeZone timeZone; + + public RowDataLocalTimeZoneConverter(List<LogicalType> logicalTypeList, ReadableConfig config) { + this(logicalTypeList, TimeZone.getTimeZone(getSessionTimeZone(config))); + } + + public RowDataLocalTimeZoneConverter(List<LogicalType> logicalTypeList, TimeZone timeZone) { + this.logicalTypeList = logicalTypeList; + this.timeZone = timeZone; + this.fieldGetterList = new ArrayList<>(logicalTypeList.size()); + for (int i = 0; i < logicalTypeList.size(); i++) { + fieldGetterList.add(RowData.createFieldGetter(logicalTypeList.get(i), i)); + } + this.hasTimeZoneData = checkTimeZoneType(logicalTypeList); + } + + private boolean checkTimeZoneType(List<LogicalType> logicalTypeList) { + for (LogicalType logicalType : logicalTypeList) { + if (logicalType.getTypeRoot() == LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE) { + return true; + } else if (logicalType.getTypeRoot() == LogicalTypeRoot.MAP) { + MapType mapType = (MapType) logicalType; + if (checkTimeZoneType( + Arrays.asList(mapType.getKeyType(), mapType.getValueType()))) { + return true; + } + } else if (logicalType.getTypeRoot() == LogicalTypeRoot.ARRAY) { + ArrayType arrayType = (ArrayType) logicalType; + if (checkTimeZoneType(Collections.singletonList(arrayType.getElementType()))) { + return true; + } + } else if (logicalType.getTypeRoot() == LogicalTypeRoot.MULTISET) { + MultisetType multisetType = (MultisetType) logicalType; + if (checkTimeZoneType(Collections.singletonList(multisetType.getElementType()))) { + return true; + } + } else if (logicalType.getTypeRoot() == LogicalTypeRoot.ROW) { + RowType rowType = (RowType) logicalType; + if (checkTimeZoneType( + rowType.getFields().stream() + .map(RowType.RowField::getType) + .collect(Collectors.toList()))) { + return true; + } + } + } + return false; + } + + public RowData convertTimeZoneRowData(RowData rowData) { + if (!hasTimeZoneData()) { + return rowData; + } + + GenericRowData result = new GenericRowData(rowData.getRowKind(), rowData.getArity()); + for (int i = 0; i < fieldGetterList.size(); i++) { + result.setField( + i, + convertLocalTimeZoneValue( + fieldGetterList.get(i).getFieldOrNull(rowData), + logicalTypeList.get(i))); + } + return result; + } + + private Object convertLocalTimeZoneValue(Object object, LogicalType dataType) { + if (object == null) { + return null; + } + switch (dataType.getTypeRoot()) { + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + { + return DateTimeUtils.timestampWithLocalZoneToTimestamp( + (TimestampData) object, timeZone); + } + case MAP: + { + MapType mapType = (MapType) dataType; + MapData mapData = (MapData) object; + ArrayData keyArray = mapData.keyArray(); + ArrayData valueArray = mapData.valueArray(); + ArrayData.ElementGetter keyGetter = + ArrayData.createElementGetter(mapType.getKeyType()); + ArrayData.ElementGetter valueGetter = + ArrayData.createElementGetter(mapType.getValueType()); + Map<Object, Object> mapValue = new HashMap<>(); + for (int j = 0; j < keyArray.size(); j++) { + mapValue.put( + convertLocalTimeZoneValue( + keyGetter.getElementOrNull(keyArray, j), + mapType.getKeyType()), + convertLocalTimeZoneValue( + valueGetter.getElementOrNull(valueArray, j), + mapType.getValueType())); + } + return new GenericMapData(mapValue); + } + case ARRAY: + { + ArrayType arrayType = (ArrayType) dataType; + ArrayData arrayData = (ArrayData) object; + ArrayData.ElementGetter dataGetter = + ArrayData.createElementGetter(arrayType.getElementType()); + List<Object> arrayValues = new ArrayList<>(arrayData.size()); + for (int i = 0; i < arrayData.size(); i++) { + arrayValues.add( + convertLocalTimeZoneValue( + dataGetter.getElementOrNull(arrayData, i), + arrayType.getElementType())); + } + return new GenericArrayData(arrayValues.toArray()); + } + default: + { + return object; + } + } + } + + public boolean hasTimeZoneData() { + return hasTimeZoneData; + } + + /** Get time zone from the given session config. */ + private static ZoneId getSessionTimeZone(ReadableConfig sessionConfig) { + final String zone = sessionConfig.get(TableConfigOptions.LOCAL_TIME_ZONE); + return TableConfigOptions.LOCAL_TIME_ZONE.defaultValue().equals(zone) + ? ZoneId.systemDefault() + : ZoneId.of(zone); + } +} diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/utils/RowDataLocalTimeZoneConverterTest.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/utils/RowDataLocalTimeZoneConverterTest.java new file mode 100644 index 00000000000..f8e1b020c11 --- /dev/null +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/utils/RowDataLocalTimeZoneConverterTest.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.utils; + +import org.apache.flink.table.data.GenericMapData; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.gateway.rest.util.RowDataLocalTimeZoneConverter; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.MultisetType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.VarCharType; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TimeZone; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** Tests for {@link RowDataLocalTimeZoneConverter}. */ +public class RowDataLocalTimeZoneConverterTest { + @Test + public void testCheckHasTimeZoneData() { + List<LogicalType> logicalTypeListWithoutTimestamp = + Collections.singletonList(new IntType()); + List<LogicalType> logicalTypeListWithTimestamp = + Collections.singletonList(new LocalZonedTimestampType()); + List<LogicalType> logicalTypeListWithMapTimestamp = + Arrays.asList( + new IntType(), + new MapType( + new VarCharType(100), + new MapType(new VarCharType(20), new LocalZonedTimestampType()))); + List<LogicalType> logicalTypeListWithMultisetTimestamp = + Arrays.asList(new IntType(), new MultisetType(new LocalZonedTimestampType())); + List<LogicalType> logicalTypeListWithRowTimestamp = + Arrays.asList( + new VarCharType(100), + new RowType( + Arrays.asList( + new RowType.RowField("a", new LocalZonedTimestampType()), + new RowType.RowField("b", new IntType())))); + assertFalse( + new RowDataLocalTimeZoneConverter( + logicalTypeListWithoutTimestamp, + TimeZone.getTimeZone("Asia/Shanghai")) + .hasTimeZoneData()); + assertTrue( + new RowDataLocalTimeZoneConverter( + logicalTypeListWithTimestamp, TimeZone.getTimeZone("Asia/Shanghai")) + .hasTimeZoneData()); + assertTrue( + new RowDataLocalTimeZoneConverter( + logicalTypeListWithMapTimestamp, + TimeZone.getTimeZone("Asia/Shanghai")) + .hasTimeZoneData()); + assertTrue( + new RowDataLocalTimeZoneConverter( + logicalTypeListWithMultisetTimestamp, + TimeZone.getTimeZone("Asia/Shanghai")) + .hasTimeZoneData()); + assertTrue( + new RowDataLocalTimeZoneConverter( + logicalTypeListWithRowTimestamp, + TimeZone.getTimeZone("Asia/Shanghai")) + .hasTimeZoneData()); + } + + @Test + public void testSimpleTimestampWithLocalZone() { + List<LogicalType> logicalTypeList = + Collections.singletonList(new LocalZonedTimestampType()); + RowDataLocalTimeZoneConverter converter1 = + new RowDataLocalTimeZoneConverter( + logicalTypeList, TimeZone.getTimeZone("Asia/Shanghai")); + RowDataLocalTimeZoneConverter converter2 = + new RowDataLocalTimeZoneConverter( + logicalTypeList, TimeZone.getTimeZone("Europe/Berlin")); + + RowData data = GenericRowData.of(TimestampData.fromEpochMillis(100000000000L)); + RowData data1 = converter1.convertTimeZoneRowData(data); + RowData data2 = converter2.convertTimeZoneRowData(data); + assertEquals(data.toString(), "+I(1973-03-03T09:46:40)"); + assertEquals(data1.toString(), "+I(1973-03-03T17:46:40)"); + assertEquals(data2.toString(), "+I(1973-03-03T10:46:40)"); + } + + @Test + public void testComplexTimestampWithLocalZone() { + List<LogicalType> logicalTypeList = + Arrays.asList( + new IntType(), + new MapType( + new VarCharType(100), + new MapType(new VarCharType(20), new LocalZonedTimestampType()))); + RowDataLocalTimeZoneConverter converter1 = + new RowDataLocalTimeZoneConverter( + logicalTypeList, TimeZone.getTimeZone("Asia/Shanghai")); + RowDataLocalTimeZoneConverter converter2 = + new RowDataLocalTimeZoneConverter( + logicalTypeList, TimeZone.getTimeZone("Europe/Berlin")); + + Map<StringData, TimestampData> timestampMapValue = new HashMap<>(); + timestampMapValue.put( + StringData.fromString("123"), TimestampData.fromEpochMillis(100000000000L)); + Map<StringData, MapData> timestampMapData = new HashMap<>(); + timestampMapData.put(StringData.fromString("321"), new GenericMapData(timestampMapValue)); + RowData data = GenericRowData.of(100, new GenericMapData(timestampMapData)); + RowData data1 = converter1.convertTimeZoneRowData(data); + RowData data2 = converter2.convertTimeZoneRowData(data); + + assertEquals(2, data.getArity()); + assertEquals(2, data1.getArity()); + assertEquals(2, data2.getArity()); + + assertEquals(100L, data.getInt(0)); + assertEquals(100L, data1.getInt(0)); + assertEquals(100L, data2.getInt(0)); + + MapData mapData = data.getMap(1); + MapData mapData1 = data1.getMap(1); + MapData mapData2 = data2.getMap(1); + assertEquals(1, mapData.size()); + assertEquals(1, mapData1.size()); + assertEquals(1, mapData2.size()); + assertEquals("321", mapData.keyArray().getString(0).toString()); + assertEquals("321", mapData1.keyArray().getString(0).toString()); + assertEquals("321", mapData2.keyArray().getString(0).toString()); + + MapData mapValue = mapData.valueArray().getMap(0); + MapData mapValue1 = mapData1.valueArray().getMap(0); + MapData mapValue2 = mapData2.valueArray().getMap(0); + assertEquals(1, mapValue.size()); + assertEquals(1, mapValue1.size()); + assertEquals(1, mapValue2.size()); + assertEquals("123", mapValue.keyArray().getString(0).toString()); + assertEquals("123", mapValue1.keyArray().getString(0).toString()); + assertEquals("123", mapValue2.keyArray().getString(0).toString()); + assertEquals("1973-03-03T09:46:40", mapValue.valueArray().getTimestamp(0, 0).toString()); + assertEquals("1973-03-03T17:46:40", mapValue1.valueArray().getTimestamp(0, 0).toString()); + assertEquals("1973-03-03T10:46:40", mapValue2.valueArray().getTimestamp(0, 0).toString()); + } +} diff --git a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSet.java b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSet.java index 47a899a9e7a..4735ce4669a 100644 --- a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSet.java +++ b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSet.java @@ -25,7 +25,7 @@ import org.apache.flink.table.data.DecimalData; import org.apache.flink.table.data.MapData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.StringData; -import org.apache.flink.table.jdbc.utils.ArrayFieldGetter; +import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.jdbc.utils.CloseableResultIterator; import org.apache.flink.table.jdbc.utils.StatementResultIterator; import org.apache.flink.table.types.DataType; @@ -44,6 +44,8 @@ import java.sql.SQLFeatureNotSupportedException; import java.sql.Statement; import java.sql.Time; import java.sql.Timestamp; +import java.time.LocalDate; +import java.time.LocalTime; import java.util.ArrayList; import java.util.Calendar; import java.util.HashMap; @@ -277,20 +279,17 @@ public class FlinkResultSet extends BaseResultSet { @Override public Date getDate(int columnIndex) throws SQLException { - // TODO support date data - throw new IllegalArgumentException(); + return (Date) getObject(columnIndex); } @Override public Time getTime(int columnIndex) throws SQLException { - // TODO support time data - throw new IllegalArgumentException(); + return (Time) getObject(columnIndex); } @Override public Timestamp getTimestamp(int columnIndex) throws SQLException { - // TODO support time timestamp - throw new IllegalArgumentException(); + return (Timestamp) getObject(columnIndex); } @Override @@ -416,8 +415,8 @@ public class FlinkResultSet extends BaseResultSet { { LogicalType keyType = ((MapType) dataType).getKeyType(); LogicalType valueType = ((MapType) dataType).getValueType(); - ArrayFieldGetter keyGetter = ArrayFieldGetter.createFieldGetter(keyType); - ArrayFieldGetter valueGetter = ArrayFieldGetter.createFieldGetter(valueType); + ArrayData.ElementGetter keyGetter = ArrayData.createElementGetter(keyType); + ArrayData.ElementGetter valueGetter = ArrayData.createElementGetter(valueType); MapData mapData = (MapData) object; int size = mapData.size(); ArrayData keyArrayData = mapData.keyArray(); @@ -426,12 +425,34 @@ public class FlinkResultSet extends BaseResultSet { for (int i = 0; i < size; i++) { mapResult.put( convertToJavaObject( - keyGetter.getObjectOrNull(keyArrayData, i), keyType), + keyGetter.getElementOrNull(keyArrayData, i), keyType), convertToJavaObject( - valueGetter.getObjectOrNull(valueArrayData, i), valueType)); + valueGetter.getElementOrNull(valueArrayData, i), + valueType)); } return mapResult; } + case TIMESTAMP_WITHOUT_TIME_ZONE: + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + { + return ((TimestampData) object).toTimestamp(); + } + case TIMESTAMP_WITH_TIME_ZONE: + { + // TODO should be supported after + // https://issues.apache.org/jira/browse/FLINK-20869 + throw new SQLDataException( + "TIMESTAMP WITH TIME ZONE is not supported, use TIMESTAMP or TIMESTAMP_LTZ instead"); + } + case TIME_WITHOUT_TIME_ZONE: + { + return Time.valueOf( + LocalTime.ofNanoOfDay(((Number) object).intValue() * 1_000_000L)); + } + case DATE: + { + return Date.valueOf(LocalDate.ofEpochDay(((Number) object).intValue())); + } default: { throw new SQLDataException( diff --git a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/ArrayFieldGetter.java b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/ArrayFieldGetter.java deleted file mode 100644 index 42f3bad38e3..00000000000 --- a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/ArrayFieldGetter.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.jdbc.utils; - -import org.apache.flink.table.data.ArrayData; -import org.apache.flink.table.types.logical.DistinctType; -import org.apache.flink.table.types.logical.LogicalType; - -import javax.annotation.Nullable; - -import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFieldCount; -import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision; -import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getScale; - -/** Creates an accessor for getting elements in an array data structure at the given position. */ -public interface ArrayFieldGetter { - @Nullable - Object getObjectOrNull(ArrayData array, int index); - - static ArrayFieldGetter createFieldGetter(LogicalType type) { - - final ArrayFieldGetter fieldGetter; - // ordered by type root definition - switch (type.getTypeRoot()) { - case CHAR: - case VARCHAR: - fieldGetter = ArrayData::getString; - break; - case BOOLEAN: - fieldGetter = ArrayData::getBoolean; - break; - case BINARY: - case VARBINARY: - fieldGetter = ArrayData::getBinary; - break; - case DECIMAL: - final int decimalPrecision = getPrecision(type); - final int decimalScale = getScale(type); - fieldGetter = - (array, index) -> array.getDecimal(index, decimalPrecision, decimalScale); - break; - case TINYINT: - fieldGetter = ArrayData::getByte; - break; - case SMALLINT: - fieldGetter = ArrayData::getShort; - break; - case INTEGER: - case DATE: - case TIME_WITHOUT_TIME_ZONE: - case INTERVAL_YEAR_MONTH: - fieldGetter = ArrayData::getInt; - break; - case BIGINT: - case INTERVAL_DAY_TIME: - fieldGetter = ArrayData::getLong; - break; - case FLOAT: - fieldGetter = ArrayData::getFloat; - break; - case DOUBLE: - fieldGetter = ArrayData::getDouble; - break; - case TIMESTAMP_WITHOUT_TIME_ZONE: - case TIMESTAMP_WITH_LOCAL_TIME_ZONE: - final int timestampPrecision = getPrecision(type); - fieldGetter = (array, index) -> array.getTimestamp(index, timestampPrecision); - break; - case TIMESTAMP_WITH_TIME_ZONE: - throw new UnsupportedOperationException(); - case ARRAY: - fieldGetter = ArrayData::getArray; - break; - case MULTISET: - case MAP: - fieldGetter = ArrayData::getMap; - break; - case ROW: - case STRUCTURED_TYPE: - final int arrayFieldCount = getFieldCount(type); - fieldGetter = (array, index) -> array.getRow(index, arrayFieldCount); - break; - case DISTINCT_TYPE: - fieldGetter = createFieldGetter(((DistinctType) type).getSourceType()); - break; - case RAW: - fieldGetter = ArrayData::getRawValue; - break; - case NULL: - case SYMBOL: - case UNRESOLVED: - default: - throw new IllegalArgumentException(); - } - if (!type.isNullable()) { - return fieldGetter; - } - return (array, index) -> { - if (array.isNullAt(index)) { - return null; - } - return fieldGetter.getObjectOrNull(array, index); - }; - } -} diff --git a/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkStatementTest.java b/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkStatementTest.java index 28e33dce73b..8ff4af291a8 100644 --- a/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkStatementTest.java +++ b/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkStatementTest.java @@ -60,7 +60,7 @@ public class FlinkStatementTest extends FlinkJdbcDriverTestBase { assertFalse( statement.execute( String.format( - "CREATE TABLE test_table(id bigint, val int, str string) " + "CREATE TABLE test_table(id bigint, val int, str string, timestamp1 timestamp(0), timestamp2 timestamp_ltz(3), time_data time, date_data date) " + "with (" + "'connector'='filesystem',\n" + "'format'='csv',\n" @@ -72,10 +72,10 @@ public class FlinkStatementTest extends FlinkJdbcDriverTestBase { assertTrue( statement.execute( "INSERT INTO test_table VALUES " - + "(1, 11, '111'), " - + "(3, 33, '333'), " - + "(2, 22, '222'), " - + "(4, 44, '444')")); + + "(1, 11, '111', TIMESTAMP '2021-04-15 23:18:36', TO_TIMESTAMP_LTZ(400000000000, 3), TIME '12:32:00', DATE '2023-11-02'), " + + "(3, 33, '333', TIMESTAMP '2021-04-16 23:18:36', TO_TIMESTAMP_LTZ(500000000000, 3), TIME '13:32:00', DATE '2023-12-02'), " + + "(2, 22, '222', TIMESTAMP '2021-04-17 23:18:36', TO_TIMESTAMP_LTZ(600000000000, 3), TIME '14:32:00', DATE '2023-01-02'), " + + "(4, 44, '444', TIMESTAMP '2021-04-18 23:18:36', TO_TIMESTAMP_LTZ(700000000000, 3), TIME '15:32:00', DATE '2023-02-02')")); assertThatThrownBy(statement::getUpdateCount) .isInstanceOf(SQLFeatureNotSupportedException.class) .hasMessage("FlinkStatement#getUpdateCount is not supported for query"); @@ -106,23 +106,55 @@ public class FlinkStatementTest extends FlinkJdbcDriverTestBase { } // SELECT all data from test_table + statement.execute("SET 'table.local-time-zone' = 'UTC'"); try (ResultSet resultSet = statement.executeQuery("SELECT * FROM test_table")) { - assertEquals(3, resultSet.getMetaData().getColumnCount()); + assertEquals(7, resultSet.getMetaData().getColumnCount()); List<String> resultList = new ArrayList<>(); while (resultSet.next()) { assertEquals(resultSet.getLong("id"), resultSet.getLong(1)); assertEquals(resultSet.getInt("val"), resultSet.getInt(2)); assertEquals(resultSet.getString("str"), resultSet.getString(3)); + assertEquals(resultSet.getTimestamp("timestamp1"), resultSet.getObject(4)); + assertEquals(resultSet.getObject("timestamp2"), resultSet.getTimestamp(5)); + assertEquals(resultSet.getObject("time_data"), resultSet.getTime(6)); + assertEquals(resultSet.getObject("date_data"), resultSet.getDate(7)); resultList.add( String.format( - "%s,%s,%s", + "%s,%s,%s,%s,%s,%s,%s", resultSet.getLong("id"), resultSet.getInt("val"), - resultSet.getString("str"))); + resultSet.getString("str"), + resultSet.getTimestamp("timestamp1"), + resultSet.getTimestamp("timestamp2"), + resultSet.getTime("time_data"), + resultSet.getDate("date_data"))); } assertThat(resultList) .containsExactlyInAnyOrder( - "1,11,111", "2,22,222", "3,33,333", "4,44,444"); + "1,11,111,2021-04-15 23:18:36.0,1982-09-04 15:06:40.0,12:32:00,2023-11-02", + "3,33,333,2021-04-16 23:18:36.0,1985-11-05 00:53:20.0,13:32:00,2023-12-02", + "2,22,222,2021-04-17 23:18:36.0,1989-01-05 10:40:00.0,14:32:00,2023-01-02", + "4,44,444,2021-04-18 23:18:36.0,1992-03-07 20:26:40.0,15:32:00,2023-02-02"); + } + + // SELECT all data from test_table with local time zone + statement.execute("SET 'table.local-time-zone' = 'Asia/Shanghai'"); + try (ResultSet resultSet = statement.executeQuery("SELECT * FROM test_table")) { + assertEquals(7, resultSet.getMetaData().getColumnCount()); + List<String> resultList = new ArrayList<>(); + while (resultSet.next()) { + resultList.add( + String.format( + "%s,%s", + resultSet.getTimestamp("timestamp1"), + resultSet.getTimestamp("timestamp2"))); + } + assertThat(resultList) + .containsExactlyInAnyOrder( + "2021-04-15 23:18:36.0,1982-09-04 23:06:40.0", + "2021-04-16 23:18:36.0,1985-11-05 08:53:20.0", + "2021-04-17 23:18:36.0,1989-01-05 18:40:00.0", + "2021-04-18 23:18:36.0,1992-03-08 04:26:40.0"); } assertTrue(statement.execute("SHOW JOBS")); @@ -133,7 +165,7 @@ public class FlinkStatementTest extends FlinkJdbcDriverTestBase { assertEquals("FINISHED", resultSet.getString(3)); count++; } - assertEquals(2, count); + assertEquals(3, count); } } }