This is an automated email from the ASF dual-hosted git repository.

zjureel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c33a6527d83 [FLINK-32396][jdbc-driver] Support timestamp and timestamp_ltz for jdbc driver and sql gateway (#22832)
c33a6527d83 is described below

commit c33a6527d8383dc571e0b648b8a29322416ab9d6
Author: Shammon FY <zjur...@gmail.com>
AuthorDate: Mon Sep 11 20:34:34 2023 +0800

    [FLINK-32396][jdbc-driver] Support timestamp and timestamp_ltz for jdbc driver and sql gateway (#22832)
    
    * [FLINK-32396][jdbc-driver] Support timestamp and timestamp_ltz for jdbc driver and sql gateway
---
 .../handler/statement/FetchResultsHandler.java     |  19 ++-
 .../flink/table/gateway/rest/serde/ResultInfo.java |  14 +-
 .../rest/util/RowDataLocalTimeZoneConverter.java   | 187 +++++++++++++++++++++
 .../utils/RowDataLocalTimeZoneConverterTest.java   | 172 +++++++++++++++++++
 .../apache/flink/table/jdbc/FlinkResultSet.java    |  43 +++--
 .../flink/table/jdbc/utils/ArrayFieldGetter.java   | 121 -------------
 .../flink/table/jdbc/FlinkStatementTest.java       |  52 ++++--
 7 files changed, 464 insertions(+), 144 deletions(-)

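For context, a minimal sketch of what this change enables on the client side: reading the new
TIMESTAMP / TIMESTAMP_LTZ columns through the Flink JDBC driver with the plain java.sql API. The
connection URL, gateway address, and table name below are placeholders, not taken from this commit;
the sketch assumes the flink-sql-jdbc-driver jar is on the classpath and a SQL Gateway is reachable.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;
    import java.sql.Timestamp;

    public class TimestampReadSketch {
        public static void main(String[] args) throws Exception {
            // Placeholder endpoint; point it at your running SQL Gateway.
            try (Connection conn = DriverManager.getConnection("jdbc:flink://localhost:8083");
                    Statement stmt = conn.createStatement();
                    ResultSet rs = stmt.executeQuery("SELECT * FROM test_table")) {
                while (rs.next()) {
                    // TIMESTAMP and TIMESTAMP_LTZ columns now come back as java.sql.Timestamp;
                    // TIMESTAMP_LTZ values are rendered in the session's table.local-time-zone.
                    Timestamp ts = rs.getTimestamp("timestamp1");
                    Timestamp tsLtz = rs.getTimestamp("timestamp2");
                    System.out.println(ts + " / " + tsLtz);
                }
            }
        }
    }
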
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/statement/FetchResultsHandler.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/statement/FetchResultsHandler.java
index 8cb4ff80802..4303158ea4e 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/statement/FetchResultsHandler.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/statement/FetchResultsHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.gateway.rest.handler.statement;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rest.handler.util.HandlerRequestUtils;
@@ -39,13 +40,18 @@ import org.apache.flink.table.gateway.rest.message.statement.FetchResultsRowForm
 import org.apache.flink.table.gateway.rest.message.statement.FetchResultsTokenPathParameter;
 import org.apache.flink.table.gateway.rest.message.statement.NotReadyFetchResultResponse;
 import org.apache.flink.table.gateway.rest.serde.ResultInfo;
+import org.apache.flink.table.gateway.rest.util.RowDataLocalTimeZoneConverter;
 import org.apache.flink.table.gateway.rest.util.RowFormat;
 import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
 
 import javax.annotation.Nonnull;
 
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
 
 /** Handler to fetch results. */
 public class FetchResultsHandler
@@ -100,13 +106,24 @@ public class FetchResultsHandler
             return CompletableFuture.completedFuture(
                     new NotReadyFetchResultResponse(nextResultUri));
         } else {
+            RowDataLocalTimeZoneConverter timeZoneConverter = null;
+            if (rowFormat == RowFormat.JSON) {
+                List<LogicalType> logicalTypeList =
+                        resultSet.getResultSchema().getColumnDataTypes().stream()
+                                .map(DataType::getLogicalType)
+                                .collect(Collectors.toList());
+                timeZoneConverter =
+                        new RowDataLocalTimeZoneConverter(
+                                logicalTypeList,
+                                Configuration.fromMap(service.getSessionConfig(sessionHandle)));
+            }
             return CompletableFuture.completedFuture(
                     new FetchResultResponseBodyImpl(
                             resultType,
                             resultSet.isQueryResult(),
                             resultSet.getJobID(),
                             resultSet.getResultKind(),
-                            ResultInfo.createResultInfo(resultSet, rowFormat),
+                            ResultInfo.createResultInfo(resultSet, rowFormat, timeZoneConverter),
                             nextResultUri));
         }
     }
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfo.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfo.java
index c34fb84dced..31957caca43 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfo.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfo.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.data.RowData.FieldGetter;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.gateway.api.results.ResultSet;
 import org.apache.flink.table.gateway.api.results.ResultSetImpl;
+import org.apache.flink.table.gateway.rest.util.RowDataLocalTimeZoneConverter;
 import org.apache.flink.table.gateway.rest.util.RowFormat;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.utils.print.RowDataToStringConverter;
@@ -34,6 +35,8 @@ import org.apache.flink.util.Preconditions;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
 
+import javax.annotation.Nullable;
+
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -64,12 +67,21 @@ public class ResultInfo {
         this.rowFormat = rowFormat;
     }
 
-    public static ResultInfo createResultInfo(ResultSet resultSet, RowFormat rowFormat) {
+    public static ResultInfo createResultInfo(
+            ResultSet resultSet,
+            RowFormat rowFormat,
+            @Nullable RowDataLocalTimeZoneConverter timeZoneConverter) {
         Preconditions.checkArgument(resultSet.getResultType() != ResultSet.ResultType.NOT_READY);
         List<RowData> data = resultSet.getData();
 
         switch (rowFormat) {
             case JSON:
+                if (timeZoneConverter != null && timeZoneConverter.hasTimeZoneData()) {
+                    data =
+                            data.stream()
+                                    .map(timeZoneConverter::convertTimeZoneRowData)
+                                    .collect(Collectors.toList());
+                }
                 break;
             case PLAIN_TEXT:
                 RowDataToStringConverter converter = ((ResultSetImpl) resultSet).getConverter();
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/util/RowDataLocalTimeZoneConverter.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/util/RowDataLocalTimeZoneConverter.java
new file mode 100644
index 00000000000..efb5ea77d9b
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/util/RowDataLocalTimeZoneConverter.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.rest.util;
+
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.utils.DateTimeUtils;
+
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.stream.Collectors;
+
+/**
+ * Convert {@link LocalZonedTimestampType} data in {@link RowData} to {@link TimestampData} based on
+ * the local time zone.
+ */
+public class RowDataLocalTimeZoneConverter {
+    private final List<RowData.FieldGetter> fieldGetterList;
+    private final List<LogicalType> logicalTypeList;
+    private final boolean hasTimeZoneData;
+    private final TimeZone timeZone;
+
+    public RowDataLocalTimeZoneConverter(List<LogicalType> logicalTypeList, ReadableConfig config) {
+        this(logicalTypeList, TimeZone.getTimeZone(getSessionTimeZone(config)));
+    }
+
+    public RowDataLocalTimeZoneConverter(List<LogicalType> logicalTypeList, TimeZone timeZone) {
+        this.logicalTypeList = logicalTypeList;
+        this.timeZone = timeZone;
+        this.fieldGetterList = new ArrayList<>(logicalTypeList.size());
+        for (int i = 0; i < logicalTypeList.size(); i++) {
+            fieldGetterList.add(RowData.createFieldGetter(logicalTypeList.get(i), i));
+        }
+        this.hasTimeZoneData = checkTimeZoneType(logicalTypeList);
+    }
+
+    private boolean checkTimeZoneType(List<LogicalType> logicalTypeList) {
+        for (LogicalType logicalType : logicalTypeList) {
+            if (logicalType.getTypeRoot() == LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE) {
+                return true;
+            } else if (logicalType.getTypeRoot() == LogicalTypeRoot.MAP) {
+                MapType mapType = (MapType) logicalType;
+                if (checkTimeZoneType(
+                        Arrays.asList(mapType.getKeyType(), mapType.getValueType()))) {
+                    return true;
+                }
+            } else if (logicalType.getTypeRoot() == LogicalTypeRoot.ARRAY) {
+                ArrayType arrayType = (ArrayType) logicalType;
+                if (checkTimeZoneType(Collections.singletonList(arrayType.getElementType()))) {
+                    return true;
+                }
+            } else if (logicalType.getTypeRoot() == LogicalTypeRoot.MULTISET) {
+                MultisetType multisetType = (MultisetType) logicalType;
+                if (checkTimeZoneType(Collections.singletonList(multisetType.getElementType()))) {
+                    return true;
+                }
+            } else if (logicalType.getTypeRoot() == LogicalTypeRoot.ROW) {
+                RowType rowType = (RowType) logicalType;
+                if (checkTimeZoneType(
+                        rowType.getFields().stream()
+                                .map(RowType.RowField::getType)
+                                .collect(Collectors.toList()))) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    public RowData convertTimeZoneRowData(RowData rowData) {
+        if (!hasTimeZoneData()) {
+            return rowData;
+        }
+
+        GenericRowData result = new GenericRowData(rowData.getRowKind(), rowData.getArity());
+        for (int i = 0; i < fieldGetterList.size(); i++) {
+            result.setField(
+                    i,
+                    convertLocalTimeZoneValue(
+                            fieldGetterList.get(i).getFieldOrNull(rowData),
+                            logicalTypeList.get(i)));
+        }
+        return result;
+    }
+
+    private Object convertLocalTimeZoneValue(Object object, LogicalType dataType) {
+        if (object == null) {
+            return null;
+        }
+        switch (dataType.getTypeRoot()) {
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                {
+                    return DateTimeUtils.timestampWithLocalZoneToTimestamp(
+                            (TimestampData) object, timeZone);
+                }
+            case MAP:
+                {
+                    MapType mapType = (MapType) dataType;
+                    MapData mapData = (MapData) object;
+                    ArrayData keyArray = mapData.keyArray();
+                    ArrayData valueArray = mapData.valueArray();
+                    ArrayData.ElementGetter keyGetter =
+                            ArrayData.createElementGetter(mapType.getKeyType());
+                    ArrayData.ElementGetter valueGetter =
+                            ArrayData.createElementGetter(mapType.getValueType());
+                    Map<Object, Object> mapValue = new HashMap<>();
+                    for (int j = 0; j < keyArray.size(); j++) {
+                        mapValue.put(
+                                convertLocalTimeZoneValue(
+                                        keyGetter.getElementOrNull(keyArray, j),
+                                        mapType.getKeyType()),
+                                convertLocalTimeZoneValue(
+                                        valueGetter.getElementOrNull(valueArray, j),
+                                        mapType.getValueType()));
+                    }
+                    return new GenericMapData(mapValue);
+                }
+            case ARRAY:
+                {
+                    ArrayType arrayType = (ArrayType) dataType;
+                    ArrayData arrayData = (ArrayData) object;
+                    ArrayData.ElementGetter dataGetter =
+                            ArrayData.createElementGetter(arrayType.getElementType());
+                    List<Object> arrayValues = new ArrayList<>(arrayData.size());
+                    for (int i = 0; i < arrayData.size(); i++) {
+                        arrayValues.add(
+                                convertLocalTimeZoneValue(
+                                        dataGetter.getElementOrNull(arrayData, i),
+                                        arrayType.getElementType()));
+                    }
+                    return new GenericArrayData(arrayValues.toArray());
+                }
+            default:
+                {
+                    return object;
+                }
+        }
+    }
+
+    public boolean hasTimeZoneData() {
+        return hasTimeZoneData;
+    }
+
+    /** Get time zone from the given session config. */
+    private static ZoneId getSessionTimeZone(ReadableConfig sessionConfig) {
+        final String zone = sessionConfig.get(TableConfigOptions.LOCAL_TIME_ZONE);
+        return TableConfigOptions.LOCAL_TIME_ZONE.defaultValue().equals(zone)
+                ? ZoneId.systemDefault()
+                : ZoneId.of(zone);
+    }
+}
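
A quick sketch of how the converter above is meant to be driven (mirroring the unit test added
below). Only schemas that actually contain TIMESTAMP_LTZ fields cause rows to be rewritten, and the
shifted values come back as plain TimestampData in the requested zone. The snippet assumes the same
classes imported at the top of this file.

    List<LogicalType> types = Collections.singletonList(new LocalZonedTimestampType());
    RowDataLocalTimeZoneConverter converter =
            new RowDataLocalTimeZoneConverter(types, TimeZone.getTimeZone("Asia/Shanghai"));
    RowData input = GenericRowData.of(TimestampData.fromEpochMillis(100000000000L));
    // hasTimeZoneData() is false for schemas without TIMESTAMP_LTZ, so such rows pass through untouched.
    RowData shifted =
            converter.hasTimeZoneData() ? converter.convertTimeZoneRowData(input) : input;
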
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/utils/RowDataLocalTimeZoneConverterTest.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/utils/RowDataLocalTimeZoneConverterTest.java
new file mode 100644
index 00000000000..f8e1b020c11
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/utils/RowDataLocalTimeZoneConverterTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.utils;
+
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.gateway.rest.util.RowDataLocalTimeZoneConverter;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Tests for {@link RowDataLocalTimeZoneConverter}. */
+public class RowDataLocalTimeZoneConverterTest {
+    @Test
+    public void testCheckHasTimeZoneData() {
+        List<LogicalType> logicalTypeListWithoutTimestamp =
+                Collections.singletonList(new IntType());
+        List<LogicalType> logicalTypeListWithTimestamp =
+                Collections.singletonList(new LocalZonedTimestampType());
+        List<LogicalType> logicalTypeListWithMapTimestamp =
+                Arrays.asList(
+                        new IntType(),
+                        new MapType(
+                                new VarCharType(100),
+                                new MapType(new VarCharType(20), new LocalZonedTimestampType())));
+        List<LogicalType> logicalTypeListWithMultisetTimestamp =
+                Arrays.asList(new IntType(), new MultisetType(new LocalZonedTimestampType()));
+        List<LogicalType> logicalTypeListWithRowTimestamp =
+                Arrays.asList(
+                        new VarCharType(100),
+                        new RowType(
+                                Arrays.asList(
+                                        new RowType.RowField("a", new LocalZonedTimestampType()),
+                                        new RowType.RowField("b", new IntType()))));
+        assertFalse(
+                new RowDataLocalTimeZoneConverter(
+                                logicalTypeListWithoutTimestamp,
+                                TimeZone.getTimeZone("Asia/Shanghai"))
+                        .hasTimeZoneData());
+        assertTrue(
+                new RowDataLocalTimeZoneConverter(
+                                logicalTypeListWithTimestamp, TimeZone.getTimeZone("Asia/Shanghai"))
+                        .hasTimeZoneData());
+        assertTrue(
+                new RowDataLocalTimeZoneConverter(
+                                logicalTypeListWithMapTimestamp,
+                                TimeZone.getTimeZone("Asia/Shanghai"))
+                        .hasTimeZoneData());
+        assertTrue(
+                new RowDataLocalTimeZoneConverter(
+                                logicalTypeListWithMultisetTimestamp,
+                                TimeZone.getTimeZone("Asia/Shanghai"))
+                        .hasTimeZoneData());
+        assertTrue(
+                new RowDataLocalTimeZoneConverter(
+                                logicalTypeListWithRowTimestamp,
+                                TimeZone.getTimeZone("Asia/Shanghai"))
+                        .hasTimeZoneData());
+    }
+
+    @Test
+    public void testSimpleTimestampWithLocalZone() {
+        List<LogicalType> logicalTypeList =
+                Collections.singletonList(new LocalZonedTimestampType());
+        RowDataLocalTimeZoneConverter converter1 =
+                new RowDataLocalTimeZoneConverter(
+                        logicalTypeList, TimeZone.getTimeZone("Asia/Shanghai"));
+        RowDataLocalTimeZoneConverter converter2 =
+                new RowDataLocalTimeZoneConverter(
+                        logicalTypeList, TimeZone.getTimeZone("Europe/Berlin"));
+
+        RowData data = GenericRowData.of(TimestampData.fromEpochMillis(100000000000L));
+        RowData data1 = converter1.convertTimeZoneRowData(data);
+        RowData data2 = converter2.convertTimeZoneRowData(data);
+        assertEquals(data.toString(), "+I(1973-03-03T09:46:40)");
+        assertEquals(data1.toString(), "+I(1973-03-03T17:46:40)");
+        assertEquals(data2.toString(), "+I(1973-03-03T10:46:40)");
+    }
+
+    @Test
+    public void testComplexTimestampWithLocalZone() {
+        List<LogicalType> logicalTypeList =
+                Arrays.asList(
+                        new IntType(),
+                        new MapType(
+                                new VarCharType(100),
+                                new MapType(new VarCharType(20), new LocalZonedTimestampType())));
+        RowDataLocalTimeZoneConverter converter1 =
+                new RowDataLocalTimeZoneConverter(
+                        logicalTypeList, TimeZone.getTimeZone("Asia/Shanghai"));
+        RowDataLocalTimeZoneConverter converter2 =
+                new RowDataLocalTimeZoneConverter(
+                        logicalTypeList, TimeZone.getTimeZone("Europe/Berlin"));
+
+        Map<StringData, TimestampData> timestampMapValue = new HashMap<>();
+        timestampMapValue.put(
+                StringData.fromString("123"), TimestampData.fromEpochMillis(100000000000L));
+        Map<StringData, MapData> timestampMapData = new HashMap<>();
+        timestampMapData.put(StringData.fromString("321"), new GenericMapData(timestampMapValue));
+        RowData data = GenericRowData.of(100, new GenericMapData(timestampMapData));
+        RowData data1 = converter1.convertTimeZoneRowData(data);
+        RowData data2 = converter2.convertTimeZoneRowData(data);
+
+        assertEquals(2, data.getArity());
+        assertEquals(2, data1.getArity());
+        assertEquals(2, data2.getArity());
+
+        assertEquals(100L, data.getInt(0));
+        assertEquals(100L, data1.getInt(0));
+        assertEquals(100L, data2.getInt(0));
+
+        MapData mapData = data.getMap(1);
+        MapData mapData1 = data1.getMap(1);
+        MapData mapData2 = data2.getMap(1);
+        assertEquals(1, mapData.size());
+        assertEquals(1, mapData1.size());
+        assertEquals(1, mapData2.size());
+        assertEquals("321", mapData.keyArray().getString(0).toString());
+        assertEquals("321", mapData1.keyArray().getString(0).toString());
+        assertEquals("321", mapData2.keyArray().getString(0).toString());
+
+        MapData mapValue = mapData.valueArray().getMap(0);
+        MapData mapValue1 = mapData1.valueArray().getMap(0);
+        MapData mapValue2 = mapData2.valueArray().getMap(0);
+        assertEquals(1, mapValue.size());
+        assertEquals(1, mapValue1.size());
+        assertEquals(1, mapValue2.size());
+        assertEquals("123", mapValue.keyArray().getString(0).toString());
+        assertEquals("123", mapValue1.keyArray().getString(0).toString());
+        assertEquals("123", mapValue2.keyArray().getString(0).toString());
+        assertEquals("1973-03-03T09:46:40", mapValue.valueArray().getTimestamp(0, 0).toString());
+        assertEquals("1973-03-03T17:46:40", mapValue1.valueArray().getTimestamp(0, 0).toString());
+        assertEquals("1973-03-03T10:46:40", mapValue2.valueArray().getTimestamp(0, 0).toString());
+    }
+}
diff --git a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSet.java b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSet.java
index 47a899a9e7a..4735ce4669a 100644
--- a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSet.java
+++ b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSet.java
@@ -25,7 +25,7 @@ import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.MapData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
-import org.apache.flink.table.jdbc.utils.ArrayFieldGetter;
+import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.jdbc.utils.CloseableResultIterator;
 import org.apache.flink.table.jdbc.utils.StatementResultIterator;
 import org.apache.flink.table.types.DataType;
@@ -44,6 +44,8 @@ import java.sql.SQLFeatureNotSupportedException;
 import java.sql.Statement;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalTime;
 import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.HashMap;
@@ -277,20 +279,17 @@ public class FlinkResultSet extends BaseResultSet {
 
     @Override
     public Date getDate(int columnIndex) throws SQLException {
-        // TODO support date data
-        throw new IllegalArgumentException();
+        return (Date) getObject(columnIndex);
     }
 
     @Override
     public Time getTime(int columnIndex) throws SQLException {
-        // TODO support time data
-        throw new IllegalArgumentException();
+        return (Time) getObject(columnIndex);
     }
 
     @Override
     public Timestamp getTimestamp(int columnIndex) throws SQLException {
-        // TODO support time timestamp
-        throw new IllegalArgumentException();
+        return (Timestamp) getObject(columnIndex);
     }
 
     @Override
@@ -416,8 +415,8 @@ public class FlinkResultSet extends BaseResultSet {
                 {
                     LogicalType keyType = ((MapType) dataType).getKeyType();
                     LogicalType valueType = ((MapType) dataType).getValueType();
-                    ArrayFieldGetter keyGetter = ArrayFieldGetter.createFieldGetter(keyType);
-                    ArrayFieldGetter valueGetter = ArrayFieldGetter.createFieldGetter(valueType);
+                    ArrayData.ElementGetter keyGetter = ArrayData.createElementGetter(keyType);
+                    ArrayData.ElementGetter valueGetter = ArrayData.createElementGetter(valueType);
                     MapData mapData = (MapData) object;
                     int size = mapData.size();
                     ArrayData keyArrayData = mapData.keyArray();
@@ -426,12 +425,34 @@ public class FlinkResultSet extends BaseResultSet {
                     for (int i = 0; i < size; i++) {
                         mapResult.put(
                                 convertToJavaObject(
-                                        keyGetter.getObjectOrNull(keyArrayData, i), keyType),
+                                        keyGetter.getElementOrNull(keyArrayData, i), keyType),
                                 convertToJavaObject(
-                                        valueGetter.getObjectOrNull(valueArrayData, i), valueType));
+                                        valueGetter.getElementOrNull(valueArrayData, i),
+                                        valueType));
                     }
                     return mapResult;
                 }
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                {
+                    return ((TimestampData) object).toTimestamp();
+                }
+            case TIMESTAMP_WITH_TIME_ZONE:
+                {
+                    // TODO should be supported after
+                    // https://issues.apache.org/jira/browse/FLINK-20869
+                    throw new SQLDataException(
+                            "TIMESTAMP WITH TIME ZONE is not supported, use TIMESTAMP or TIMESTAMP_LTZ instead");
+                }
+            case TIME_WITHOUT_TIME_ZONE:
+                {
+                    return Time.valueOf(
+                            LocalTime.ofNanoOfDay(((Number) object).intValue() * 1_000_000L));
+                }
+            case DATE:
+                {
+                    return Date.valueOf(LocalDate.ofEpochDay(((Number) object).intValue()));
+                }
             default:
                 {
                     throw new SQLDataException(
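
The DATE and TIME branches added above rely on Flink's internal representation being a single int:
days since epoch for DATE, milliseconds of day for TIME. A standalone sketch of that mapping, with
example values chosen by hand (the class name is just for illustration):

    import java.sql.Date;
    import java.sql.Time;
    import java.time.LocalDate;
    import java.time.LocalTime;

    public class DateTimeInternalSketch {
        public static void main(String[] args) {
            int epochDays = 0;            // internal DATE value -> 1970-01-01
            int millisOfDay = 45_120_000; // internal TIME value -> 12:32:00
            Date date = Date.valueOf(LocalDate.ofEpochDay(epochDays));
            Time time = Time.valueOf(LocalTime.ofNanoOfDay(millisOfDay * 1_000_000L));
            System.out.println(date + " " + time); // prints: 1970-01-01 12:32:00
        }
    }
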
diff --git a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/ArrayFieldGetter.java b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/ArrayFieldGetter.java
deleted file mode 100644
index 42f3bad38e3..00000000000
--- a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/ArrayFieldGetter.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.jdbc.utils;
-
-import org.apache.flink.table.data.ArrayData;
-import org.apache.flink.table.types.logical.DistinctType;
-import org.apache.flink.table.types.logical.LogicalType;
-
-import javax.annotation.Nullable;
-
-import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFieldCount;
-import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
-import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getScale;
-
-/** Creates an accessor for getting elements in an array data structure at the given position. */
-public interface ArrayFieldGetter {
-    @Nullable
-    Object getObjectOrNull(ArrayData array, int index);
-
-    static ArrayFieldGetter createFieldGetter(LogicalType type) {
-
-        final ArrayFieldGetter fieldGetter;
-        // ordered by type root definition
-        switch (type.getTypeRoot()) {
-            case CHAR:
-            case VARCHAR:
-                fieldGetter = ArrayData::getString;
-                break;
-            case BOOLEAN:
-                fieldGetter = ArrayData::getBoolean;
-                break;
-            case BINARY:
-            case VARBINARY:
-                fieldGetter = ArrayData::getBinary;
-                break;
-            case DECIMAL:
-                final int decimalPrecision = getPrecision(type);
-                final int decimalScale = getScale(type);
-                fieldGetter =
-                        (array, index) -> array.getDecimal(index, decimalPrecision, decimalScale);
-                break;
-            case TINYINT:
-                fieldGetter = ArrayData::getByte;
-                break;
-            case SMALLINT:
-                fieldGetter = ArrayData::getShort;
-                break;
-            case INTEGER:
-            case DATE:
-            case TIME_WITHOUT_TIME_ZONE:
-            case INTERVAL_YEAR_MONTH:
-                fieldGetter = ArrayData::getInt;
-                break;
-            case BIGINT:
-            case INTERVAL_DAY_TIME:
-                fieldGetter = ArrayData::getLong;
-                break;
-            case FLOAT:
-                fieldGetter = ArrayData::getFloat;
-                break;
-            case DOUBLE:
-                fieldGetter = ArrayData::getDouble;
-                break;
-            case TIMESTAMP_WITHOUT_TIME_ZONE:
-            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-                final int timestampPrecision = getPrecision(type);
-                fieldGetter = (array, index) -> array.getTimestamp(index, timestampPrecision);
-                break;
-            case TIMESTAMP_WITH_TIME_ZONE:
-                throw new UnsupportedOperationException();
-            case ARRAY:
-                fieldGetter = ArrayData::getArray;
-                break;
-            case MULTISET:
-            case MAP:
-                fieldGetter = ArrayData::getMap;
-                break;
-            case ROW:
-            case STRUCTURED_TYPE:
-                final int arrayFieldCount = getFieldCount(type);
-                fieldGetter = (array, index) -> array.getRow(index, arrayFieldCount);
-                break;
-            case DISTINCT_TYPE:
-                fieldGetter = createFieldGetter(((DistinctType) type).getSourceType());
-                break;
-            case RAW:
-                fieldGetter = ArrayData::getRawValue;
-                break;
-            case NULL:
-            case SYMBOL:
-            case UNRESOLVED:
-            default:
-                throw new IllegalArgumentException();
-        }
-        if (!type.isNullable()) {
-            return fieldGetter;
-        }
-        return (array, index) -> {
-            if (array.isNullAt(index)) {
-                return null;
-            }
-            return fieldGetter.getObjectOrNull(array, index);
-        };
-    }
-}
diff --git a/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkStatementTest.java b/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkStatementTest.java
index 28e33dce73b..8ff4af291a8 100644
--- a/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkStatementTest.java
+++ b/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkStatementTest.java
@@ -60,7 +60,7 @@ public class FlinkStatementTest extends FlinkJdbcDriverTestBase {
                 assertFalse(
                         statement.execute(
                                 String.format(
-                                        "CREATE TABLE test_table(id bigint, val int, str string) "
+                                        "CREATE TABLE test_table(id bigint, val int, str string, timestamp1 timestamp(0), timestamp2 timestamp_ltz(3), time_data time, date_data date) "
                                                 + "with ("
                                                 + "'connector'='filesystem',\n"
                                                 + "'format'='csv',\n"
@@ -72,10 +72,10 @@ public class FlinkStatementTest extends FlinkJdbcDriverTestBase {
                 assertTrue(
                         statement.execute(
                                 "INSERT INTO test_table VALUES "
-                                        + "(1, 11, '111'), "
-                                        + "(3, 33, '333'), "
-                                        + "(2, 22, '222'), "
-                                        + "(4, 44, '444')"));
+                                        + "(1, 11, '111', TIMESTAMP '2021-04-15 23:18:36', TO_TIMESTAMP_LTZ(400000000000, 3), TIME '12:32:00', DATE '2023-11-02'), "
+                                        + "(3, 33, '333', TIMESTAMP '2021-04-16 23:18:36', TO_TIMESTAMP_LTZ(500000000000, 3), TIME '13:32:00', DATE '2023-12-02'), "
+                                        + "(2, 22, '222', TIMESTAMP '2021-04-17 23:18:36', TO_TIMESTAMP_LTZ(600000000000, 3), TIME '14:32:00', DATE '2023-01-02'), "
+                                        + "(4, 44, '444', TIMESTAMP '2021-04-18 23:18:36', TO_TIMESTAMP_LTZ(700000000000, 3), TIME '15:32:00', DATE '2023-02-02')"));
                 assertThatThrownBy(statement::getUpdateCount)
                         .isInstanceOf(SQLFeatureNotSupportedException.class)
                         .hasMessage("FlinkStatement#getUpdateCount is not supported for query");
@@ -106,23 +106,55 @@ public class FlinkStatementTest extends FlinkJdbcDriverTestBase {
                 }
 
                 // SELECT all data from test_table
+                statement.execute("SET 'table.local-time-zone' = 'UTC'");
                 try (ResultSet resultSet = statement.executeQuery("SELECT * FROM test_table")) {
-                    assertEquals(3, resultSet.getMetaData().getColumnCount());
+                    assertEquals(7, resultSet.getMetaData().getColumnCount());
                     List<String> resultList = new ArrayList<>();
                     while (resultSet.next()) {
                         assertEquals(resultSet.getLong("id"), resultSet.getLong(1));
                         assertEquals(resultSet.getInt("val"), resultSet.getInt(2));
                         assertEquals(resultSet.getString("str"), resultSet.getString(3));
+                        assertEquals(resultSet.getTimestamp("timestamp1"), resultSet.getObject(4));
+                        assertEquals(resultSet.getObject("timestamp2"), resultSet.getTimestamp(5));
+                        assertEquals(resultSet.getObject("time_data"), resultSet.getTime(6));
+                        assertEquals(resultSet.getObject("date_data"), resultSet.getDate(7));
                         resultList.add(
                                 String.format(
-                                        "%s,%s,%s",
+                                        "%s,%s,%s,%s,%s,%s,%s",
                                         resultSet.getLong("id"),
                                         resultSet.getInt("val"),
-                                        resultSet.getString("str")));
+                                        resultSet.getString("str"),
+                                        resultSet.getTimestamp("timestamp1"),
+                                        resultSet.getTimestamp("timestamp2"),
+                                        resultSet.getTime("time_data"),
+                                        resultSet.getDate("date_data")));
                     }
                     assertThat(resultList)
                             .containsExactlyInAnyOrder(
-                                    "1,11,111", "2,22,222", "3,33,333", "4,44,444");
+                                    "1,11,111,2021-04-15 23:18:36.0,1982-09-04 15:06:40.0,12:32:00,2023-11-02",
+                                    "3,33,333,2021-04-16 23:18:36.0,1985-11-05 00:53:20.0,13:32:00,2023-12-02",
+                                    "2,22,222,2021-04-17 23:18:36.0,1989-01-05 10:40:00.0,14:32:00,2023-01-02",
+                                    "4,44,444,2021-04-18 23:18:36.0,1992-03-07 20:26:40.0,15:32:00,2023-02-02");
+                }
+
+                // SELECT all data from test_table with local time zone
+                statement.execute("SET 'table.local-time-zone' = 'Asia/Shanghai'");
+                try (ResultSet resultSet = statement.executeQuery("SELECT * FROM test_table")) {
+                    assertEquals(7, resultSet.getMetaData().getColumnCount());
+                    List<String> resultList = new ArrayList<>();
+                    while (resultSet.next()) {
+                        resultList.add(
+                                String.format(
+                                        "%s,%s",
+                                        resultSet.getTimestamp("timestamp1"),
+                                        resultSet.getTimestamp("timestamp2")));
+                    }
+                    assertThat(resultList)
+                            .containsExactlyInAnyOrder(
+                                    "2021-04-15 23:18:36.0,1982-09-04 23:06:40.0",
+                                    "2021-04-16 23:18:36.0,1985-11-05 08:53:20.0",
+                                    "2021-04-17 23:18:36.0,1989-01-05 18:40:00.0",
+                                    "2021-04-18 23:18:36.0,1992-03-08 04:26:40.0");
                 }
 
                 assertTrue(statement.execute("SHOW JOBS"));
@@ -133,7 +165,7 @@ public class FlinkStatementTest extends FlinkJdbcDriverTestBase {
                         assertEquals("FINISHED", resultSet.getString(3));
                         count++;
                     }
-                    assertEquals(2, count);
+                    assertEquals(3, count);
                 }
             }
         }

