This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.18 by this push:
     new 4379f2c39fb [FLINK-25565][Formats][Parquet] write and read parquet int64 timestamp (#18304) (#23887)

4379f2c39fb is described below

commit 4379f2c39fbdaeb78f874f6dd461d20b8a8961cd
Author: Thomas Weise <twe...@users.noreply.github.com>
AuthorDate: Fri Dec 8 13:49:04 2023 -0500

    [FLINK-25565][Formats][Parquet] write and read parquet int64 timestamp (#18304) (#23887)

    Co-authored-by: Bo Cui <cuibo0...@163.com>
---
 .../docs/connectors/table/formats/parquet.md       |  14 ++
 .../docs/connectors/table/formats/parquet.md       |  14 ++
 .../formats/parquet/ParquetFileFormatFactory.java  |  15 ++
 .../parquet/ParquetVectorizedInputFormat.java      |  13 +-
 .../formats/parquet/row/ParquetRowDataBuilder.java |  13 +-
 .../formats/parquet/row/ParquetRowDataWriter.java  |  67 ++++-
 .../parquet/utils/ParquetSchemaConverter.java      |  50 +++-
 .../formats/parquet/vector/ParquetDictionary.java  |  20 +-
 .../parquet/vector/ParquetSplitReaderUtil.java     |   3 +-
 .../vector/reader/AbstractColumnReader.java        |   2 +-
 .../vector/reader/TimestampColumnReader.java       | 101 +++++++-
 .../formats/parquet/ParquetTimestampITCase.java    | 280 +++++++++++++++++++++
 .../parquet/row/ParquetRowDataWriterTest.java      |  14 ++
 .../vector/ParquetInt64TimestampReaderTest.java    |  69 +++++
 .../runtime/stream/FsStreamingSinkITCaseBase.scala |  65 +++--
 15 files changed, 683 insertions(+), 57 deletions(-)

diff --git a/docs/content.zh/docs/connectors/table/formats/parquet.md b/docs/content.zh/docs/connectors/table/formats/parquet.md
index 92536d00f98..295b11ef41b 100644
--- a/docs/content.zh/docs/connectors/table/formats/parquet.md
+++ b/docs/content.zh/docs/connectors/table/formats/parquet.md
@@ -84,6 +84,20 @@ Format 参数
       <td>Boolean</td>
       <td>使用 UTC 时区或本地时区在纪元时间和 LocalDateTime 之间进行转换。Hive 0.x/1.x/2.x 使用本地时区,但 Hive 3.x 使用 UTC 时区。</td>
     </tr>
+    <tr>
+      <td><h5>timestamp.time.unit</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">micros</td>
+      <td>String</td>
+      <td>根据 TimeUnit 在 Timestamp 和 int64 之间进行转换,可选值为 nanos/micros/millis。</td>
+    </tr>
+    <tr>
+      <td><h5>write.int64.timestamp</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>以 int64 替代 int96 存储 parquet Timestamp。注意:Timestamp 将与时区无关(从不转换为不同时区)。</td>
+    </tr>
   </tbody>
 </table>
diff --git a/docs/content/docs/connectors/table/formats/parquet.md b/docs/content/docs/connectors/table/formats/parquet.md
index 0b7ba9c42f5..75c524f238f 100644
--- a/docs/content/docs/connectors/table/formats/parquet.md
+++ b/docs/content/docs/connectors/table/formats/parquet.md
@@ -84,6 +84,20 @@ Format Options
       <td>Boolean</td>
       <td>Use UTC timezone or local timezone for the conversion between epoch time and LocalDateTime. Hive 0.x/1.x/2.x use the local timezone, but Hive 3.x uses the UTC timezone.</td>
     </tr>
+    <tr>
+      <td><h5>timestamp.time.unit</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">micros</td>
+      <td>String</td>
+      <td>Store parquet int64/LogicalTypes timestamps in this time unit; valid values are nanos, micros and millis.</td>
+    </tr>
+    <tr>
+      <td><h5>write.int64.timestamp</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>Write parquet timestamp as int64/LogicalTypes instead of int96/OriginalTypes.
Note: Timestamp will be time zone agnostic (NEVER converted to a different time zone).</td> + </tr> </tbody> </table> diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileFormatFactory.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileFormatFactory.java index 14f257899c1..8be727c7947 100644 --- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileFormatFactory.java +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileFormatFactory.java @@ -68,6 +68,21 @@ public class ParquetFileFormatFactory implements BulkReaderFormatFactory, BulkWr + " time and LocalDateTime. Hive 0.x/1.x/2.x use local timezone. But Hive 3.x" + " use UTC timezone"); + public static final ConfigOption<String> TIMESTAMP_TIME_UNIT = + key("timestamp.time.unit") + .stringType() + .defaultValue("micros") + .withDescription( + "Store parquet int64/LogicalTypes timestamps in this time unit, value is nanos/micros/millis"); + + public static final ConfigOption<Boolean> WRITE_INT64_TIMESTAMP = + key("write.int64.timestamp") + .booleanType() + .defaultValue(false) + .withDescription( + "Write parquet timestamp as int64/LogicalTypes instead of int96/OriginalTypes. " + + "Note: Timestamp will be time zone agnostic (NEVER converted to a different time zone)."); + @Override public BulkDecodingFormat<RowData> createDecodingFormat( DynamicTableFactory.Context context, ReadableConfig formatOptions) { diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetVectorizedInputFormat.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetVectorizedInputFormat.java index 914df658de1..d675dc359d8 100644 --- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetVectorizedInputFormat.java +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetVectorizedInputFormat.java @@ -130,7 +130,8 @@ public abstract class ParquetVectorizedInputFormat<T, SplitT extends FileSourceS MessageType fileSchema = parquetFileReader.getFooter().getFileMetaData().getSchema(); // Pruning unnecessary column, we should set the projection schema before running any // filtering (e.g. getting filtered record count) because projection impacts filtering - MessageType requestedSchema = clipParquetSchema(fileSchema, unknownFieldsIndices); + MessageType requestedSchema = + clipParquetSchema(fileSchema, unknownFieldsIndices, hadoopConfig.conf()); parquetFileReader.setRequestedSchema(requestedSchema); checkSchema(fileSchema, requestedSchema); @@ -173,7 +174,9 @@ public abstract class ParquetVectorizedInputFormat<T, SplitT extends FileSourceS /** Clips `parquetSchema` according to `fieldNames`. 
*/ private MessageType clipParquetSchema( - GroupType parquetSchema, Collection<Integer> unknownFieldsIndices) { + GroupType parquetSchema, + Collection<Integer> unknownFieldsIndices, + org.apache.hadoop.conf.Configuration config) { Type[] types = new Type[projectedFields.length]; if (isCaseSensitive) { for (int i = 0; i < projectedFields.length; ++i) { @@ -185,7 +188,7 @@ public abstract class ParquetVectorizedInputFormat<T, SplitT extends FileSourceS parquetSchema); types[i] = ParquetSchemaConverter.convertToParquetType( - fieldName, projectedTypes[i]); + fieldName, projectedTypes[i], config); unknownFieldsIndices.add(i); } else { types[i] = parquetSchema.getType(fieldName); @@ -215,7 +218,9 @@ public abstract class ParquetVectorizedInputFormat<T, SplitT extends FileSourceS parquetSchema); type = ParquetSchemaConverter.convertToParquetType( - projectedFields[i].toLowerCase(Locale.ROOT), projectedTypes[i]); + projectedFields[i].toLowerCase(Locale.ROOT), + projectedTypes[i], + config); unknownFieldsIndices.add(i); } // TODO clip for array,map,row types. diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataBuilder.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataBuilder.java index 30dc8dbc236..5abb21d5a26 100644 --- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataBuilder.java +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataBuilder.java @@ -65,13 +65,19 @@ public class ParquetRowDataBuilder extends ParquetWriter.Builder<RowData, Parque @Override protected WriteSupport<RowData> getWriteSupport(Configuration conf) { - return new ParquetWriteSupport(); + return new ParquetWriteSupport(conf); } private class ParquetWriteSupport extends WriteSupport<RowData> { - private MessageType schema = convertToParquetMessageType("flink_schema", rowType); + private MessageType schema = null; private ParquetRowDataWriter writer; + private Configuration conf; + + private ParquetWriteSupport(Configuration conf) { + this.conf = conf; + schema = convertToParquetMessageType("flink_schema", rowType, conf); + } @Override public WriteContext init(Configuration configuration) { @@ -80,7 +86,8 @@ public class ParquetRowDataBuilder extends ParquetWriter.Builder<RowData, Parque @Override public void prepareForWrite(RecordConsumer recordConsumer) { - this.writer = new ParquetRowDataWriter(recordConsumer, rowType, schema, utcTimestamp); + this.writer = + new ParquetRowDataWriter(recordConsumer, rowType, schema, utcTimestamp, conf); } @Override diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java index 4a521373e34..37b65b2b58b 100644 --- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java @@ -35,6 +35,7 @@ import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.TimestampType; import org.apache.flink.util.Preconditions; +import org.apache.hadoop.conf.Configuration; import org.apache.parquet.io.api.Binary; import org.apache.parquet.io.api.RecordConsumer; import org.apache.parquet.schema.GroupType; @@ -47,9 +48,14 @@ import java.sql.Timestamp; import 
java.util.Arrays; import java.util.List; +import static org.apache.flink.formats.parquet.ParquetFileFormatFactory.IDENTIFIER; +import static org.apache.flink.formats.parquet.ParquetFileFormatFactory.TIMESTAMP_TIME_UNIT; +import static org.apache.flink.formats.parquet.ParquetFileFormatFactory.WRITE_INT64_TIMESTAMP; import static org.apache.flink.formats.parquet.utils.ParquetSchemaConverter.computeMinBytesForDecimalPrecision; import static org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader.JULIAN_EPOCH_OFFSET_DAYS; +import static org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader.MICROS_PER_MILLISECOND; import static org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader.MILLIS_IN_DAY; +import static org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader.NANOS_PER_MICROSECONDS; import static org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader.NANOS_PER_MILLISECOND; import static org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader.NANOS_PER_SECOND; @@ -60,14 +66,34 @@ public class ParquetRowDataWriter { private final RecordConsumer recordConsumer; private final boolean utcTimestamp; + private final Configuration conf; + private boolean useInt64 = false; + private LogicalTypeAnnotation.TimeUnit timeUnit; + public ParquetRowDataWriter( RecordConsumer recordConsumer, RowType rowType, GroupType schema, - boolean utcTimestamp) { + boolean utcTimestamp, + Configuration conf) { this.recordConsumer = recordConsumer; this.utcTimestamp = utcTimestamp; - + this.conf = conf; + if (this.conf != null) { + useInt64 = + this.conf.getBoolean( + IDENTIFIER + "." + WRITE_INT64_TIMESTAMP.key(), + WRITE_INT64_TIMESTAMP.defaultValue()); + if (useInt64) { + timeUnit = + LogicalTypeAnnotation.TimeUnit.valueOf( + this.conf + .get( + IDENTIFIER + "." 
+ TIMESTAMP_TIME_UNIT.key(), + TIMESTAMP_TIME_UNIT.defaultValue()) + .toUpperCase()); + } + } rowWriter = new RowWriter(rowType, schema); } @@ -326,7 +352,7 @@ public class ParquetRowDataWriter { } private void writeTimestamp(TimestampData value) { - recordConsumer.addBinary(timestampToInt96(value)); + ParquetRowDataWriter.this.writeTimestamp(recordConsumer, value); } } @@ -477,6 +503,41 @@ public class ParquetRowDataWriter { public void write(ArrayData arrayData, int ordinal) {} } + private void writeTimestamp(RecordConsumer recordConsumer, TimestampData timestampData) { + if (useInt64) { + recordConsumer.addLong(timestampToInt64(timestampData)); + } else { + recordConsumer.addBinary(timestampToInt96(timestampData)); + } + } + + private Long convertInt64ToLong(long mills, long nanosOfMillisecond) { + switch (timeUnit) { + case NANOS: + return mills * NANOS_PER_MILLISECOND + nanosOfMillisecond; + case MICROS: + return mills * MICROS_PER_MILLISECOND + nanosOfMillisecond / NANOS_PER_MICROSECONDS; + case MILLIS: + return mills; + default: + throw new IllegalArgumentException("Time unit not recognized"); + } + } + + private Long timestampToInt64(TimestampData timestampData) { + long mills = 0L; + long nanosOfMillisecond = 0L; + if (utcTimestamp) { + mills = timestampData.getMillisecond(); + nanosOfMillisecond = timestampData.getNanoOfMillisecond(); + } else { + Timestamp timestamp = timestampData.toTimestamp(); + mills = timestamp.getTime(); + nanosOfMillisecond = timestamp.getNanos() % NANOS_PER_MILLISECOND; + } + return convertInt64ToLong(mills, nanosOfMillisecond); + } + private Binary timestampToInt96(TimestampData timestampData) { int julianDay; long nanosOfDay; diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java index 377803aa170..b7c91ee62ff 100644 --- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java @@ -26,8 +26,10 @@ import org.apache.flink.table.types.logical.MapType; import org.apache.flink.table.types.logical.MultisetType; import org.apache.flink.table.types.logical.RowType; +import org.apache.hadoop.conf.Configuration; import org.apache.parquet.schema.ConversionPatterns; import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.OriginalType; import org.apache.parquet.schema.PrimitiveType; @@ -37,26 +39,33 @@ import org.apache.parquet.schema.Types; import java.util.ArrayList; import java.util.List; +import static org.apache.flink.formats.parquet.ParquetFileFormatFactory.IDENTIFIER; +import static org.apache.flink.formats.parquet.ParquetFileFormatFactory.TIMESTAMP_TIME_UNIT; +import static org.apache.flink.formats.parquet.ParquetFileFormatFactory.WRITE_INT64_TIMESTAMP; + /** Schema converter converts Parquet schema to and from Flink internal types. 
*/ public class ParquetSchemaConverter { static final String MAP_REPEATED_NAME = "key_value"; static final String LIST_ELEMENT_NAME = "element"; - public static MessageType convertToParquetMessageType(String name, RowType rowType) { + public static MessageType convertToParquetMessageType( + String name, RowType rowType, Configuration conf) { Type[] types = new Type[rowType.getFieldCount()]; for (int i = 0; i < rowType.getFieldCount(); i++) { - types[i] = convertToParquetType(rowType.getFieldNames().get(i), rowType.getTypeAt(i)); + types[i] = + convertToParquetType( + rowType.getFieldNames().get(i), rowType.getTypeAt(i), conf); } return new MessageType(name, types); } - public static Type convertToParquetType(String name, LogicalType type) { - return convertToParquetType(name, type, Type.Repetition.OPTIONAL); + public static Type convertToParquetType(String name, LogicalType type, Configuration conf) { + return convertToParquetType(name, type, Type.Repetition.OPTIONAL, conf); } private static Type convertToParquetType( - String name, LogicalType type, Type.Repetition repetition) { + String name, LogicalType type, Type.Repetition repetition, Configuration conf) { switch (type.getTypeRoot()) { case CHAR: case VARCHAR: @@ -111,6 +120,19 @@ public class ParquetSchemaConverter { .named(name); case TIMESTAMP_WITHOUT_TIME_ZONE: case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + if (conf.getBoolean( + IDENTIFIER + "." + WRITE_INT64_TIMESTAMP.key(), + WRITE_INT64_TIMESTAMP.defaultValue())) { + LogicalTypeAnnotation.TimeUnit timeUnit = + LogicalTypeAnnotation.TimeUnit.valueOf( + conf.get( + IDENTIFIER + "." + TIMESTAMP_TIME_UNIT.key(), + TIMESTAMP_TIME_UNIT.defaultValue()) + .toUpperCase()); + return Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, repetition) + .as(LogicalTypeAnnotation.timestampType(false, timeUnit)) + .named(name); + } return Types.primitive(PrimitiveType.PrimitiveTypeName.INT96, repetition) .named(name); case ARRAY: @@ -118,35 +140,37 @@ public class ParquetSchemaConverter { return ConversionPatterns.listOfElements( repetition, name, - convertToParquetType(LIST_ELEMENT_NAME, arrayType.getElementType())); + convertToParquetType(LIST_ELEMENT_NAME, arrayType.getElementType(), conf)); case MAP: MapType mapType = (MapType) type; return ConversionPatterns.mapType( repetition, name, MAP_REPEATED_NAME, - convertToParquetType("key", mapType.getKeyType()), - convertToParquetType("value", mapType.getValueType())); + convertToParquetType("key", mapType.getKeyType(), conf), + convertToParquetType("value", mapType.getValueType(), conf)); case MULTISET: MultisetType multisetType = (MultisetType) type; return ConversionPatterns.mapType( repetition, name, MAP_REPEATED_NAME, - convertToParquetType("key", multisetType.getElementType()), - convertToParquetType("value", new IntType(false))); + convertToParquetType("key", multisetType.getElementType(), conf), + convertToParquetType("value", new IntType(false), conf)); case ROW: RowType rowType = (RowType) type; - return new GroupType(repetition, name, convertToParquetTypes(rowType)); + return new GroupType(repetition, name, convertToParquetTypes(rowType, conf)); default: throw new UnsupportedOperationException("Unsupported type: " + type); } } - private static List<Type> convertToParquetTypes(RowType rowType) { + private static List<Type> convertToParquetTypes(RowType rowType, Configuration conf) { List<Type> types = new ArrayList<>(rowType.getFieldCount()); for (int i = 0; i < rowType.getFieldCount(); i++) { - 
types.add(convertToParquetType(rowType.getFieldNames().get(i), rowType.getTypeAt(i))); + types.add( + convertToParquetType( + rowType.getFieldNames().get(i), rowType.getTypeAt(i), conf)); } return types; } diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetDictionary.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetDictionary.java index 2a098110aa5..f36c7ff4bfa 100644 --- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetDictionary.java +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetDictionary.java @@ -21,15 +21,23 @@ package org.apache.flink.formats.parquet.vector; import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.data.columnar.vector.Dictionary; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.PrimitiveType; + +import static org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader.decodeInt64ToTimestamp; import static org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader.decodeInt96ToTimestamp; /** Parquet dictionary. */ public final class ParquetDictionary implements Dictionary { private org.apache.parquet.column.Dictionary dictionary; + private final ColumnDescriptor descriptor; - public ParquetDictionary(org.apache.parquet.column.Dictionary dictionary) { + public ParquetDictionary( + org.apache.parquet.column.Dictionary dictionary, ColumnDescriptor descriptor) { this.dictionary = dictionary; + this.descriptor = descriptor; } @Override @@ -59,6 +67,16 @@ public final class ParquetDictionary implements Dictionary { @Override public TimestampData decodeToTimestamp(int id) { + if (descriptor.getPrimitiveType().getPrimitiveTypeName() + == PrimitiveType.PrimitiveTypeName.INT64) { + return decodeInt64ToTimestamp( + true, + dictionary, + id, + ((LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) + descriptor.getPrimitiveType().getLogicalTypeAnnotation()) + .getUnit()); + } return decodeInt96ToTimestamp(true, dictionary, id); } } diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetSplitReaderUtil.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetSplitReaderUtil.java index 15ee191d3f7..77fa87c7e26 100644 --- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetSplitReaderUtil.java +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetSplitReaderUtil.java @@ -483,7 +483,8 @@ public class ParquetSplitReaderUtil { case TIMESTAMP_WITHOUT_TIME_ZONE: case TIMESTAMP_WITH_LOCAL_TIME_ZONE: checkArgument( - typeName == PrimitiveType.PrimitiveTypeName.INT96, + typeName == PrimitiveType.PrimitiveTypeName.INT96 + || typeName == PrimitiveType.PrimitiveTypeName.INT64, "Unexpected type: %s", typeName); return new HeapTimestampVector(batchSize); diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/AbstractColumnReader.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/AbstractColumnReader.java index 8f4382af6b4..c57a5afc6d1 100644 --- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/AbstractColumnReader.java +++ 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/AbstractColumnReader.java
@@ -173,7 +173,7 @@ public abstract class AbstractColumnReader<VECTOR extends WritableColumnVector>
                 // We can't do this if rowId != 0 AND the column doesn't have a dictionary (i.e.
                 // some
                 // non-dictionary encoded values have already been added).
-                vector.setDictionary(new ParquetDictionary(dictionary));
+                vector.setDictionary(new ParquetDictionary(dictionary, descriptor));
             } else {
                 readBatchFromDictionaryIds(rowId, num, vector, dictionaryIds);
             }
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/TimestampColumnReader.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/TimestampColumnReader.java
index 154ae13e18b..aa544f4e91c 100644
--- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/TimestampColumnReader.java
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/TimestampColumnReader.java
@@ -25,6 +25,7 @@ import org.apache.parquet.Preconditions;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.page.PageReader;
 import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.PrimitiveType;

 import java.io.IOException;
@@ -34,9 +35,9 @@ import java.sql.Timestamp;
 import java.util.concurrent.TimeUnit;

 /**
- * Timestamp {@link ColumnReader}. We only support INT96 bytes now, julianDay(4) + nanosOfDay(8).
- * See https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#timestamp
- * TIMESTAMP_MILLIS and TIMESTAMP_MICROS are the deprecated ConvertedType.
+ * Timestamp {@link ColumnReader}. We support INT96 (julianDay(4) + nanosOfDay(8)) and annotated
+ * INT64 timestamps now. See https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#timestamp
+ * TIMESTAMP_MILLIS and TIMESTAMP_MICROS are the deprecated ConvertedType.
 */
public class TimestampColumnReader extends AbstractColumnReader<WritableTimestampVector> {
@@ -44,15 +45,40 @@ public class TimestampColumnReader extends AbstractColumnReader<WritableTimestam
     public static final long MILLIS_IN_DAY = TimeUnit.DAYS.toMillis(1);
     public static final long NANOS_PER_MILLISECOND = TimeUnit.MILLISECONDS.toNanos(1);
     public static final long NANOS_PER_SECOND = TimeUnit.SECONDS.toNanos(1);
+    public static final long MICROS_PER_MILLISECOND = TimeUnit.MILLISECONDS.toMicros(1);
+    public static final long NANOS_PER_MICROSECONDS = TimeUnit.MICROSECONDS.toNanos(1);
+    public static final long MILLIS_PER_SECOND = TimeUnit.SECONDS.toMillis(1);
+    public static final long MICROS_PER_SECOND = TimeUnit.SECONDS.toMicros(1);

     private final boolean utcTimestamp;
+    private final PrimitiveType.PrimitiveTypeName actualName;
+    private final LogicalTypeAnnotation.TimeUnit timeUnit;

     public TimestampColumnReader(
             boolean utcTimestamp, ColumnDescriptor descriptor, PageReader pageReader)
             throws IOException {
         super(descriptor, pageReader);
         this.utcTimestamp = utcTimestamp;
-        checkTypeName(PrimitiveType.PrimitiveTypeName.INT96);
+        actualName = descriptor.getPrimitiveType().getPrimitiveTypeName();
+        checkTypeName();
+        if (actualName == PrimitiveType.PrimitiveTypeName.INT64) {
+            timeUnit =
+                    ((LogicalTypeAnnotation.TimestampLogicalTypeAnnotation)
+                                    descriptor.getPrimitiveType().getLogicalTypeAnnotation())
+                            .getUnit();
+        } else {
+            timeUnit = null;
+        }
+    }
+
+    private void checkTypeName() {
+        Preconditions.checkArgument(
+                actualName == PrimitiveType.PrimitiveTypeName.INT96
+                        || actualName == PrimitiveType.PrimitiveTypeName.INT64,
+                "Expected type name: %s or %s, actual type name: %s",
+                PrimitiveType.PrimitiveTypeName.INT64,
+                PrimitiveType.PrimitiveTypeName.INT96,
+                actualName);
     }

     @Override
@@ -64,10 +90,16 @@ public class TimestampColumnReader extends AbstractColumnReader<WritableTimestam
     protected void readBatch(int rowId, int num, WritableTimestampVector column) {
         for (int i = 0; i < num; i++) {
             if (runLenDecoder.readInteger() == maxDefLevel) {
-                ByteBuffer buffer = readDataBuffer(12);
-                column.setTimestamp(
-                        rowId + i,
-                        int96ToTimestamp(utcTimestamp, buffer.getLong(), buffer.getInt()));
+                if (actualName == PrimitiveType.PrimitiveTypeName.INT64) {
+                    ByteBuffer buffer = readDataBuffer(8);
+                    column.setTimestamp(
+                            rowId + i, int64ToTimestamp(utcTimestamp, buffer.getLong(), timeUnit));
+                } else {
+                    ByteBuffer buffer = readDataBuffer(12);
+                    column.setTimestamp(
+                            rowId + i,
+                            int96ToTimestamp(utcTimestamp, buffer.getLong(), buffer.getInt()));
+                }
             } else {
                 column.setNullAt(rowId + i);
             }
@@ -79,13 +111,29 @@ public class TimestampColumnReader extends AbstractColumnReader<WritableTimestam
             int rowId, int num, WritableTimestampVector column, WritableIntVector dictionaryIds) {
         for (int i = rowId; i < rowId + num; ++i) {
             if (!column.isNullAt(i)) {
-                column.setTimestamp(
-                        i,
-                        decodeInt96ToTimestamp(utcTimestamp, dictionary, dictionaryIds.getInt(i)));
+                if (actualName == PrimitiveType.PrimitiveTypeName.INT64) {
+                    column.setTimestamp(
+                            i,
+                            decodeInt64ToTimestamp(
+                                    utcTimestamp, dictionary, dictionaryIds.getInt(i), timeUnit));
+                } else {
+                    column.setTimestamp(
+                            i,
+                            decodeInt96ToTimestamp(
+                                    utcTimestamp, dictionary, dictionaryIds.getInt(i)));
+                }
             }
         }
     }

+    public static TimestampData decodeInt64ToTimestamp(
+            boolean utcTimestamp,
+            org.apache.parquet.column.Dictionary dictionary,
+            int id,
+            LogicalTypeAnnotation.TimeUnit timeUnit) {
+        return int64ToTimestamp(utcTimestamp,
dictionary.decodeToLong(id), timeUnit); + } + public static TimestampData decodeInt96ToTimestamp( boolean utcTimestamp, org.apache.parquet.column.Dictionary dictionary, int id) { Binary binary = dictionary.decodeToBinary(id); @@ -109,6 +157,37 @@ public class TimestampColumnReader extends AbstractColumnReader<WritableTimestam } } + public static TimestampData int64ToTimestamp( + boolean utcTimestamp, long value, LogicalTypeAnnotation.TimeUnit timeUnit) { + long nanosOfMillisecond = 0L; + long milliseconds = 0L; + + switch (timeUnit) { + case MILLIS: + milliseconds = value; + nanosOfMillisecond = value % MILLIS_PER_SECOND * NANOS_PER_MILLISECOND; + break; + case MICROS: + milliseconds = value / MICROS_PER_MILLISECOND; + nanosOfMillisecond = (value % MICROS_PER_SECOND) * NANOS_PER_MICROSECONDS; + break; + case NANOS: + milliseconds = value / NANOS_PER_MILLISECOND; + nanosOfMillisecond = value % NANOS_PER_SECOND; + break; + default: + break; + } + + if (utcTimestamp) { + return TimestampData.fromEpochMillis( + milliseconds, (int) (nanosOfMillisecond % NANOS_PER_MILLISECOND)); + } + Timestamp timestamp = new Timestamp(milliseconds); + timestamp.setNanos((int) nanosOfMillisecond); + return TimestampData.fromTimestamp(timestamp); + } + private static long julianDayToMillis(int julianDay) { return (julianDay - JULIAN_EPOCH_OFFSET_DAYS) * MILLIS_IN_DAY; } diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTimestampITCase.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTimestampITCase.java new file mode 100644 index 00000000000..ed267d329d0 --- /dev/null +++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTimestampITCase.java @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.parquet; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.streaming.api.scala.DataStream; +import org.apache.flink.table.planner.runtime.stream.FiniteTestSource; +import org.apache.flink.table.planner.runtime.stream.FsStreamingSinkITCaseBase; +import org.apache.flink.table.planner.runtime.utils.TestSinkUtil; +import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; +import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; +import org.apache.flink.types.Row; +import org.apache.flink.util.CollectionUtil; + +import org.apache.parquet.Strings; +import org.junit.jupiter.api.extension.ExtendWith; + +import java.sql.Timestamp; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.List; +import java.util.TimeZone; +import java.util.stream.Collectors; + +import scala.Function1; +import scala.collection.Seq; + +import static org.apache.flink.connector.file.table.FileSystemConnectorOptions.PARTITION_TIME_EXTRACTOR_KIND; +import static org.apache.flink.connector.file.table.FileSystemConnectorOptions.PARTITION_TIME_EXTRACTOR_TIMESTAMP_FORMATTER; +import static org.apache.flink.connector.file.table.FileSystemConnectorOptions.PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN; +import static org.apache.flink.connector.file.table.FileSystemConnectorOptions.SINK_PARTITION_COMMIT_DELAY; +import static org.apache.flink.connector.file.table.FileSystemConnectorOptions.SINK_PARTITION_COMMIT_POLICY_KIND; +import static org.apache.flink.connector.file.table.FileSystemConnectorOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME; +import static org.apache.flink.connector.file.table.FileSystemConnectorOptions.SINK_PARTITION_COMMIT_TRIGGER; +import static org.apache.flink.formats.parquet.ParquetFileFormatFactory.TIMESTAMP_TIME_UNIT; +import static org.apache.flink.formats.parquet.ParquetFileFormatFactory.UTC_TIMEZONE; +import static org.apache.flink.formats.parquet.ParquetFileFormatFactory.WRITE_INT64_TIMESTAMP; +import static org.junit.Assert.assertEquals; + +/** Test int64 timestamp. 
*/ +@ExtendWith(ParameterizedTestExtension.class) +public class ParquetTimestampITCase extends FsStreamingSinkITCaseBase { + @Parameter public static boolean useInt64; + + @Parameter public static String timeunit; + + @Parameter public static boolean timezone; + + @Parameters(name = "useInt64 = {0}, timeunit = {1}, timezone = {2}") + public static Object[] parameters() { + return new Object[][] { + new Object[] {false, "millis", false}, + new Object[] {true, "millis", false}, + new Object[] {true, "micros", false}, + new Object[] {true, "nanos", false}, + new Object[] {true, "millis", true}, + new Object[] {true, "micros", true}, + new Object[] {true, "nanos", true} + }; + } + + @Override + public Seq<Row> getData() { + return JavaScalaConversionUtil.toScala( + new ArrayList<Row>() { + { + add( + Row.of( + Integer.valueOf(1), + "a", + Timestamp.valueOf("2020-05-03 07:00:00.000000000"), + "05-03-2020", + "07")); + add( + Row.of( + Integer.valueOf(2), + "p", + Timestamp.valueOf("2020-05-03 08:01:01.111111111"), + "05-03-2020", + "08")); + add( + Row.of( + Integer.valueOf(3), + "x", + Timestamp.valueOf("2020-05-03 09:02:02.222222222"), + "05-03-2020", + "09")); + add( + Row.of( + Integer.valueOf(4), + "x", + Timestamp.valueOf("2020-05-03 10:03:03.333333333"), + "05-03-2020", + "10")); + add( + Row.of( + Integer.valueOf(5), + "x", + Timestamp.valueOf("2020-05-03 11:04:04.444444444"), + "05-03-2020", + "11")); + } + }); + } + + @Override + public Seq<Row> getData2() { + return JavaScalaConversionUtil.toScala( + new ArrayList<Row>() { + { + add( + Row.of( + Integer.valueOf(1), + "a", + Timestamp.valueOf("2020-05-03 07:00:00.000000000"), + "20200503", + "07")); + add( + Row.of( + Integer.valueOf(2), + "p", + Timestamp.valueOf("2020-05-03 08:01:01.111111111"), + "20200503", + "08")); + add( + Row.of( + Integer.valueOf(3), + "x", + Timestamp.valueOf("2020-05-03 09:02:02.222222222"), + "20200503", + "09")); + add( + Row.of( + Integer.valueOf(4), + "x", + Timestamp.valueOf("2020-05-04 10:03:03.333333333"), + "20200504", + "10")); + add( + Row.of( + Integer.valueOf(5), + "x", + Timestamp.valueOf("2020-05-04 11:04:04.444444444"), + "20200504", + "11")); + } + }); + } + + @Override + public DataStream<Row> getDataStream2(Function1<Row, Object> fun) { + return new DataStream<Row>( + env().getJavaEnv() + .addSource( + new FiniteTestSource(getData2(), fun), + new RowTypeInfo( + new TypeInformation[] { + Types.INT, + Types.STRING, + Types.SQL_TIMESTAMP, + Types.STRING, + Types.STRING + }, + new String[] {"a", "b", "c", "d", "e"}))); + } + + @Override + public DataStream<Row> getDataStream(Function1<Row, Object> fun) { + return new DataStream<Row>( + env().getJavaEnv() + .addSource( + new FiniteTestSource(getData(), fun), + new RowTypeInfo( + new TypeInformation[] { + Types.INT, + Types.STRING, + Types.SQL_TIMESTAMP, + Types.STRING, + Types.STRING + }, + new String[] {"a", "b", "c", "d", "e"}))); + } + + @Override + public String getDDL( + String timeExtractorKind, + String timeExtractorFormatterPattern, + String timeExtractorPattern, + String partition, + String commitTrigger, + String commitDelay, + String policy, + String successFileName) { + StringBuffer ddl = new StringBuffer("create table sink_table ("); + ddl.append(" a int, "); + ddl.append(" b string, "); + ddl.append(" c timestamp(3), "); + ddl.append(" d string,"); + ddl.append(" e string"); + ddl.append(") "); + if (!Strings.isNullOrEmpty(partition)) { + ddl.append("partitioned by ( " + partition + " ) "); + } + ddl.append("with ( "); + 
ddl.append(" 'connector' = 'filesystem', "); + ddl.append(" 'path' = '" + resultPath() + "', "); + ddl.append( + " '" + PARTITION_TIME_EXTRACTOR_KIND.key() + "' = '" + timeExtractorKind + "', "); + if (!Strings.isNullOrEmpty(timeExtractorFormatterPattern)) { + ddl.append( + " '" + + PARTITION_TIME_EXTRACTOR_TIMESTAMP_FORMATTER.key() + + "' = '" + + timeExtractorFormatterPattern + + "', "); + } + ddl.append( + " '" + + PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN.key() + + "' = '" + + timeExtractorPattern + + "', "); + ddl.append(" '" + SINK_PARTITION_COMMIT_TRIGGER.key() + "' = '" + commitTrigger + "', "); + ddl.append(" '" + SINK_PARTITION_COMMIT_DELAY.key() + "' = '" + commitDelay + "', "); + ddl.append(" '" + SINK_PARTITION_COMMIT_POLICY_KIND.key() + "' = '" + policy + "', "); + ddl.append( + " '" + + SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME.key() + + "' = '" + + successFileName + + "', "); + ddl.append(" 'format'='parquet', "); + ddl.append(" 'parquet." + UTC_TIMEZONE.key() + "' = '" + timezone + "', "); + ddl.append(" 'parquet." + TIMESTAMP_TIME_UNIT.key() + "' = '" + timeunit + "', "); + ddl.append(" 'parquet." + WRITE_INT64_TIMESTAMP.key() + "' = '" + useInt64 + "'"); + ddl.append(") "); + return ddl.toString(); + } + + @Override + public void check(String sqlQuery, Seq<Row> expectedResult) { + List<Row> result = + CollectionUtil.iteratorToList(tEnv().sqlQuery(sqlQuery).execute().collect()); + assertEquals( + JavaScalaConversionUtil.toJava(expectedResult).stream() + .map(row -> TestSinkUtil.rowToString(row, TimeZone.getTimeZone("UTC"))) + .sorted() + .collect(Collectors.toList()), + result.stream() + .map( + row -> { + row.setField( + 2, Timestamp.valueOf((LocalDateTime) row.getField(2))); + return TestSinkUtil.rowToString( + row, TimeZone.getTimeZone("UTC")); + }) + .sorted() + .collect(Collectors.toList())); + } +} diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriterTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriterTest.java index 38edb278ad8..ae2d0079ddf 100644 --- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriterTest.java +++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriterTest.java @@ -68,6 +68,9 @@ import java.util.Map; import java.util.UUID; import java.util.stream.IntStream; +import static org.apache.flink.formats.parquet.ParquetFileFormatFactory.IDENTIFIER; +import static org.apache.flink.formats.parquet.ParquetFileFormatFactory.TIMESTAMP_TIME_UNIT; +import static org.apache.flink.formats.parquet.ParquetFileFormatFactory.WRITE_INT64_TIMESTAMP; import static org.assertj.core.api.Assertions.assertThat; /** Test for {@link ParquetRowDataBuilder} and {@link ParquetRowDataWriter}. */ @@ -127,6 +130,17 @@ class ParquetRowDataWriterTest { complexTypeTest(folder, conf, false); } + @Test + public void testInt64Timestamp(@TempDir java.nio.file.Path folder) throws Exception { + Configuration conf = new Configuration(); + conf.set(IDENTIFIER + "." + WRITE_INT64_TIMESTAMP.key(), "true"); + conf.set(IDENTIFIER + "." 
+ TIMESTAMP_TIME_UNIT.key(), "nanos"); + innerTest(folder, conf, true); + innerTest(folder, conf, false); + complexTypeTest(folder, conf, true); + complexTypeTest(folder, conf, false); + } + private void innerTest(java.nio.file.Path folder, Configuration conf, boolean utcTimestamp) throws IOException { Path path = new Path(folder.toString(), UUID.randomUUID().toString()); diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/vector/ParquetInt64TimestampReaderTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/vector/ParquetInt64TimestampReaderTest.java new file mode 100644 index 00000000000..96a0c5104a0 --- /dev/null +++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/vector/ParquetInt64TimestampReaderTest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.parquet.vector; + +import org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader; +import org.apache.flink.table.data.TimestampData; + +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.junit.Test; + +import java.time.LocalDateTime; +import java.time.OffsetDateTime; + +import static org.junit.Assert.assertEquals; + +/** Test for {@link TimestampColumnReader}. 
*/ +public class ParquetInt64TimestampReaderTest { + @Test + public void testReadInt64TimestampMicros() { + LocalDateTime localDateTime = LocalDateTime.of(2021, 11, 22, 17, 50, 20, 112233); + long time = + localDateTime.toEpochSecond(OffsetDateTime.now().getOffset()) * 1_000_000 + + localDateTime.getNano() / 1_000; + TimestampData timestampData = + TimestampColumnReader.int64ToTimestamp( + false, time, LogicalTypeAnnotation.TimeUnit.MICROS); + assertEquals("2021-11-22T17:50:20.000112", timestampData.toString()); + } + + @Test + public void testReadInt64TimestampMillis() { + LocalDateTime localDateTime = LocalDateTime.of(2021, 11, 22, 17, 50, 20, 112233); + long time = + localDateTime.toEpochSecond(OffsetDateTime.now().getOffset()) * 1_000 + + localDateTime.getNano() / 1_000_000; + TimestampData timestampData = + TimestampColumnReader.int64ToTimestamp( + false, time, LogicalTypeAnnotation.TimeUnit.MILLIS); + assertEquals("2021-11-22T17:50:20", timestampData.toString()); + } + + @Test + public void testReadInt64TimestampNanos() { + LocalDateTime localDateTime = LocalDateTime.of(2021, 11, 22, 17, 50, 20, 112233); + long time = + localDateTime.toEpochSecond(OffsetDateTime.now().getOffset()) * 1_000_000_000 + + localDateTime.getNano(); + TimestampData timestampData = + TimestampColumnReader.int64ToTimestamp( + false, time, LogicalTypeAnnotation.TimeUnit.NANOS); + assertEquals("2021-11-22T17:50:20.000112233", timestampData.toString()); + } +} diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/FsStreamingSinkITCaseBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/FsStreamingSinkITCaseBase.scala index b75bab81467..0c06f538e3d 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/FsStreamingSinkITCaseBase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/FsStreamingSinkITCaseBase.scala @@ -52,7 +52,7 @@ abstract class FsStreamingSinkITCaseBase extends StreamingTestBase { protected var resultPath: String = _ // iso date - private val data: Seq[Row] = Seq( + def getData: Seq[Row] = Seq( Row.of(Integer.valueOf(1), "a", "b", "05-03-2020", "07"), Row.of(Integer.valueOf(2), "p", "q", "05-03-2020", "08"), Row.of(Integer.valueOf(3), "x", "y", "05-03-2020", "09"), @@ -61,7 +61,7 @@ abstract class FsStreamingSinkITCaseBase extends StreamingTestBase { ) // basic iso date - private val data2 = Seq( + def getData2 = Seq( Row.of(Integer.valueOf(1), "a", "b", "20200503", "07"), Row.of(Integer.valueOf(2), "p", "q", "20200503", "08"), Row.of(Integer.valueOf(3), "x", "y", "20200503", "09"), @@ -105,6 +105,13 @@ abstract class FsStreamingSinkITCaseBase extends StreamingTestBase { testPartitionCustomFormatDate(partition = true, "metastore") } + def getDataStream2(fun: Row => Long) = { + new DataStream( + env.getJavaEnv.addSource( + new FiniteTestSource(getData2, fun), + new RowTypeInfo(Types.INT, Types.STRING, Types.STRING, Types.STRING, Types.STRING))) + } + @Test def testPartitionWithBasicDate(): Unit = { @@ -116,13 +123,8 @@ abstract class FsStreamingSinkITCaseBase extends StreamingTestBase { TimestampData.fromLocalDateTime(localDateTime).getMillisecond } - val stream: DataStream[Row] = new DataStream( - env.getJavaEnv.addSource( - new FiniteTestSource(data2, fun), - new RowTypeInfo(Types.INT, Types.STRING, Types.STRING, Types.STRING, Types.STRING))) - // write out the data - test(stream, "default", 
"yyyyMMdd", "$d", "d", "partition-time", "1d", data2) + test(getDataStream2(fun), "default", "yyyyMMdd", "$d", "d", "partition-time", "1d", getData2) // verify that the written data is correct val basePath = new File(new URI(resultPath).getPath) @@ -131,6 +133,13 @@ abstract class FsStreamingSinkITCaseBase extends StreamingTestBase { Assert.assertTrue(new File(new File(basePath, "d=20200504"), "_MY_SUCCESS").exists()) } + def getDataStream(fun: Row => Long): DataStream[Row] = { + new DataStream( + env.getJavaEnv.addSource( + new FiniteTestSource(getData, fun), + new RowTypeInfo(Types.INT, Types.STRING, Types.STRING, Types.STRING, Types.STRING))) + } + def testPartitionCustomFormatDate(partition: Boolean, policy: String = "success-file"): Unit = { val fun = (t: Row) => { @@ -140,20 +149,15 @@ abstract class FsStreamingSinkITCaseBase extends StreamingTestBase { TimestampData.fromLocalDateTime(localDateTime).getMillisecond } - val stream = new DataStream( - env.getJavaEnv.addSource( - new FiniteTestSource(data, fun), - new RowTypeInfo(Types.INT, Types.STRING, Types.STRING, Types.STRING, Types.STRING))) - test( - stream, + getDataStream(fun), "default", "MM-dd-yyyy HH:mm:ss", "$d $e:00:00", if (partition) "d,e" else "", "process-time", "1h", - data, + getData, policy) } @@ -173,6 +177,31 @@ abstract class FsStreamingSinkITCaseBase extends StreamingTestBase { tEnv.createTemporaryView("my_table", dataStream, $("a"), $("b"), $("c"), $("d"), $("e")) + val ddl: String = getDDL( + timeExtractorKind, + timeExtractorFormatterPattern, + timeExtractorPattern, + partition, + commitTrigger, + commitDelay, + policy, + successFileName) + tEnv.executeSql(ddl) + + tEnv.sqlQuery("select * from my_table").executeInsert("sink_table").await() + + check("select * from sink_table", dataTest) + } + + def getDDL( + timeExtractorKind: String, + timeExtractorFormatterPattern: String, + timeExtractorPattern: String, + partition: String, + commitTrigger: String, + commitDelay: String, + policy: String, + successFileName: String) = { val ddl = s""" |create table sink_table ( @@ -202,11 +231,7 @@ abstract class FsStreamingSinkITCaseBase extends StreamingTestBase { | ${additionalProperties().mkString(",\n")} |) """.stripMargin - tEnv.executeSql(ddl) - - tEnv.sqlQuery("select * from my_table").executeInsert("sink_table").await() - - check("select * from sink_table", dataTest) + ddl } def check(sqlQuery: String, expectedResult: Seq[Row]): Unit = {