This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.18 by this push:
     new 4379f2c39fb [FLINK-25565][Formats][Parquet] write and read parquet int64 timestamp (#18304) (#23887)
4379f2c39fb is described below

commit 4379f2c39fbdaeb78f874f6dd461d20b8a8961cd
Author: Thomas Weise <twe...@users.noreply.github.com>
AuthorDate: Fri Dec 8 13:49:04 2023 -0500

    [FLINK-25565][Formats][Parquet] write and read parquet int64 timestamp (#18304) (#23887)
    
    Co-authored-by: Bo Cui <cuibo0...@163.com>
---
 .../docs/connectors/table/formats/parquet.md       |  14 ++
 .../docs/connectors/table/formats/parquet.md       |  14 ++
 .../formats/parquet/ParquetFileFormatFactory.java  |  15 ++
 .../parquet/ParquetVectorizedInputFormat.java      |  13 +-
 .../formats/parquet/row/ParquetRowDataBuilder.java |  13 +-
 .../formats/parquet/row/ParquetRowDataWriter.java  |  67 ++++-
 .../parquet/utils/ParquetSchemaConverter.java      |  50 +++-
 .../formats/parquet/vector/ParquetDictionary.java  |  20 +-
 .../parquet/vector/ParquetSplitReaderUtil.java     |   3 +-
 .../vector/reader/AbstractColumnReader.java        |   2 +-
 .../vector/reader/TimestampColumnReader.java       | 101 +++++++-
 .../formats/parquet/ParquetTimestampITCase.java    | 280 +++++++++++++++++++++
 .../parquet/row/ParquetRowDataWriterTest.java      |  14 ++
 .../vector/ParquetInt64TimestampReaderTest.java    |  69 +++++
 .../runtime/stream/FsStreamingSinkITCaseBase.scala |  65 +++--
 15 files changed, 683 insertions(+), 57 deletions(-)

diff --git a/docs/content.zh/docs/connectors/table/formats/parquet.md b/docs/content.zh/docs/connectors/table/formats/parquet.md
index 92536d00f98..295b11ef41b 100644
--- a/docs/content.zh/docs/connectors/table/formats/parquet.md
+++ b/docs/content.zh/docs/connectors/table/formats/parquet.md
@@ -84,6 +84,20 @@ Format 参数
       <td>Boolean</td>
       <td>使用 UTC 时区或本地时区在纪元时间和 LocalDateTime 之间进行转换。Hive 0.x/1.x/2.x 使用本地时区,但 Hive 3.x 使用 UTC 时区。</td>
     </tr>
+    <tr>
+      <td><h5>timestamp.time.unit</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">micros</td>
+      <td>String</td>
+      <td>根据TimeUnit在Timestamp和int64之间进行转换,可选值nanos/micros/millis。</td>
+    </tr>
+    <tr>
+      <td><h5>write.int64.timestamp</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>以int64替代int96存储parquet Timestamp。 注意:Timestamp将与时区无关(从不转换为不同时区)。</td>
+    </tr>
     </tbody>
 </table>
 
diff --git a/docs/content/docs/connectors/table/formats/parquet.md b/docs/content/docs/connectors/table/formats/parquet.md
index 0b7ba9c42f5..75c524f238f 100644
--- a/docs/content/docs/connectors/table/formats/parquet.md
+++ b/docs/content/docs/connectors/table/formats/parquet.md
@@ -84,6 +84,20 @@ Format Options
       <td>Boolean</td>
       <td>Use UTC timezone or local timezone to the conversion between epoch time and LocalDateTime. Hive 0.x/1.x/2.x use local timezone. But Hive 3.x use UTC timezone.</td>
     </tr>
+    <tr>
+      <td><h5>timestamp.time.unit</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">micros</td>
+      <td>String</td>
+      <td>Store parquet int64/LogicalTypes timestamps in this time unit, value is nanos/micros/millis.</td>
+    </tr>
+    <tr>
+      <td><h5>write.int64.timestamp</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>Write parquet timestamp as int64/LogicalTypes instead of int96/OriginalTypes. Note: Timestamp will be time zone agnostic (NEVER converted to a different time zone).</td>
+    </tr>
     </tbody>
 </table>
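The two new options are format options, so in a table definition they carry the format identifier prefix, i.e. 'parquet.write.int64.timestamp' and 'parquet.timestamp.time.unit' (this is how the ITCase added later in this commit wires them in). A minimal, hypothetical usage sketch in Java; table name, columns, and path are placeholders and not part of this commit:

    // Hypothetical sketch: a filesystem sink table writing int64 (LogicalTypes) timestamps
    // in microseconds. Table name, columns, and path are placeholders.
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class Int64TimestampSinkSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            tEnv.executeSql(
                    "CREATE TABLE sink_table ("
                            + "  a INT,"
                            + "  c TIMESTAMP(6)"
                            + ") WITH ("
                            + "  'connector' = 'filesystem',"
                            + "  'path' = '/tmp/parquet-out',"
                            + "  'format' = 'parquet',"
                            + "  'parquet.write.int64.timestamp' = 'true',"
                            + "  'parquet.timestamp.time.unit' = 'micros'"
                            + ")");
        }
    }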
 
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileFormatFactory.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileFormatFactory.java
index 14f257899c1..8be727c7947 100644
--- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileFormatFactory.java
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileFormatFactory.java
@@ -68,6 +68,21 @@ public class ParquetFileFormatFactory implements 
BulkReaderFormatFactory, BulkWr
                                     + " time and LocalDateTime. Hive 
0.x/1.x/2.x use local timezone. But Hive 3.x"
                                     + " use UTC timezone");
 
+    public static final ConfigOption<String> TIMESTAMP_TIME_UNIT =
+            key("timestamp.time.unit")
+                    .stringType()
+                    .defaultValue("micros")
+                    .withDescription(
+                            "Store parquet int64/LogicalTypes timestamps in this time unit, value is nanos/micros/millis");
+
+    public static final ConfigOption<Boolean> WRITE_INT64_TIMESTAMP =
+            key("write.int64.timestamp")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Write parquet timestamp as int64/LogicalTypes instead of int96/OriginalTypes. "
+                                    + "Note: Timestamp will be time zone agnostic (NEVER converted to a different time zone).");
+
     @Override
     public BulkDecodingFormat<RowData> createDecodingFormat(
             DynamicTableFactory.Context context, ReadableConfig formatOptions) 
{
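These ConfigOptions are later looked up from the Hadoop Configuration with the format identifier prefixed to the key (see ParquetRowDataWriter further down). A minimal sketch of that lookup; the hard-coded keys and values below are purely illustrative:

    // Sketch of the prefixed-option lookup; "parquet." is the format IDENTIFIER.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.parquet.schema.LogicalTypeAnnotation;

    public class TimestampOptionLookupSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            conf.setBoolean("parquet.write.int64.timestamp", true);
            conf.set("parquet.timestamp.time.unit", "micros");

            boolean useInt64 = conf.getBoolean("parquet.write.int64.timestamp", false);
            LogicalTypeAnnotation.TimeUnit timeUnit =
                    LogicalTypeAnnotation.TimeUnit.valueOf(
                            conf.get("parquet.timestamp.time.unit", "micros").toUpperCase());
            System.out.println(useInt64 + " / " + timeUnit); // true / MICROS
        }
    }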
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetVectorizedInputFormat.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetVectorizedInputFormat.java
index 914df658de1..d675dc359d8 100644
--- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetVectorizedInputFormat.java
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetVectorizedInputFormat.java
@@ -130,7 +130,8 @@ public abstract class ParquetVectorizedInputFormat<T, 
SplitT extends FileSourceS
         MessageType fileSchema = 
parquetFileReader.getFooter().getFileMetaData().getSchema();
         // Pruning unnecessary column, we should set the projection schema 
before running any
         // filtering (e.g. getting filtered record count) because projection 
impacts filtering
-        MessageType requestedSchema = clipParquetSchema(fileSchema, 
unknownFieldsIndices);
+        MessageType requestedSchema =
+                clipParquetSchema(fileSchema, unknownFieldsIndices, 
hadoopConfig.conf());
         parquetFileReader.setRequestedSchema(requestedSchema);
 
         checkSchema(fileSchema, requestedSchema);
@@ -173,7 +174,9 @@ public abstract class ParquetVectorizedInputFormat<T, 
SplitT extends FileSourceS
 
     /** Clips `parquetSchema` according to `fieldNames`. */
     private MessageType clipParquetSchema(
-            GroupType parquetSchema, Collection<Integer> unknownFieldsIndices) 
{
+            GroupType parquetSchema,
+            Collection<Integer> unknownFieldsIndices,
+            org.apache.hadoop.conf.Configuration config) {
         Type[] types = new Type[projectedFields.length];
         if (isCaseSensitive) {
             for (int i = 0; i < projectedFields.length; ++i) {
@@ -185,7 +188,7 @@ public abstract class ParquetVectorizedInputFormat<T, 
SplitT extends FileSourceS
                             parquetSchema);
                     types[i] =
                             ParquetSchemaConverter.convertToParquetType(
-                                    fieldName, projectedTypes[i]);
+                                    fieldName, projectedTypes[i], config);
                     unknownFieldsIndices.add(i);
                 } else {
                     types[i] = parquetSchema.getType(fieldName);
@@ -215,7 +218,9 @@ public abstract class ParquetVectorizedInputFormat<T, 
SplitT extends FileSourceS
                             parquetSchema);
                     type =
                             ParquetSchemaConverter.convertToParquetType(
-                                    
projectedFields[i].toLowerCase(Locale.ROOT), projectedTypes[i]);
+                                    
projectedFields[i].toLowerCase(Locale.ROOT),
+                                    projectedTypes[i],
+                                    config);
                     unknownFieldsIndices.add(i);
                 }
                 // TODO clip for array,map,row types.
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataBuilder.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataBuilder.java
index 30dc8dbc236..5abb21d5a26 100644
--- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataBuilder.java
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataBuilder.java
@@ -65,13 +65,19 @@ public class ParquetRowDataBuilder extends 
ParquetWriter.Builder<RowData, Parque
 
     @Override
     protected WriteSupport<RowData> getWriteSupport(Configuration conf) {
-        return new ParquetWriteSupport();
+        return new ParquetWriteSupport(conf);
     }
 
     private class ParquetWriteSupport extends WriteSupport<RowData> {
 
-        private MessageType schema = 
convertToParquetMessageType("flink_schema", rowType);
+        private MessageType schema = null;
         private ParquetRowDataWriter writer;
+        private Configuration conf;
+
+        private ParquetWriteSupport(Configuration conf) {
+            this.conf = conf;
+            schema = convertToParquetMessageType("flink_schema", rowType, 
conf);
+        }
 
         @Override
         public WriteContext init(Configuration configuration) {
@@ -80,7 +86,8 @@ public class ParquetRowDataBuilder extends 
ParquetWriter.Builder<RowData, Parque
 
         @Override
         public void prepareForWrite(RecordConsumer recordConsumer) {
-            this.writer = new ParquetRowDataWriter(recordConsumer, rowType, 
schema, utcTimestamp);
+            this.writer =
+                    new ParquetRowDataWriter(recordConsumer, rowType, schema, 
utcTimestamp, conf);
         }
 
         @Override
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java
index 4a521373e34..37b65b2b58b 100644
--- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java
@@ -35,6 +35,7 @@ import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.io.api.RecordConsumer;
 import org.apache.parquet.schema.GroupType;
@@ -47,9 +48,14 @@ import java.sql.Timestamp;
 import java.util.Arrays;
 import java.util.List;
 
+import static 
org.apache.flink.formats.parquet.ParquetFileFormatFactory.IDENTIFIER;
+import static 
org.apache.flink.formats.parquet.ParquetFileFormatFactory.TIMESTAMP_TIME_UNIT;
+import static 
org.apache.flink.formats.parquet.ParquetFileFormatFactory.WRITE_INT64_TIMESTAMP;
 import static 
org.apache.flink.formats.parquet.utils.ParquetSchemaConverter.computeMinBytesForDecimalPrecision;
 import static 
org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader.JULIAN_EPOCH_OFFSET_DAYS;
+import static 
org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader.MICROS_PER_MILLISECOND;
 import static 
org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader.MILLIS_IN_DAY;
+import static 
org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader.NANOS_PER_MICROSECONDS;
 import static 
org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader.NANOS_PER_MILLISECOND;
 import static 
org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader.NANOS_PER_SECOND;
 
@@ -60,14 +66,34 @@ public class ParquetRowDataWriter {
     private final RecordConsumer recordConsumer;
     private final boolean utcTimestamp;
 
+    private final Configuration conf;
+    private boolean useInt64 = false;
+    private LogicalTypeAnnotation.TimeUnit timeUnit;
+
     public ParquetRowDataWriter(
             RecordConsumer recordConsumer,
             RowType rowType,
             GroupType schema,
-            boolean utcTimestamp) {
+            boolean utcTimestamp,
+            Configuration conf) {
         this.recordConsumer = recordConsumer;
         this.utcTimestamp = utcTimestamp;
-
+        this.conf = conf;
+        if (this.conf != null) {
+            useInt64 =
+                    this.conf.getBoolean(
+                            IDENTIFIER + "." + WRITE_INT64_TIMESTAMP.key(),
+                            WRITE_INT64_TIMESTAMP.defaultValue());
+            if (useInt64) {
+                timeUnit =
+                        LogicalTypeAnnotation.TimeUnit.valueOf(
+                                this.conf
+                                        .get(
+                                                IDENTIFIER + "." + 
TIMESTAMP_TIME_UNIT.key(),
+                                                
TIMESTAMP_TIME_UNIT.defaultValue())
+                                        .toUpperCase());
+            }
+        }
         rowWriter = new RowWriter(rowType, schema);
     }
 
@@ -326,7 +352,7 @@ public class ParquetRowDataWriter {
         }
 
         private void writeTimestamp(TimestampData value) {
-            recordConsumer.addBinary(timestampToInt96(value));
+            ParquetRowDataWriter.this.writeTimestamp(recordConsumer, value);
         }
     }
 
@@ -477,6 +503,41 @@ public class ParquetRowDataWriter {
         public void write(ArrayData arrayData, int ordinal) {}
     }
 
+    private void writeTimestamp(RecordConsumer recordConsumer, TimestampData 
timestampData) {
+        if (useInt64) {
+            recordConsumer.addLong(timestampToInt64(timestampData));
+        } else {
+            recordConsumer.addBinary(timestampToInt96(timestampData));
+        }
+    }
+
+    private Long convertInt64ToLong(long mills, long nanosOfMillisecond) {
+        switch (timeUnit) {
+            case NANOS:
+                return mills * NANOS_PER_MILLISECOND + nanosOfMillisecond;
+            case MICROS:
+                return mills * MICROS_PER_MILLISECOND + nanosOfMillisecond / 
NANOS_PER_MICROSECONDS;
+            case MILLIS:
+                return mills;
+            default:
+                throw new IllegalArgumentException("Time unit not recognized");
+        }
+    }
+
+    private Long timestampToInt64(TimestampData timestampData) {
+        long mills = 0L;
+        long nanosOfMillisecond = 0L;
+        if (utcTimestamp) {
+            mills = timestampData.getMillisecond();
+            nanosOfMillisecond = timestampData.getNanoOfMillisecond();
+        } else {
+            Timestamp timestamp = timestampData.toTimestamp();
+            mills = timestamp.getTime();
+            nanosOfMillisecond = timestamp.getNanos() % NANOS_PER_MILLISECOND;
+        }
+        return convertInt64ToLong(mills, nanosOfMillisecond);
+    }
+
     private Binary timestampToInt96(TimestampData timestampData) {
         int julianDay;
         long nanosOfDay;
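To make the conversion above concrete, here is a standalone arithmetic sketch of what timestampToInt64/convertInt64ToLong computes for the MICROS unit; the input instant is an arbitrary example, not taken from this commit:

    // MICROS encoding: epochMillis * 1_000 + nanoOfMillisecond / 1_000
    // (sub-microsecond digits are truncated).
    import java.time.Instant;

    public class Int64EncodingSketch {
        public static void main(String[] args) {
            Instant ts = Instant.parse("2020-05-03T07:00:00.123456789Z");
            long millis = ts.toEpochMilli();                    // 1588489200123
            long nanoOfMillisecond = ts.getNano() % 1_000_000;  // 456789

            long micros = millis * 1_000 + nanoOfMillisecond / 1_000;
            System.out.println(micros);                         // 1588489200123456
        }
    }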
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java
index 377803aa170..b7c91ee62ff 100644
--- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java
@@ -26,8 +26,10 @@ import org.apache.flink.table.types.logical.MapType;
 import org.apache.flink.table.types.logical.MultisetType;
 import org.apache.flink.table.types.logical.RowType;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.schema.ConversionPatterns;
 import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType;
@@ -37,26 +39,33 @@ import org.apache.parquet.schema.Types;
 import java.util.ArrayList;
 import java.util.List;
 
+import static 
org.apache.flink.formats.parquet.ParquetFileFormatFactory.IDENTIFIER;
+import static 
org.apache.flink.formats.parquet.ParquetFileFormatFactory.TIMESTAMP_TIME_UNIT;
+import static 
org.apache.flink.formats.parquet.ParquetFileFormatFactory.WRITE_INT64_TIMESTAMP;
+
 /** Schema converter converts Parquet schema to and from Flink internal types. 
*/
 public class ParquetSchemaConverter {
 
     static final String MAP_REPEATED_NAME = "key_value";
     static final String LIST_ELEMENT_NAME = "element";
 
-    public static MessageType convertToParquetMessageType(String name, RowType 
rowType) {
+    public static MessageType convertToParquetMessageType(
+            String name, RowType rowType, Configuration conf) {
         Type[] types = new Type[rowType.getFieldCount()];
         for (int i = 0; i < rowType.getFieldCount(); i++) {
-            types[i] = convertToParquetType(rowType.getFieldNames().get(i), 
rowType.getTypeAt(i));
+            types[i] =
+                    convertToParquetType(
+                            rowType.getFieldNames().get(i), 
rowType.getTypeAt(i), conf);
         }
         return new MessageType(name, types);
     }
 
-    public static Type convertToParquetType(String name, LogicalType type) {
-        return convertToParquetType(name, type, Type.Repetition.OPTIONAL);
+    public static Type convertToParquetType(String name, LogicalType type, 
Configuration conf) {
+        return convertToParquetType(name, type, Type.Repetition.OPTIONAL, 
conf);
     }
 
     private static Type convertToParquetType(
-            String name, LogicalType type, Type.Repetition repetition) {
+            String name, LogicalType type, Type.Repetition repetition, 
Configuration conf) {
         switch (type.getTypeRoot()) {
             case CHAR:
             case VARCHAR:
@@ -111,6 +120,19 @@ public class ParquetSchemaConverter {
                         .named(name);
             case TIMESTAMP_WITHOUT_TIME_ZONE:
             case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                if (conf.getBoolean(
+                        IDENTIFIER + "." + WRITE_INT64_TIMESTAMP.key(),
+                        WRITE_INT64_TIMESTAMP.defaultValue())) {
+                    LogicalTypeAnnotation.TimeUnit timeUnit =
+                            LogicalTypeAnnotation.TimeUnit.valueOf(
+                                    conf.get(
+                                                    IDENTIFIER + "." + 
TIMESTAMP_TIME_UNIT.key(),
+                                                    
TIMESTAMP_TIME_UNIT.defaultValue())
+                                            .toUpperCase());
+                    return 
Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, repetition)
+                            .as(LogicalTypeAnnotation.timestampType(false, 
timeUnit))
+                            .named(name);
+                }
                 return Types.primitive(PrimitiveType.PrimitiveTypeName.INT96, 
repetition)
                         .named(name);
             case ARRAY:
@@ -118,35 +140,37 @@ public class ParquetSchemaConverter {
                 return ConversionPatterns.listOfElements(
                         repetition,
                         name,
-                        convertToParquetType(LIST_ELEMENT_NAME, 
arrayType.getElementType()));
+                        convertToParquetType(LIST_ELEMENT_NAME, 
arrayType.getElementType(), conf));
             case MAP:
                 MapType mapType = (MapType) type;
                 return ConversionPatterns.mapType(
                         repetition,
                         name,
                         MAP_REPEATED_NAME,
-                        convertToParquetType("key", mapType.getKeyType()),
-                        convertToParquetType("value", mapType.getValueType()));
+                        convertToParquetType("key", mapType.getKeyType(), 
conf),
+                        convertToParquetType("value", mapType.getValueType(), 
conf));
             case MULTISET:
                 MultisetType multisetType = (MultisetType) type;
                 return ConversionPatterns.mapType(
                         repetition,
                         name,
                         MAP_REPEATED_NAME,
-                        convertToParquetType("key", 
multisetType.getElementType()),
-                        convertToParquetType("value", new IntType(false)));
+                        convertToParquetType("key", 
multisetType.getElementType(), conf),
+                        convertToParquetType("value", new IntType(false), 
conf));
             case ROW:
                 RowType rowType = (RowType) type;
-                return new GroupType(repetition, name, 
convertToParquetTypes(rowType));
+                return new GroupType(repetition, name, 
convertToParquetTypes(rowType, conf));
             default:
                 throw new UnsupportedOperationException("Unsupported type: " + 
type);
         }
     }
 
-    private static List<Type> convertToParquetTypes(RowType rowType) {
+    private static List<Type> convertToParquetTypes(RowType rowType, 
Configuration conf) {
         List<Type> types = new ArrayList<>(rowType.getFieldCount());
         for (int i = 0; i < rowType.getFieldCount(); i++) {
-            types.add(convertToParquetType(rowType.getFieldNames().get(i), 
rowType.getTypeAt(i)));
+            types.add(
+                    convertToParquetType(
+                            rowType.getFieldNames().get(i), 
rowType.getTypeAt(i), conf));
         }
         return types;
     }
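For comparison, this is roughly the primitive type the converter now emits for a TIMESTAMP column when int64 mode is on, versus the INT96 default. A standalone sketch against the parquet-mr schema builder; the field name is arbitrary:

    import org.apache.parquet.schema.LogicalTypeAnnotation;
    import org.apache.parquet.schema.PrimitiveType;
    import org.apache.parquet.schema.Type;
    import org.apache.parquet.schema.Types;

    public class TimestampSchemaSketch {
        public static void main(String[] args) {
            // isAdjustedToUTC = false: the written value stays time zone agnostic.
            Type int64Ts =
                    Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, Type.Repetition.OPTIONAL)
                            .as(LogicalTypeAnnotation.timestampType(
                                    false, LogicalTypeAnnotation.TimeUnit.MICROS))
                            .named("event_time");
            Type int96Ts =
                    Types.primitive(PrimitiveType.PrimitiveTypeName.INT96, Type.Repetition.OPTIONAL)
                            .named("event_time");
            System.out.println(int64Ts); // e.g. optional int64 event_time (TIMESTAMP(MICROS,false))
            System.out.println(int96Ts); // e.g. optional int96 event_time
        }
    }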
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetDictionary.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetDictionary.java
index 2a098110aa5..f36c7ff4bfa 100644
--- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetDictionary.java
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetDictionary.java
@@ -21,15 +21,23 @@ package org.apache.flink.formats.parquet.vector;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.data.columnar.vector.Dictionary;
 
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.PrimitiveType;
+
+import static 
org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader.decodeInt64ToTimestamp;
 import static 
org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader.decodeInt96ToTimestamp;
 
 /** Parquet dictionary. */
 public final class ParquetDictionary implements Dictionary {
 
     private org.apache.parquet.column.Dictionary dictionary;
+    private final ColumnDescriptor descriptor;
 
-    public ParquetDictionary(org.apache.parquet.column.Dictionary dictionary) {
+    public ParquetDictionary(
+            org.apache.parquet.column.Dictionary dictionary, ColumnDescriptor 
descriptor) {
         this.dictionary = dictionary;
+        this.descriptor = descriptor;
     }
 
     @Override
@@ -59,6 +67,16 @@ public final class ParquetDictionary implements Dictionary {
 
     @Override
     public TimestampData decodeToTimestamp(int id) {
+        if (descriptor.getPrimitiveType().getPrimitiveTypeName()
+                == PrimitiveType.PrimitiveTypeName.INT64) {
+            return decodeInt64ToTimestamp(
+                    true,
+                    dictionary,
+                    id,
+                    ((LogicalTypeAnnotation.TimestampLogicalTypeAnnotation)
+                                    
descriptor.getPrimitiveType().getLogicalTypeAnnotation())
+                            .getUnit());
+        }
         return decodeInt96ToTimestamp(true, dictionary, id);
     }
 }
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetSplitReaderUtil.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetSplitReaderUtil.java
index 15ee191d3f7..77fa87c7e26 100644
--- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetSplitReaderUtil.java
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetSplitReaderUtil.java
@@ -483,7 +483,8 @@ public class ParquetSplitReaderUtil {
             case TIMESTAMP_WITHOUT_TIME_ZONE:
             case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                 checkArgument(
-                        typeName == PrimitiveType.PrimitiveTypeName.INT96,
+                        typeName == PrimitiveType.PrimitiveTypeName.INT96
+                                || typeName == 
PrimitiveType.PrimitiveTypeName.INT64,
                         "Unexpected type: %s",
                         typeName);
                 return new HeapTimestampVector(batchSize);
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/AbstractColumnReader.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/AbstractColumnReader.java
index 8f4382af6b4..c57a5afc6d1 100644
--- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/AbstractColumnReader.java
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/AbstractColumnReader.java
@@ -173,7 +173,7 @@ public abstract class AbstractColumnReader<VECTOR extends 
WritableColumnVector>
                     // We can't do this if rowId != 0 AND the column doesn't 
have a dictionary (i.e.
                     // some
                     // non-dictionary encoded values have already been added).
-                    vector.setDictionary(new ParquetDictionary(dictionary));
+                    vector.setDictionary(new ParquetDictionary(dictionary, 
descriptor));
                 } else {
                     readBatchFromDictionaryIds(rowId, num, vector, 
dictionaryIds);
                 }
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/TimestampColumnReader.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/TimestampColumnReader.java
index 154ae13e18b..aa544f4e91c 100644
--- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/TimestampColumnReader.java
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/TimestampColumnReader.java
@@ -25,6 +25,7 @@ import org.apache.parquet.Preconditions;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.page.PageReader;
 import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.PrimitiveType;
 
 import java.io.IOException;
@@ -34,9 +35,9 @@ import java.sql.Timestamp;
 import java.util.concurrent.TimeUnit;
 
 /**
- * Timestamp {@link ColumnReader}. We only support INT96 bytes now, julianDay(4) + nanosOfDay(8).
- * See https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#timestamp
- * TIMESTAMP_MILLIS and TIMESTAMP_MICROS are the deprecated ConvertedType.
+ * Timestamp {@link ColumnReader}. We now support INT96 (julianDay(4) + nanosOfDay(8)) and INT64.
+ * See https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#timestamp.
+ * TIMESTAMP_MILLIS and TIMESTAMP_MICROS are the deprecated ConvertedType.
  */
 public class TimestampColumnReader extends 
AbstractColumnReader<WritableTimestampVector> {
 
@@ -44,15 +45,40 @@ public class TimestampColumnReader extends 
AbstractColumnReader<WritableTimestam
     public static final long MILLIS_IN_DAY = TimeUnit.DAYS.toMillis(1);
     public static final long NANOS_PER_MILLISECOND = 
TimeUnit.MILLISECONDS.toNanos(1);
     public static final long NANOS_PER_SECOND = TimeUnit.SECONDS.toNanos(1);
+    public static final long MICROS_PER_MILLISECOND = 
TimeUnit.MILLISECONDS.toMicros(1);
+    public static final long NANOS_PER_MICROSECONDS = 
TimeUnit.MICROSECONDS.toNanos(1);
+    public static final long MILLIS_PER_SECOND = TimeUnit.SECONDS.toMillis(1);
+    public static final long MICROS_PER_SECOND = TimeUnit.SECONDS.toMicros(1);
 
     private final boolean utcTimestamp;
+    private final PrimitiveType.PrimitiveTypeName actualName;
+    private final LogicalTypeAnnotation.TimeUnit timeUnit;
 
     public TimestampColumnReader(
             boolean utcTimestamp, ColumnDescriptor descriptor, PageReader 
pageReader)
             throws IOException {
         super(descriptor, pageReader);
         this.utcTimestamp = utcTimestamp;
-        checkTypeName(PrimitiveType.PrimitiveTypeName.INT96);
+        actualName = descriptor.getPrimitiveType().getPrimitiveTypeName();
+        checkTypeName();
+        if (actualName == PrimitiveType.PrimitiveTypeName.INT64) {
+            timeUnit =
+                    ((LogicalTypeAnnotation.TimestampLogicalTypeAnnotation)
+                                    
descriptor.getPrimitiveType().getLogicalTypeAnnotation())
+                            .getUnit();
+        } else {
+            timeUnit = null;
+        }
+    }
+
+    private void checkTypeName() {
+        Preconditions.checkArgument(
+                actualName == PrimitiveType.PrimitiveTypeName.INT96
+                        || actualName == PrimitiveType.PrimitiveTypeName.INT64,
+                "Expected type name: %s or %s, actual type name: %s",
+                PrimitiveType.PrimitiveTypeName.INT64,
+                PrimitiveType.PrimitiveTypeName.INT96,
+                actualName);
     }
 
     @Override
@@ -64,10 +90,16 @@ public class TimestampColumnReader extends 
AbstractColumnReader<WritableTimestam
     protected void readBatch(int rowId, int num, WritableTimestampVector 
column) {
         for (int i = 0; i < num; i++) {
             if (runLenDecoder.readInteger() == maxDefLevel) {
-                ByteBuffer buffer = readDataBuffer(12);
-                column.setTimestamp(
-                        rowId + i,
-                        int96ToTimestamp(utcTimestamp, buffer.getLong(), 
buffer.getInt()));
+                if (actualName == PrimitiveType.PrimitiveTypeName.INT64) {
+                    ByteBuffer buffer = readDataBuffer(8);
+                    column.setTimestamp(
+                            rowId + i, int64ToTimestamp(utcTimestamp, 
buffer.getLong(), timeUnit));
+                } else {
+                    ByteBuffer buffer = readDataBuffer(12);
+                    column.setTimestamp(
+                            rowId + i,
+                            int96ToTimestamp(utcTimestamp, buffer.getLong(), 
buffer.getInt()));
+                }
             } else {
                 column.setNullAt(rowId + i);
             }
@@ -79,13 +111,29 @@ public class TimestampColumnReader extends 
AbstractColumnReader<WritableTimestam
             int rowId, int num, WritableTimestampVector column, 
WritableIntVector dictionaryIds) {
         for (int i = rowId; i < rowId + num; ++i) {
             if (!column.isNullAt(i)) {
-                column.setTimestamp(
-                        i,
-                        decodeInt96ToTimestamp(utcTimestamp, dictionary, 
dictionaryIds.getInt(i)));
+                if (actualName == PrimitiveType.PrimitiveTypeName.INT64) {
+                    column.setTimestamp(
+                            i,
+                            decodeInt64ToTimestamp(
+                                    utcTimestamp, dictionary, 
dictionaryIds.getInt(i), timeUnit));
+                } else {
+                    column.setTimestamp(
+                            i,
+                            decodeInt96ToTimestamp(
+                                    utcTimestamp, dictionary, 
dictionaryIds.getInt(i)));
+                }
             }
         }
     }
 
+    public static TimestampData decodeInt64ToTimestamp(
+            boolean utcTimestamp,
+            org.apache.parquet.column.Dictionary dictionary,
+            int id,
+            LogicalTypeAnnotation.TimeUnit timeUnit) {
+        return int64ToTimestamp(utcTimestamp, dictionary.decodeToLong(id), 
timeUnit);
+    }
+
     public static TimestampData decodeInt96ToTimestamp(
             boolean utcTimestamp, org.apache.parquet.column.Dictionary 
dictionary, int id) {
         Binary binary = dictionary.decodeToBinary(id);
@@ -109,6 +157,37 @@ public class TimestampColumnReader extends 
AbstractColumnReader<WritableTimestam
         }
     }
 
+    public static TimestampData int64ToTimestamp(
+            boolean utcTimestamp, long value, LogicalTypeAnnotation.TimeUnit 
timeUnit) {
+        long nanosOfMillisecond = 0L;
+        long milliseconds = 0L;
+
+        switch (timeUnit) {
+            case MILLIS:
+                milliseconds = value;
+                nanosOfMillisecond = value % MILLIS_PER_SECOND * 
NANOS_PER_MILLISECOND;
+                break;
+            case MICROS:
+                milliseconds = value / MICROS_PER_MILLISECOND;
+                nanosOfMillisecond = (value % MICROS_PER_SECOND) * 
NANOS_PER_MICROSECONDS;
+                break;
+            case NANOS:
+                milliseconds = value / NANOS_PER_MILLISECOND;
+                nanosOfMillisecond = value % NANOS_PER_SECOND;
+                break;
+            default:
+                break;
+        }
+
+        if (utcTimestamp) {
+            return TimestampData.fromEpochMillis(
+                    milliseconds, (int) (nanosOfMillisecond % 
NANOS_PER_MILLISECOND));
+        }
+        Timestamp timestamp = new Timestamp(milliseconds);
+        timestamp.setNanos((int) nanosOfMillisecond);
+        return TimestampData.fromTimestamp(timestamp);
+    }
+
     private static long julianDayToMillis(int julianDay) {
         return (julianDay - JULIAN_EPOCH_OFFSET_DAYS) * MILLIS_IN_DAY;
     }
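Reading the value back mirrors the write side. A standalone sketch of the MICROS branch of int64ToTimestamp on the UTC path, using the same example value as the encoding sketch earlier:

    public class Int64DecodeSketch {
        public static void main(String[] args) {
            long value = 1_588_489_200_123_456L;                        // micros since epoch

            long milliseconds = value / 1_000;                          // 1588489200123
            long nanosOfSecond = (value % 1_000_000) * 1_000;           // 123_456_000
            int nanoOfMillisecond = (int) (nanosOfSecond % 1_000_000);  // 456_000

            // TimestampData.fromEpochMillis(milliseconds, nanoOfMillisecond)
            // reproduces 2020-05-03T07:00:00.123456 for this value.
            System.out.println(milliseconds + " ms + " + nanoOfMillisecond + " ns");
        }
    }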
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTimestampITCase.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTimestampITCase.java
new file mode 100644
index 00000000000..ed267d329d0
--- /dev/null
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTimestampITCase.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.scala.DataStream;
+import org.apache.flink.table.planner.runtime.stream.FiniteTestSource;
+import org.apache.flink.table.planner.runtime.stream.FsStreamingSinkITCaseBase;
+import org.apache.flink.table.planner.runtime.utils.TestSinkUtil;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+
+import org.apache.parquet.Strings;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.TimeZone;
+import java.util.stream.Collectors;
+
+import scala.Function1;
+import scala.collection.Seq;
+
+import static 
org.apache.flink.connector.file.table.FileSystemConnectorOptions.PARTITION_TIME_EXTRACTOR_KIND;
+import static 
org.apache.flink.connector.file.table.FileSystemConnectorOptions.PARTITION_TIME_EXTRACTOR_TIMESTAMP_FORMATTER;
+import static 
org.apache.flink.connector.file.table.FileSystemConnectorOptions.PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN;
+import static 
org.apache.flink.connector.file.table.FileSystemConnectorOptions.SINK_PARTITION_COMMIT_DELAY;
+import static 
org.apache.flink.connector.file.table.FileSystemConnectorOptions.SINK_PARTITION_COMMIT_POLICY_KIND;
+import static 
org.apache.flink.connector.file.table.FileSystemConnectorOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME;
+import static 
org.apache.flink.connector.file.table.FileSystemConnectorOptions.SINK_PARTITION_COMMIT_TRIGGER;
+import static 
org.apache.flink.formats.parquet.ParquetFileFormatFactory.TIMESTAMP_TIME_UNIT;
+import static 
org.apache.flink.formats.parquet.ParquetFileFormatFactory.UTC_TIMEZONE;
+import static 
org.apache.flink.formats.parquet.ParquetFileFormatFactory.WRITE_INT64_TIMESTAMP;
+import static org.junit.Assert.assertEquals;
+
+/** Test int64 timestamp. */
+@ExtendWith(ParameterizedTestExtension.class)
+public class ParquetTimestampITCase extends FsStreamingSinkITCaseBase {
+    @Parameter public static boolean useInt64;
+
+    @Parameter public static String timeunit;
+
+    @Parameter public static boolean timezone;
+
+    @Parameters(name = "useInt64 = {0}, timeunit = {1}, timezone = {2}")
+    public static Object[] parameters() {
+        return new Object[][] {
+            new Object[] {false, "millis", false},
+            new Object[] {true, "millis", false},
+            new Object[] {true, "micros", false},
+            new Object[] {true, "nanos", false},
+            new Object[] {true, "millis", true},
+            new Object[] {true, "micros", true},
+            new Object[] {true, "nanos", true}
+        };
+    }
+
+    @Override
+    public Seq<Row> getData() {
+        return JavaScalaConversionUtil.toScala(
+                new ArrayList<Row>() {
+                    {
+                        add(
+                                Row.of(
+                                        Integer.valueOf(1),
+                                        "a",
+                                        Timestamp.valueOf("2020-05-03 
07:00:00.000000000"),
+                                        "05-03-2020",
+                                        "07"));
+                        add(
+                                Row.of(
+                                        Integer.valueOf(2),
+                                        "p",
+                                        Timestamp.valueOf("2020-05-03 
08:01:01.111111111"),
+                                        "05-03-2020",
+                                        "08"));
+                        add(
+                                Row.of(
+                                        Integer.valueOf(3),
+                                        "x",
+                                        Timestamp.valueOf("2020-05-03 
09:02:02.222222222"),
+                                        "05-03-2020",
+                                        "09"));
+                        add(
+                                Row.of(
+                                        Integer.valueOf(4),
+                                        "x",
+                                        Timestamp.valueOf("2020-05-03 
10:03:03.333333333"),
+                                        "05-03-2020",
+                                        "10"));
+                        add(
+                                Row.of(
+                                        Integer.valueOf(5),
+                                        "x",
+                                        Timestamp.valueOf("2020-05-03 
11:04:04.444444444"),
+                                        "05-03-2020",
+                                        "11"));
+                    }
+                });
+    }
+
+    @Override
+    public Seq<Row> getData2() {
+        return JavaScalaConversionUtil.toScala(
+                new ArrayList<Row>() {
+                    {
+                        add(
+                                Row.of(
+                                        Integer.valueOf(1),
+                                        "a",
+                                        Timestamp.valueOf("2020-05-03 
07:00:00.000000000"),
+                                        "20200503",
+                                        "07"));
+                        add(
+                                Row.of(
+                                        Integer.valueOf(2),
+                                        "p",
+                                        Timestamp.valueOf("2020-05-03 
08:01:01.111111111"),
+                                        "20200503",
+                                        "08"));
+                        add(
+                                Row.of(
+                                        Integer.valueOf(3),
+                                        "x",
+                                        Timestamp.valueOf("2020-05-03 
09:02:02.222222222"),
+                                        "20200503",
+                                        "09"));
+                        add(
+                                Row.of(
+                                        Integer.valueOf(4),
+                                        "x",
+                                        Timestamp.valueOf("2020-05-04 
10:03:03.333333333"),
+                                        "20200504",
+                                        "10"));
+                        add(
+                                Row.of(
+                                        Integer.valueOf(5),
+                                        "x",
+                                        Timestamp.valueOf("2020-05-04 
11:04:04.444444444"),
+                                        "20200504",
+                                        "11"));
+                    }
+                });
+    }
+
+    @Override
+    public DataStream<Row> getDataStream2(Function1<Row, Object> fun) {
+        return new DataStream<Row>(
+                env().getJavaEnv()
+                        .addSource(
+                                new FiniteTestSource(getData2(), fun),
+                                new RowTypeInfo(
+                                        new TypeInformation[] {
+                                            Types.INT,
+                                            Types.STRING,
+                                            Types.SQL_TIMESTAMP,
+                                            Types.STRING,
+                                            Types.STRING
+                                        },
+                                        new String[] {"a", "b", "c", "d", 
"e"})));
+    }
+
+    @Override
+    public DataStream<Row> getDataStream(Function1<Row, Object> fun) {
+        return new DataStream<Row>(
+                env().getJavaEnv()
+                        .addSource(
+                                new FiniteTestSource(getData(), fun),
+                                new RowTypeInfo(
+                                        new TypeInformation[] {
+                                            Types.INT,
+                                            Types.STRING,
+                                            Types.SQL_TIMESTAMP,
+                                            Types.STRING,
+                                            Types.STRING
+                                        },
+                                        new String[] {"a", "b", "c", "d", 
"e"})));
+    }
+
+    @Override
+    public String getDDL(
+            String timeExtractorKind,
+            String timeExtractorFormatterPattern,
+            String timeExtractorPattern,
+            String partition,
+            String commitTrigger,
+            String commitDelay,
+            String policy,
+            String successFileName) {
+        StringBuffer ddl = new StringBuffer("create table sink_table (");
+        ddl.append("  a int, ");
+        ddl.append(" b string, ");
+        ddl.append(" c timestamp(3), ");
+        ddl.append(" d string,");
+        ddl.append(" e string");
+        ddl.append(") ");
+        if (!Strings.isNullOrEmpty(partition)) {
+            ddl.append("partitioned by ( " + partition + " ) ");
+        }
+        ddl.append("with ( ");
+        ddl.append(" 'connector' = 'filesystem', ");
+        ddl.append(" 'path' = '" + resultPath() + "', ");
+        ddl.append(
+                " '" + PARTITION_TIME_EXTRACTOR_KIND.key() + "' = '" + 
timeExtractorKind + "', ");
+        if (!Strings.isNullOrEmpty(timeExtractorFormatterPattern)) {
+            ddl.append(
+                    " '"
+                            + 
PARTITION_TIME_EXTRACTOR_TIMESTAMP_FORMATTER.key()
+                            + "' = '"
+                            + timeExtractorFormatterPattern
+                            + "', ");
+        }
+        ddl.append(
+                " '"
+                        + PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN.key()
+                        + "' = '"
+                        + timeExtractorPattern
+                        + "', ");
+        ddl.append(" '" + SINK_PARTITION_COMMIT_TRIGGER.key() + "' = '" + 
commitTrigger + "', ");
+        ddl.append(" '" + SINK_PARTITION_COMMIT_DELAY.key() + "' = '" + 
commitDelay + "', ");
+        ddl.append(" '" + SINK_PARTITION_COMMIT_POLICY_KIND.key() + "' = '" + 
policy + "', ");
+        ddl.append(
+                " '"
+                        + SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME.key()
+                        + "' = '"
+                        + successFileName
+                        + "', ");
+        ddl.append(" 'format'='parquet', ");
+        ddl.append(" 'parquet." + UTC_TIMEZONE.key() + "' = '" + timezone + 
"', ");
+        ddl.append(" 'parquet." + TIMESTAMP_TIME_UNIT.key() + "' = '" + 
timeunit + "', ");
+        ddl.append(" 'parquet." + WRITE_INT64_TIMESTAMP.key() + "' = '" + 
useInt64 + "'");
+        ddl.append(") ");
+        return ddl.toString();
+    }
+
+    @Override
+    public void check(String sqlQuery, Seq<Row> expectedResult) {
+        List<Row> result =
+                
CollectionUtil.iteratorToList(tEnv().sqlQuery(sqlQuery).execute().collect());
+        assertEquals(
+                JavaScalaConversionUtil.toJava(expectedResult).stream()
+                        .map(row -> TestSinkUtil.rowToString(row, 
TimeZone.getTimeZone("UTC")))
+                        .sorted()
+                        .collect(Collectors.toList()),
+                result.stream()
+                        .map(
+                                row -> {
+                                    row.setField(
+                                            2, 
Timestamp.valueOf((LocalDateTime) row.getField(2)));
+                                    return TestSinkUtil.rowToString(
+                                            row, TimeZone.getTimeZone("UTC"));
+                                })
+                        .sorted()
+                        .collect(Collectors.toList()));
+    }
+}
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriterTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriterTest.java
index 38edb278ad8..ae2d0079ddf 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriterTest.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriterTest.java
@@ -68,6 +68,9 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.stream.IntStream;
 
+import static 
org.apache.flink.formats.parquet.ParquetFileFormatFactory.IDENTIFIER;
+import static 
org.apache.flink.formats.parquet.ParquetFileFormatFactory.TIMESTAMP_TIME_UNIT;
+import static 
org.apache.flink.formats.parquet.ParquetFileFormatFactory.WRITE_INT64_TIMESTAMP;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link ParquetRowDataBuilder} and {@link ParquetRowDataWriter}. */
@@ -127,6 +130,17 @@ class ParquetRowDataWriterTest {
         complexTypeTest(folder, conf, false);
     }
 
+    @Test
+    public void testInt64Timestamp(@TempDir java.nio.file.Path folder) throws 
Exception {
+        Configuration conf = new Configuration();
+        conf.set(IDENTIFIER + "." + WRITE_INT64_TIMESTAMP.key(), "true");
+        conf.set(IDENTIFIER + "." + TIMESTAMP_TIME_UNIT.key(), "nanos");
+        innerTest(folder, conf, true);
+        innerTest(folder, conf, false);
+        complexTypeTest(folder, conf, true);
+        complexTypeTest(folder, conf, false);
+    }
+
     private void innerTest(java.nio.file.Path folder, Configuration conf, 
boolean utcTimestamp)
             throws IOException {
         Path path = new Path(folder.toString(), UUID.randomUUID().toString());
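Outside of this test harness, the same configuration can be handed to the bulk writer factory directly. A hedged sketch only: the createWriterFactory signature and the single-column row type are assumptions for illustration, not code from this commit:

    import org.apache.flink.formats.parquet.ParquetWriterFactory;
    import org.apache.flink.formats.parquet.row.ParquetRowDataBuilder;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.types.logical.RowType;
    import org.apache.flink.table.types.logical.TimestampType;

    import org.apache.hadoop.conf.Configuration;

    public class Int64WriterFactorySketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            conf.set("parquet.write.int64.timestamp", "true");
            conf.set("parquet.timestamp.time.unit", "nanos");

            // Assumed helper: ParquetRowDataBuilder.createWriterFactory(rowType, conf, utcTimestamp).
            RowType rowType = RowType.of(new TimestampType(9));
            ParquetWriterFactory<RowData> factory =
                    ParquetRowDataBuilder.createWriterFactory(rowType, conf, true);
            System.out.println(factory != null);
        }
    }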
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/vector/ParquetInt64TimestampReaderTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/vector/ParquetInt64TimestampReaderTest.java
new file mode 100644
index 00000000000..96a0c5104a0
--- /dev/null
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/vector/ParquetInt64TimestampReaderTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.vector;
+
+import org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader;
+import org.apache.flink.table.data.TimestampData;
+
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.junit.Test;
+
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+
+import static org.junit.Assert.assertEquals;
+
+/** Test for {@link TimestampColumnReader}. */
+public class ParquetInt64TimestampReaderTest {
+    @Test
+    public void testReadInt64TimestampMicros() {
+        LocalDateTime localDateTime = LocalDateTime.of(2021, 11, 22, 17, 50, 
20, 112233);
+        long time =
+                localDateTime.toEpochSecond(OffsetDateTime.now().getOffset()) 
* 1_000_000
+                        + localDateTime.getNano() / 1_000;
+        TimestampData timestampData =
+                TimestampColumnReader.int64ToTimestamp(
+                        false, time, LogicalTypeAnnotation.TimeUnit.MICROS);
+        assertEquals("2021-11-22T17:50:20.000112", timestampData.toString());
+    }
+
+    @Test
+    public void testReadInt64TimestampMillis() {
+        LocalDateTime localDateTime = LocalDateTime.of(2021, 11, 22, 17, 50, 
20, 112233);
+        long time =
+                localDateTime.toEpochSecond(OffsetDateTime.now().getOffset()) 
* 1_000
+                        + localDateTime.getNano() / 1_000_000;
+        TimestampData timestampData =
+                TimestampColumnReader.int64ToTimestamp(
+                        false, time, LogicalTypeAnnotation.TimeUnit.MILLIS);
+        assertEquals("2021-11-22T17:50:20", timestampData.toString());
+    }
+
+    @Test
+    public void testReadInt64TimestampNanos() {
+        LocalDateTime localDateTime = LocalDateTime.of(2021, 11, 22, 17, 50, 
20, 112233);
+        long time =
+                localDateTime.toEpochSecond(OffsetDateTime.now().getOffset()) 
* 1_000_000_000
+                        + localDateTime.getNano();
+        TimestampData timestampData =
+                TimestampColumnReader.int64ToTimestamp(
+                        false, time, LogicalTypeAnnotation.TimeUnit.NANOS);
+        assertEquals("2021-11-22T17:50:20.000112233", 
timestampData.toString());
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/FsStreamingSinkITCaseBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/FsStreamingSinkITCaseBase.scala
index b75bab81467..0c06f538e3d 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/FsStreamingSinkITCaseBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/FsStreamingSinkITCaseBase.scala
@@ -52,7 +52,7 @@ abstract class FsStreamingSinkITCaseBase extends 
StreamingTestBase {
   protected var resultPath: String = _
 
   // iso date
-  private val data: Seq[Row] = Seq(
+  def getData: Seq[Row] = Seq(
     Row.of(Integer.valueOf(1), "a", "b", "05-03-2020", "07"),
     Row.of(Integer.valueOf(2), "p", "q", "05-03-2020", "08"),
     Row.of(Integer.valueOf(3), "x", "y", "05-03-2020", "09"),
@@ -61,7 +61,7 @@ abstract class FsStreamingSinkITCaseBase extends 
StreamingTestBase {
   )
 
   // basic iso date
-  private val data2 = Seq(
+  def getData2 = Seq(
     Row.of(Integer.valueOf(1), "a", "b", "20200503", "07"),
     Row.of(Integer.valueOf(2), "p", "q", "20200503", "08"),
     Row.of(Integer.valueOf(3), "x", "y", "20200503", "09"),
@@ -105,6 +105,13 @@ abstract class FsStreamingSinkITCaseBase extends 
StreamingTestBase {
     testPartitionCustomFormatDate(partition = true, "metastore")
   }
 
+  def getDataStream2(fun: Row => Long) = {
+    new DataStream(
+      env.getJavaEnv.addSource(
+        new FiniteTestSource(getData2, fun),
+        new RowTypeInfo(Types.INT, Types.STRING, Types.STRING, Types.STRING, 
Types.STRING)))
+  }
+
   @Test
   def testPartitionWithBasicDate(): Unit = {
 
@@ -116,13 +123,8 @@ abstract class FsStreamingSinkITCaseBase extends 
StreamingTestBase {
       TimestampData.fromLocalDateTime(localDateTime).getMillisecond
     }
 
-    val stream: DataStream[Row] = new DataStream(
-      env.getJavaEnv.addSource(
-        new FiniteTestSource(data2, fun),
-        new RowTypeInfo(Types.INT, Types.STRING, Types.STRING, Types.STRING, 
Types.STRING)))
-
     // write out the data
-    test(stream, "default", "yyyyMMdd", "$d", "d", "partition-time", "1d", 
data2)
+    test(getDataStream2(fun), "default", "yyyyMMdd", "$d", "d", 
"partition-time", "1d", getData2)
 
     // verify that the written data is correct
     val basePath = new File(new URI(resultPath).getPath)
@@ -131,6 +133,13 @@ abstract class FsStreamingSinkITCaseBase extends 
StreamingTestBase {
     Assert.assertTrue(new File(new File(basePath, "d=20200504"), 
"_MY_SUCCESS").exists())
   }
 
+  def getDataStream(fun: Row => Long): DataStream[Row] = {
+    new DataStream(
+      env.getJavaEnv.addSource(
+        new FiniteTestSource(getData, fun),
+        new RowTypeInfo(Types.INT, Types.STRING, Types.STRING, Types.STRING, 
Types.STRING)))
+  }
+
   def testPartitionCustomFormatDate(partition: Boolean, policy: String = 
"success-file"): Unit = {
 
     val fun = (t: Row) => {
@@ -140,20 +149,15 @@ abstract class FsStreamingSinkITCaseBase extends 
StreamingTestBase {
       TimestampData.fromLocalDateTime(localDateTime).getMillisecond
     }
 
-    val stream = new DataStream(
-      env.getJavaEnv.addSource(
-        new FiniteTestSource(data, fun),
-        new RowTypeInfo(Types.INT, Types.STRING, Types.STRING, Types.STRING, 
Types.STRING)))
-
     test(
-      stream,
+      getDataStream(fun),
       "default",
       "MM-dd-yyyy HH:mm:ss",
       "$d $e:00:00",
       if (partition) "d,e" else "",
       "process-time",
       "1h",
-      data,
+      getData,
       policy)
   }
 
@@ -173,6 +177,31 @@ abstract class FsStreamingSinkITCaseBase extends 
StreamingTestBase {
 
     tEnv.createTemporaryView("my_table", dataStream, $("a"), $("b"), $("c"), 
$("d"), $("e"))
 
+    val ddl: String = getDDL(
+      timeExtractorKind,
+      timeExtractorFormatterPattern,
+      timeExtractorPattern,
+      partition,
+      commitTrigger,
+      commitDelay,
+      policy,
+      successFileName)
+    tEnv.executeSql(ddl)
+
+    tEnv.sqlQuery("select * from my_table").executeInsert("sink_table").await()
+
+    check("select * from sink_table", dataTest)
+  }
+
+  def getDDL(
+      timeExtractorKind: String,
+      timeExtractorFormatterPattern: String,
+      timeExtractorPattern: String,
+      partition: String,
+      commitTrigger: String,
+      commitDelay: String,
+      policy: String,
+      successFileName: String) = {
     val ddl =
       s"""
          |create table sink_table (
@@ -202,11 +231,7 @@ abstract class FsStreamingSinkITCaseBase extends 
StreamingTestBase {
          |  ${additionalProperties().mkString(",\n")}
          |)
        """.stripMargin
-    tEnv.executeSql(ddl)
-
-    tEnv.sqlQuery("select * from my_table").executeInsert("sink_table").await()
-
-    check("select * from sink_table", dataTest)
+    ddl
   }
 
   def check(sqlQuery: String, expectedResult: Seq[Row]): Unit = {
