This is an automated email from the ASF dual-hosted git repository.
dzamo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 1b7569d0a7 DRILL-8492: Read Parquet Microsecond Columns as Bigint (#2907)
1b7569d0a7 is described below
commit 1b7569d0a7ddc804ffd697fe83708c03cddfc958
Author: Peter Franzen <[email protected]>
AuthorDate: Wed Jul 31 08:15:49 2024 +0200
DRILL-8492: Read Parquet Microsecond Columns as Bigint (#2907)
---
.../java/org/apache/drill/exec/ExecConstants.java | 6 +
.../exec/server/options/SystemOptionManager.java | 2 +
.../exec/store/parquet/ParquetReaderConfig.java | 26 +-
.../parquet/columnreaders/ColumnReaderFactory.java | 35 ++-
.../columnreaders/ParquetToDrillTypeConverter.java | 13 +-
.../parquet/metadata/FileMetadataCollector.java | 13 +-
.../store/parquet2/DrillParquetGroupConverter.java | 20 +-
.../java-exec/src/main/resources/drill-module.conf | 2 +
.../parquet/ParquetSimpleTestFileGenerator.java | 41 ++-
.../exec/store/parquet/TestMicrosecondColumns.java | 289 +++++++++++++++++----
.../test/resources/parquet/microseconds.parquet | Bin 871 -> 820 bytes
.../parquet/microseconds_small_diff.parquet | Bin 0 -> 848 bytes
12 files changed, 373 insertions(+), 74 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index c13d7a32b8..77da909203 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -415,6 +415,12 @@ public final class ExecConstants {
public static final String PARQUET_READER_INT96_AS_TIMESTAMP =
"store.parquet.reader.int96_as_timestamp";
public static final OptionValidator
PARQUET_READER_INT96_AS_TIMESTAMP_VALIDATOR = new
BooleanValidator(PARQUET_READER_INT96_AS_TIMESTAMP,
new OptionDescription("Enables Drill to implicitly interpret the INT96 timestamp data type in Parquet files."));
+ public static final String PARQUET_READER_TIME_MICROS_AS_INT64 =
"store.parquet.reader.time_micros_as_int64";
+ public static final OptionValidator
PARQUET_READER_TIME_MICROS_AS_INT64_VALIDATOR = new
BooleanValidator(PARQUET_READER_TIME_MICROS_AS_INT64,
+ new OptionDescription("Enables Drill to implicitly interpret the TIME_MICROS data type in Parquet files as 64-bit integers instead of SQL times truncated to milliseconds."));
+ public static final String PARQUET_READER_TIMESTAMP_MICROS_AS_INT64 =
"store.parquet.reader.timestamp_micros_as_int64";
+ public static final OptionValidator
PARQUET_READER_TIMESTAMP_MICROS_AS_INT64_VALIDATOR = new
BooleanValidator(PARQUET_READER_TIMESTAMP_MICROS_AS_INT64,
+ new OptionDescription("Enables Drill to implicitly interpret the TIMESTAMP_MICROS data type in Parquet files as 64-bit integers instead of SQL timestamps truncated to milliseconds."));
public static final String PARQUET_READER_STRINGS_SIGNED_MIN_MAX =
"store.parquet.reader.strings_signed_min_max";
public static final StringValidator
PARQUET_READER_STRINGS_SIGNED_MIN_MAX_VALIDATOR = new
EnumeratedStringValidator(PARQUET_READER_STRINGS_SIGNED_MIN_MAX,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 25e1651a76..5e98a4decf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -183,6 +183,8 @@ public class SystemOptionManager extends BaseOptionManager
implements AutoClosea
new
OptionDefinition(ExecConstants.PARQUET_PAGEREADER_BUFFER_SIZE_VALIDATOR),
new
OptionDefinition(ExecConstants.PARQUET_PAGEREADER_USE_FADVISE_VALIDATOR),
new
OptionDefinition(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP_VALIDATOR),
+ new
OptionDefinition(ExecConstants.PARQUET_READER_TIME_MICROS_AS_INT64_VALIDATOR),
+ new
OptionDefinition(ExecConstants.PARQUET_READER_TIMESTAMP_MICROS_AS_INT64_VALIDATOR),
new
OptionDefinition(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX_VALIDATOR),
new OptionDefinition(ExecConstants.PARQUET_FLAT_READER_BULK_VALIDATOR),
new
OptionDefinition(ExecConstants.PARQUET_FLAT_BATCH_NUM_RECORDS_VALIDATOR, new
OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderConfig.java
index c7a3db59d3..da840910e9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderConfig.java
@@ -51,6 +51,8 @@ public class ParquetReaderConfig {
private boolean enableTimeReadCounter = false;
private boolean autoCorrectCorruptedDates = true;
private boolean enableStringsSignedMinMax = false;
+ private boolean readTimeMicrosAsInt64 = false;
+ private boolean readTimestampMicrosAsInt64 = false;
public static ParquetReaderConfig.Builder builder() {
return new ParquetReaderConfig.Builder();
@@ -100,6 +102,16 @@ public class ParquetReaderConfig {
return enableStringsSignedMinMax;
}
+ @JsonProperty("readTimeMicrosAsInt64")
+ public boolean readTimeMicrosAsInt64() {
+ return readTimeMicrosAsInt64;
+ }
+
+ @JsonProperty("readTimestampMicrosAsInt64")
+ public boolean readTimestampMicrosAsInt64() {
+ return readTimestampMicrosAsInt64;
+ }
+
public ParquetReadOptions toReadOptions() {
return ParquetReadOptions.builder()
.useSignedStringMinMax(enableStringsSignedMinMax)
@@ -120,7 +132,9 @@ public class ParquetReaderConfig {
enableBytesTotalCounter,
enableTimeReadCounter,
autoCorrectCorruptedDates,
- enableStringsSignedMinMax);
+ enableStringsSignedMinMax,
+ readTimeMicrosAsInt64,
+ readTimestampMicrosAsInt64);
}
@Override
@@ -136,7 +150,9 @@ public class ParquetReaderConfig {
&& enableBytesTotalCounter == that.enableBytesTotalCounter
&& enableTimeReadCounter == that.enableTimeReadCounter
&& autoCorrectCorruptedDates == that.autoCorrectCorruptedDates
- && enableStringsSignedMinMax == that.enableStringsSignedMinMax;
+ && enableStringsSignedMinMax == that.enableStringsSignedMinMax
+ && readTimeMicrosAsInt64 == that.readTimeMicrosAsInt64
+ && readTimestampMicrosAsInt64 == that.readTimestampMicrosAsInt64;
}
@Override
@@ -147,6 +163,8 @@ public class ParquetReaderConfig {
+ ", enableTimeReadCounter=" + enableTimeReadCounter
+ ", autoCorrectCorruptedDates=" + autoCorrectCorruptedDates
+ ", enableStringsSignedMinMax=" + enableStringsSignedMinMax
+ + ", readTimeMicrosAsInt64=" + readTimeMicrosAsInt64
+ + ", readTimestampMicrosAsInt64=" + readTimestampMicrosAsInt64
+ '}';
}
@@ -195,6 +213,10 @@ public class ParquetReaderConfig {
if (optVal != null && !optVal.isEmpty()) {
readerConfig.enableStringsSignedMinMax = Boolean.valueOf(optVal);
}
+
+ // The read*MicrosAsInt64 config values are set from any option scope.
+ readerConfig.readTimeMicrosAsInt64 =
options.getBoolean(ExecConstants.PARQUET_READER_TIME_MICROS_AS_INT64);
+ readerConfig.readTimestampMicrosAsInt64 =
options.getBoolean(ExecConstants.PARQUET_READER_TIMESTAMP_MICROS_AS_INT64);
}
return readerConfig;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
index 077ad92d01..d22c07d7bc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
@@ -180,11 +180,21 @@ public class ColumnReaderFactory {
return new
ParquetFixedWidthDictionaryReaders.DictionaryBigIntReader(recordReader,
descriptor,
columnChunkMetaData, fixedLength, (BigIntVector) v,
schemaElement);
case TIMESTAMP_MICROS:
- return new
ParquetFixedWidthDictionaryReaders.DictionaryTimeStampMicrosReader(recordReader,
descriptor,
- columnChunkMetaData, fixedLength, (TimeStampVector) v,
schemaElement);
+ if
(recordReader.getFragmentContext().getOptions().getBoolean(ExecConstants.PARQUET_READER_TIMESTAMP_MICROS_AS_INT64))
{
+ return new
ParquetFixedWidthDictionaryReaders.DictionaryBigIntReader(recordReader,
descriptor,
+ columnChunkMetaData, fixedLength, (BigIntVector) v,
schemaElement);
+ } else {
+ return new
ParquetFixedWidthDictionaryReaders.DictionaryTimeStampMicrosReader(recordReader,
descriptor,
+ columnChunkMetaData, fixedLength, (TimeStampVector) v,
schemaElement);
+ }
case TIME_MICROS:
- return new
ParquetFixedWidthDictionaryReaders.DictionaryTimeMicrosReader(recordReader,
descriptor,
- columnChunkMetaData, fixedLength, (TimeVector) v, schemaElement);
+ if
(recordReader.getFragmentContext().getOptions().getBoolean(ExecConstants.PARQUET_READER_TIME_MICROS_AS_INT64))
{
+ return new
ParquetFixedWidthDictionaryReaders.DictionaryBigIntReader(recordReader,
descriptor,
+ columnChunkMetaData, fixedLength, (BigIntVector) v,
schemaElement);
+ } else {
+ return new
ParquetFixedWidthDictionaryReaders.DictionaryTimeMicrosReader(recordReader,
descriptor,
+ columnChunkMetaData, fixedLength, (TimeVector) v,
schemaElement);
+ }
case UINT_64:
return new
ParquetFixedWidthDictionaryReaders.DictionaryUInt8Reader(recordReader,
descriptor,
columnChunkMetaData, fixedLength, (UInt8Vector) v,
schemaElement);
@@ -299,10 +309,21 @@ public class ColumnReaderFactory {
case TIMESTAMP_MILLIS:
return new
NullableFixedByteAlignedReaders.NullableDictionaryTimeStampReader(parentReader,
columnDescriptor, columnChunkMetaData, fixedLength,
(NullableTimeStampVector)valueVec, schemaElement);
case TIME_MICROS:
- return new
NullableFixedByteAlignedReaders.NullableDictionaryTimeMicrosReader(parentReader,
columnDescriptor, columnChunkMetaData, fixedLength, (NullableTimeVector)
valueVec, schemaElement);
+ if
(parentReader.getFragmentContext().getOptions().getBoolean(ExecConstants.PARQUET_READER_TIME_MICROS_AS_INT64))
{
+ return new
NullableFixedByteAlignedReaders.NullableDictionaryBigIntReader(parentReader,
+ columnDescriptor, columnChunkMetaData, fixedLength,
(NullableBigIntVector) valueVec, schemaElement);
+ } else {
+ return new
NullableFixedByteAlignedReaders.NullableDictionaryTimeMicrosReader(parentReader,
+ columnDescriptor, columnChunkMetaData, fixedLength,
(NullableTimeVector) valueVec, schemaElement);
+ }
case TIMESTAMP_MICROS:
- return new
NullableFixedByteAlignedReaders.NullableDictionaryTimeStampMicrosReader(parentReader,
- columnDescriptor, columnChunkMetaData, fixedLength,
(NullableTimeStampVector) valueVec, schemaElement);
+ if
(parentReader.getFragmentContext().getOptions().getBoolean(ExecConstants.PARQUET_READER_TIMESTAMP_MICROS_AS_INT64))
{
+ return new
NullableFixedByteAlignedReaders.NullableDictionaryBigIntReader(parentReader,
+ columnDescriptor, columnChunkMetaData, fixedLength,
(NullableBigIntVector) valueVec, schemaElement);
+ } else {
+ return new
NullableFixedByteAlignedReaders.NullableDictionaryTimeStampMicrosReader(parentReader,
+ columnDescriptor, columnChunkMetaData, fixedLength,
(NullableTimeStampVector) valueVec, schemaElement);
+ }
case INT_64:
return new
NullableFixedByteAlignedReaders.NullableDictionaryBigIntReader(parentReader,
columnDescriptor, columnChunkMetaData, fixedLength,
(NullableBigIntVector) valueVec, schemaElement);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java
index 3ad906b4ec..008bedf808 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java
@@ -62,10 +62,19 @@ public class ParquetToDrillTypeConverter {
ParquetReaderUtility.checkDecimalTypeEnabled(options);
return TypeProtos.MinorType.VARDECIMAL;
case TIMESTAMP_MILLIS:
- case TIMESTAMP_MICROS:
return TypeProtos.MinorType.TIMESTAMP;
+ case TIMESTAMP_MICROS:
+ if
(options.getBoolean(ExecConstants.PARQUET_READER_TIMESTAMP_MICROS_AS_INT64)) {
+ return TypeProtos.MinorType.BIGINT;
+ } else {
+ return TypeProtos.MinorType.TIMESTAMP;
+ }
case TIME_MICROS:
- return TypeProtos.MinorType.TIME;
+ if
(options.getBoolean(ExecConstants.PARQUET_READER_TIME_MICROS_AS_INT64)) {
+ return TypeProtos.MinorType.BIGINT;
+ } else {
+ return TypeProtos.MinorType.TIME;
+ }
default:
throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType));
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/FileMetadataCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/FileMetadataCollector.java
index 9950171f97..4ad039902d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/FileMetadataCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/FileMetadataCollector.java
@@ -58,6 +58,8 @@ public class FileMetadataCollector {
private final FileSystem fs;
private final boolean allColumnsInteresting;
private final boolean skipNonInteresting;
+ private final boolean truncateTimeMicros;
+ private final boolean truncateTimestampMicros;
private final Set<SchemaPath> columnSet;
private final MessageType schema;
@@ -90,6 +92,9 @@ public class FileMetadataCollector {
readerConfig.autoCorrectCorruptedDates());
logger.debug("Contains corrupt dates: {}.", containsCorruptDates);
+ this.truncateTimeMicros = !readerConfig.readTimeMicrosAsInt64();
+ this.truncateTimestampMicros = !readerConfig.readTimestampMicrosAsInt64();
+
this.colTypeInfoMap = new HashMap<>();
for (String[] path : schema.getPaths()) {
colTypeInfoMap.put(SchemaPath.getCompoundPath(path),
ColTypeInfo.of(schema, schema, path, 0, new ArrayList<>()));
@@ -208,7 +213,7 @@ public class FileMetadataCollector {
minValue = ParquetReaderUtility.autoCorrectCorruptedDate((Integer)
minValue);
maxValue = ParquetReaderUtility.autoCorrectCorruptedDate((Integer)
maxValue);
}
- if (isMicrosecondColumnType(columnTypeMetadata.originalType)) {
+ if (shouldTruncateMicros(columnTypeMetadata)) {
// DRILL-8241: truncate the min/max of microsecond columns to milliseconds, otherwise the
// initial scanning of files when filtering will compare to the wrong values.
minValue = truncateMicros(minValue);
@@ -224,8 +229,10 @@ public class FileMetadataCollector {
columnTypeInfo.put(columnTypeMetadataKey, columnTypeMetadata);
}
- private static boolean isMicrosecondColumnType(OriginalType columnType) {
- return columnType == OriginalType.TIME_MICROS || columnType ==
OriginalType.TIMESTAMP_MICROS;
+ private boolean shouldTruncateMicros(Metadata_V4.ColumnTypeMetadata_v4
columnTypeMetadata) {
+ return (truncateTimeMicros && columnTypeMetadata.originalType ==
OriginalType.TIME_MICROS)
+ ||
+ (truncateTimestampMicros && columnTypeMetadata.originalType ==
OriginalType.TIMESTAMP_MICROS);
}
private static Object truncateMicros(Object microSeconds) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
index 55bd189152..d854055151 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
@@ -260,14 +260,22 @@ public class DrillParquetGroupConverter extends
GroupConverter {
case INT_64:
return getBigIntConverter(name, type);
case TIMESTAMP_MICROS: {
- TimeStampWriter writer = getTimeStampWriter(name, type);
- return new DrillTimeStampMicrosConverter(writer);
+ if
(options.getBoolean(ExecConstants.PARQUET_READER_TIMESTAMP_MICROS_AS_INT64)) {
+ return getBigIntConverter(name, type);
+ } else {
+ TimeStampWriter writer = getTimeStampWriter(name, type);
+ return new DrillTimeStampMicrosConverter(writer);
+ }
}
case TIME_MICROS: {
- TimeWriter writer = type.isRepetition(Repetition.REPEATED)
- ? getWriter(name, (m, f) -> m.list(f).time(), l ->
l.list().time())
- : getWriter(name, MapWriter::time, ListWriter::time);
- return new DrillTimeMicrosConverter(writer);
+ if
(options.getBoolean(ExecConstants.PARQUET_READER_TIME_MICROS_AS_INT64)) {
+ return getBigIntConverter(name, type);
+ } else {
+ TimeWriter writer = type.isRepetition(Repetition.REPEATED)
+ ? getWriter(name, (m, f) -> m.list(f).time(), l ->
l.list().time())
+ : getWriter(name, MapWriter::time, ListWriter::time);
+ return new DrillTimeMicrosConverter(writer);
+ }
}
case DECIMAL: {
ParquetReaderUtility.checkDecimalTypeEnabled(options);
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index cdbf03ab7d..7541a99e2d 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -747,6 +747,8 @@ drill.exec.options: {
store.parquet.reader.columnreader.async: false,
store.parquet.reader.enable_map_support: true,
store.parquet.reader.int96_as_timestamp: false,
+ store.parquet.reader.time_micros_as_int64: false,
+ store.parquet.reader.timestamp_micros_as_int64: false,
store.parquet.reader.pagereader.async: true,
store.parquet.reader.pagereader.bufferedread: true,
store.parquet.reader.pagereader.buffersize: 1048576,
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetSimpleTestFileGenerator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetSimpleTestFileGenerator.java
index efd1b4fd17..c185c9820d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetSimpleTestFileGenerator.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetSimpleTestFileGenerator.java
@@ -211,12 +211,19 @@ public class ParquetSimpleTestFileGenerator {
" required int32 rowKey; \n" +
" repeated int32 repeatedInt ( INTEGER(32,true) ) ; \n" +
"} \n";
+ public static final String microsecondColumnsSchemaMsg =
+ "message ParquetMicrosecondDataTypes { \n" +
+ " required int32 rowKey; \n" +
+ " required int64 _TIME_MICROS_int64 ( TIME_MICROS ) ; \n" +
+ " required int64 _TIMESTAMP_MICROS_int64 ( TIMESTAMP_MICROS ) ;
\n" +
+ "} \n";
public static MessageType simpleSchema =
MessageTypeParser.parseMessageType(simpleSchemaMsg);
public static MessageType complexSchema =
MessageTypeParser.parseMessageType(complexSchemaMsg);
public static MessageType simpleNullableSchema =
MessageTypeParser.parseMessageType(simpleNullableSchemaMsg);
public static MessageType complexNullableSchema =
MessageTypeParser.parseMessageType(complexNullableSchemaMsg);
public static MessageType repeatedIntSchema =
MessageTypeParser.parseMessageType(repeatedIntSchemaMsg);
+ public static MessageType microsecondColumnsSchema =
MessageTypeParser.parseMessageType(microsecondColumnsSchemaMsg);
public static Path initFile(String fileName) {
@@ -488,6 +495,24 @@ public class ParquetSimpleTestFileGenerator {
}
}
+ public static void writeMicrosecondValues(
+ SimpleGroupFactory groupFactory,
+ ParquetWriter<Group> writer,
+ long[] timeMicrosValues,
+ long[] timestampMicrosValues) throws IOException {
+
+ int numValues = Math.min(timeMicrosValues.length,
timestampMicrosValues.length);
+ for (int i = 0; i < numValues; i++) {
+
+ writer.write(
+ groupFactory.newGroup()
+ .append("rowKey", i + 1)
+ .append("_TIME_MICROS_int64", timeMicrosValues[i])
+ .append("_TIMESTAMP_MICROS_int64", timestampMicrosValues[i])
+ );
+ }
+ }
+
public static void main(String[] args) throws IOException {
SimpleGroupFactory sgf = new SimpleGroupFactory(simpleSchema);
@@ -495,6 +520,7 @@ public class ParquetSimpleTestFileGenerator {
SimpleGroupFactory sngf = new SimpleGroupFactory(simpleNullableSchema);
GroupFactory ngf = new SimpleGroupFactory(complexNullableSchema);
SimpleGroupFactory repeatedIntGroupFactory = new
SimpleGroupFactory(repeatedIntSchema);
+ SimpleGroupFactory microsecondGroupFactory = new
SimpleGroupFactory(microsecondColumnsSchema);
// Generate files with dictionary encoding enabled and disabled
ParquetWriter<Group> simpleWriter = initWriter(simpleSchema,
"drill/parquet_test_file_simple", true);
@@ -506,6 +532,8 @@ public class ParquetSimpleTestFileGenerator {
ParquetWriter<Group> simpleNullableNoDictWriter =
initWriter(simpleNullableSchema,
"drill/parquet_test_file_simple_nullable_nodict", false);
ParquetWriter<Group> complexNullableNoDictWriter =
initWriter(complexNullableSchema,
"drill/parquet_test_file_complex_nullable_nodict", false);
ParquetWriter<Group> repeatedIntV2Writer = initWriter(repeatedIntSchema,
"drill/parquet_v2_repeated_int.parquet",
ParquetProperties.WriterVersion.PARQUET_2_0, true);
+ ParquetWriter<Group> microsecondWriter =
initWriter(microsecondColumnsSchema, "drill/microseconds.parquet", false);
+ ParquetWriter<Group> microsecondSmallDiffWriter =
initWriter(microsecondColumnsSchema, "drill/microseconds_small_diff.parquet",
false);
ParquetSimpleTestFileGenerator.writeSimpleValues(sgf, simpleWriter, false);
ParquetSimpleTestFileGenerator.writeSimpleValues(sngf,
simpleNullableWriter, true);
@@ -516,6 +544,16 @@ public class ParquetSimpleTestFileGenerator {
ParquetSimpleTestFileGenerator.writeComplexValues(gf, complexNoDictWriter,
false);
ParquetSimpleTestFileGenerator.writeComplexValues(ngf,
complexNullableNoDictWriter, true);
ParquetSimpleTestFileGenerator.writeRepeatedIntValues(repeatedIntGroupFactory,
repeatedIntV2Writer, 100);
+ ParquetSimpleTestFileGenerator.writeMicrosecondValues(
+ microsecondGroupFactory,
+ microsecondWriter,
+ TestMicrosecondColumns.TIME_MICROS_VALUES,
+ TestMicrosecondColumns.TIMESTAMP_MICROS_VALUES);
+ ParquetSimpleTestFileGenerator.writeMicrosecondValues(
+ microsecondGroupFactory,
+ microsecondSmallDiffWriter,
+ TestMicrosecondColumns.TIME_MICROS_SMALL_DIFF_VALUES,
+ TestMicrosecondColumns.TIMESTAMP_MICROS_SMALL_DIFF_VALUES);
simpleWriter.close();
complexWriter.close();
@@ -526,7 +564,8 @@ public class ParquetSimpleTestFileGenerator {
simpleNullableNoDictWriter.close();
complexNullableNoDictWriter.close();
repeatedIntV2Writer.close();
-
+ microsecondWriter.close();
+ microsecondSmallDiffWriter.close();
}
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestMicrosecondColumns.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestMicrosecondColumns.java
index 01832b446e..a81cdbf370 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestMicrosecondColumns.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestMicrosecondColumns.java
@@ -17,29 +17,22 @@
*/
package org.apache.drill.exec.store.parquet;
-import java.io.IOException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.LocalTime;
+import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
+import java.util.Arrays;
import org.apache.drill.categories.ParquetTest;
import org.apache.drill.categories.UnlikelyTest;
+import org.apache.drill.exec.ExecConstants;
import org.apache.drill.test.ClusterFixture;
import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.TestBuilder;
-import org.apache.parquet.column.ParquetProperties;
-import org.apache.parquet.example.data.Group;
-import org.apache.parquet.example.data.simple.SimpleGroupFactory;
-import org.apache.parquet.hadoop.ParquetFileWriter;
-import org.apache.parquet.hadoop.ParquetWriter;
-import org.apache.parquet.hadoop.example.ExampleParquetWriter;
-import org.apache.parquet.hadoop.example.GroupWriteSupport;
-import org.apache.parquet.hadoop.metadata.CompressionCodecName;
-import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.MessageTypeParser;
-
+import org.junit.After;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -55,33 +48,45 @@ public class TestMicrosecondColumns extends ClusterTest {
private static final String TO_TIMESTAMP_TEMPLATE = "TO_TIMESTAMP('%s', 'yyy-MM-dd''T''HH:mm:ss.SSS')";
private static final DateTimeFormatter TIMESTAMP_FORMATTER =
DateTimeFormatter.ofPattern(TIMESTAMP_FORMAT);
- // The parquet file used in the test cases, can be generated by calling createParquetTestFile().
+ // The parquet files used in the test cases, created by ParquetSimpleTestFileGenerator.
private static final String DATAFILE = "cp.`parquet/microseconds.parquet`";
+ private static final String DATAFILE_SMALL_DIFF =
"cp.`parquet/microseconds_small_diff.parquet`";
- // Schema used to generate the parquet test file.
- private static final String SCHEMA =
- "message ParquetMicrosecondDataTypes { \n" +
- " required int32 rowKey; \n" +
- " required int64 _TIME_MICROS_int64 ( TIME_MICROS ) ; \n" +
- " required int64 _TIMESTAMP_MICROS_int64 ( TIMESTAMP_MICROS ) ;
\n" +
- "} \n";
-
- // Test values for the _TIME_MICROS_int64 field. Will be written to the test parquet file when
- // calling createParquetTestFile().
- private static final long[] TIME_MICROS_VALUES = {
+ // Test values for the _TIME_MICROS_int64 field. Will be written to the test parquet file by
+ // ParquetSimpleTestFileGenerator.
+ static final long[] TIME_MICROS_VALUES = {
toMicrosecondTime(0, 32, 58, 174711),
toMicrosecondTime(9, 0, 22, 654321),
toMicrosecondTime(22, 12, 41, 123456)
};
- // Test values for the _TIMESTAMP_MICROS_int64 field. Will be written to the test parquet file
- // when calling createParquetTestFile().
- private static final long[] TIMESTAMP_MICROS_VALUES = {
+ // Test values for the _TIMESTAMP_MICROS_int64 field. Will be written to the test parquet file by
+ // ParquetSimpleTestFileGenerator.
+ static final long[] TIMESTAMP_MICROS_VALUES = {
toMicrosecondTimestamp(2021, 8, 1, 22, 12, 41, 123456),
toMicrosecondTimestamp(2022, 5, 6, 9, 0, 22, 654321),
toMicrosecondTimestamp(2023, 2, 10, 0, 32, 58, 174711)
};
+ // Test values with small differences (less than a millisecond) for the _TIME_MICROS_int64 field.
+ // Used for testing ORDER BY. Written to the test parquet file by ParquetSimpleTestFileGenerator.
+ static final long[] TIME_MICROS_SMALL_DIFF_VALUES = {
+ toMicrosecondTime(10, 11, 12, 336804),
+ toMicrosecondTime(10, 11, 12, 336587),
+ toMicrosecondTime(10, 11, 12, 336172),
+ toMicrosecondTime(10, 11, 12, 336991),
+ toMicrosecondTime(10, 11, 12, 336336)
+ };
+
+ // Test values with small differences (less than a millisecond) for the _TIMESTAMP_MICROS_int64
+ // field. Used for testing ORDER BY. Written to the test parquet file by ParquetSimpleTestFileGenerator.
+ static final long[] TIMESTAMP_MICROS_SMALL_DIFF_VALUES = {
+ toMicrosecondTimestamp(2024, 3, 16, 19, 1, 54, 182665),
+ toMicrosecondTimestamp(2024, 3, 16, 19, 1, 54, 182429),
+ toMicrosecondTimestamp(2024, 3, 16, 19, 1, 54, 182707),
+ toMicrosecondTimestamp(2024, 3, 16, 19, 1, 54, 182003),
+ toMicrosecondTimestamp(2024, 3, 16, 19, 1, 54, 182860)
+ };
@BeforeClass
public static void setUp() throws Exception {
@@ -89,6 +94,13 @@ public class TestMicrosecondColumns extends ClusterTest {
}
+ @After
+ public void restoreSession() {
+ client.alterSession(ExecConstants.PARQUET_READER_TIME_MICROS_AS_INT64,
false);
+
client.alterSession(ExecConstants.PARQUET_READER_TIMESTAMP_MICROS_AS_INT64,
false);
+ }
+
+
@Test
public void testSelectTimeColumns() throws Exception {
// DRILL-8423
@@ -170,6 +182,100 @@ public class TestMicrosecondColumns extends ClusterTest {
}
+ @Test
+ public void testSelectTimeColumnAsBigInt() throws Exception {
+ String query = "select _TIME_MICROS_int64 as t from %s";
+ testBuilder()
+ .enableSessionOption(ExecConstants.PARQUET_READER_TIME_MICROS_AS_INT64)
+ .sqlQuery(query, DATAFILE)
+ .unOrdered()
+ .baselineColumns("t")
+ .baselineValues(TIME_MICROS_VALUES[0])
+ .baselineValues(TIME_MICROS_VALUES[1])
+ .baselineValues(TIME_MICROS_VALUES[2])
+ .go();
+ }
+
+
+ @Test
+ public void testSelectStarTimeColumnAsBigInt() throws Exception {
+ // PARQUET_READER_TIME_MICROS_AS_INT64 should only affect time_micros columns, not
+ // timestamp_micros columns.
+ String query = "select * from %s";
+ testBuilder()
+ .enableSessionOption(ExecConstants.PARQUET_READER_TIME_MICROS_AS_INT64)
+ .sqlQuery(query, DATAFILE)
+ .unOrdered()
+
.baselineColumns("rowKey","_TIME_MICROS_int64","_TIMESTAMP_MICROS_int64")
+ .baselineValues(1, TIME_MICROS_VALUES[0],
toLocalDateTime(TIMESTAMP_MICROS_VALUES[0]))
+ .baselineValues(2, TIME_MICROS_VALUES[1],
toLocalDateTime(TIMESTAMP_MICROS_VALUES[1]))
+ .baselineValues(3, TIME_MICROS_VALUES[2],
toLocalDateTime(TIMESTAMP_MICROS_VALUES[2]))
+ .go();
+ }
+
+
+ @Test
+ public void testSelectTimeColumnAsBigIntWithBigIntFilter() throws Exception {
+ String query = "select _TIME_MICROS_int64 as t from %s where _TIME_MICROS_int64 > " + TIME_MICROS_VALUES[0];
+ testBuilder()
+ .enableSessionOption(ExecConstants.PARQUET_READER_TIME_MICROS_AS_INT64)
+ .sqlQuery(query, DATAFILE)
+ .unOrdered()
+ .baselineColumns("t")
+ .baselineValues(TIME_MICROS_VALUES[1])
+ .baselineValues(TIME_MICROS_VALUES[2])
+ .go();
+ }
+
+
+ @Test
+ public void testSelectTimeColumnAsBigIntWithTimeFilter() throws Exception {
+ String query = "select _TIME_MICROS_int64 as t from %s where TO_TIME(_TIME_MICROS_int64/1000) > " + createToTimeFragment(TIME_MICROS_VALUES[0]);
+ testBuilder()
+ .enableSessionOption(ExecConstants.PARQUET_READER_TIME_MICROS_AS_INT64)
+ .sqlQuery(query, DATAFILE)
+ .unOrdered()
+ .baselineColumns("t")
+ .baselineValues(TIME_MICROS_VALUES[1])
+ .baselineValues(TIME_MICROS_VALUES[2])
+ .go();
+ }
+
+
+ @Test
+ public void testOrderByTimeColumnAsBigInt() throws Exception {
+ long[] sortedValues = Arrays.copyOf(TIME_MICROS_SMALL_DIFF_VALUES,
TIME_MICROS_SMALL_DIFF_VALUES.length);
+ Arrays.sort(sortedValues);
+ String query = "select _TIME_MICROS_int64 as t from %s ORDER BY t";
+ TestBuilder builder = testBuilder()
+ .enableSessionOption(ExecConstants.PARQUET_READER_TIME_MICROS_AS_INT64)
+ .sqlQuery(query, DATAFILE_SMALL_DIFF)
+ .ordered()
+ .baselineColumns("t");
+ for (long expectedValue : sortedValues) {
+ builder.baselineValues(expectedValue);
+ }
+ builder.go();
+ }
+
+
+ @Test
+ public void testOrderByDescTimeColumnAsBigInt() throws Exception {
+ long[] sortedValues = Arrays.copyOf(TIME_MICROS_SMALL_DIFF_VALUES,
TIME_MICROS_SMALL_DIFF_VALUES.length);
+ Arrays.sort(sortedValues);
+ String query = "select _TIME_MICROS_int64 as t from %s ORDER BY t DESC";
+ TestBuilder builder = testBuilder()
+ .enableSessionOption(ExecConstants.PARQUET_READER_TIME_MICROS_AS_INT64)
+ .sqlQuery(query, DATAFILE_SMALL_DIFF)
+ .ordered()
+ .baselineColumns("t");
+ for (int i=sortedValues.length-1; i>= 0; i--) {
+ builder.baselineValues(sortedValues[i]);
+ }
+ builder.go();
+ }
+
+
@Test
public void testSelectTimestampColumns() throws Exception {
String query = "select _TIMESTAMP_MICROS_int64 as t from %s";
@@ -270,44 +376,111 @@ public class TestMicrosecondColumns extends ClusterTest {
}
- private void executeFilterQuery(String whereClause, long expectedCount) throws Exception {
- String query = "select count(*) as c from %s where " + whereClause;
+ @Test
+ public void testSelectTimestampColumnAsBigInt() throws Exception {
+ String query = "select _TIMESTAMP_MICROS_int64 as t from %s";
testBuilder()
+ .enableSessionOption(ExecConstants.PARQUET_READER_TIMESTAMP_MICROS_AS_INT64)
.sqlQuery(query, DATAFILE)
.unOrdered()
- .baselineColumns("c")
- .baselineValues(expectedCount)
+ .baselineColumns("t")
+ .baselineValues(TIMESTAMP_MICROS_VALUES[0])
+ .baselineValues(TIMESTAMP_MICROS_VALUES[1])
+ .baselineValues(TIMESTAMP_MICROS_VALUES[2])
+ .go();
+ }
+
+
+ @Test
+ public void testSelectStarTimestampColumnAsBigInt() throws Exception {
+ // PARQUET_READER_TIMESTAMP_MICROS_AS_INT64 should only affect timestamp_micros columns, not
+ // time_micros columns.
+ String query = "select * from %s";
+ testBuilder()
+ .enableSessionOption(ExecConstants.PARQUET_READER_TIMESTAMP_MICROS_AS_INT64)
+ .sqlQuery(query, DATAFILE)
+ .unOrdered()
+ .baselineColumns("rowKey","_TIME_MICROS_int64","_TIMESTAMP_MICROS_int64")
+ .baselineValues(1, toLocalTime(TIME_MICROS_VALUES[0]), TIMESTAMP_MICROS_VALUES[0])
+ .baselineValues(2, toLocalTime(TIME_MICROS_VALUES[1]), TIMESTAMP_MICROS_VALUES[1])
+ .baselineValues(3, toLocalTime(TIME_MICROS_VALUES[2]), TIMESTAMP_MICROS_VALUES[2])
+ .go();
+ }
+
+
+ @Test
+ public void testSelectTimestampColumnAsBigIntWithBigIntFilter() throws Exception {
+ String query = "select _TIMESTAMP_MICROS_int64 as t from %s where _TIMESTAMP_MICROS_int64 > " + TIMESTAMP_MICROS_VALUES[0];
+ testBuilder()
+ .enableSessionOption(ExecConstants.PARQUET_READER_TIMESTAMP_MICROS_AS_INT64)
+ .sqlQuery(query, DATAFILE)
+ .unOrdered()
+ .baselineColumns("t")
+ .baselineValues(TIMESTAMP_MICROS_VALUES[1])
+ .baselineValues(TIMESTAMP_MICROS_VALUES[2])
+ .go();
+ }
+
+
+ @Test
+ public void testSelectTimestampColumnAsBigIntWithTimestampFilter() throws Exception {
+ // TO_TIMESTAMP(double) creates a timestamp in the system default timezone, must compare to
+ // a TO_TIMESTAMP(string ,format) in the same timezone.
+ String toTimestampTerm = createToTimestampFragment(TIMESTAMP_MICROS_VALUES[0], ZoneId.systemDefault());
+ String query = "select _TIMESTAMP_MICROS_int64 as t from %s where TO_TIMESTAMP(_TIMESTAMP_MICROS_int64/1000000) > " + toTimestampTerm;
+ testBuilder()
+ .enableSessionOption(ExecConstants.PARQUET_READER_TIMESTAMP_MICROS_AS_INT64)
+ .sqlQuery(query, DATAFILE)
+ .unOrdered()
+ .baselineColumns("t")
+ .baselineValues(TIMESTAMP_MICROS_VALUES[1])
+ .baselineValues(TIMESTAMP_MICROS_VALUES[2])
.go();
}
- public static void createParquetTestFile(String filePath) throws IOException {
+ @Test
+ public void testOrderByTimestampColumnAsBigInt() throws Exception {
+ long[] sortedValues = Arrays.copyOf(TIMESTAMP_MICROS_SMALL_DIFF_VALUES, TIMESTAMP_MICROS_SMALL_DIFF_VALUES.length);
+ Arrays.sort(sortedValues);
+ String query = "select _TIMESTAMP_MICROS_int64 as t from %s ORDER BY t";
+ TestBuilder builder = testBuilder()
+ .enableSessionOption(ExecConstants.PARQUET_READER_TIMESTAMP_MICROS_AS_INT64)
+ .sqlQuery(query, DATAFILE_SMALL_DIFF)
+ .ordered()
+ .baselineColumns("t");
+ for (long expectedValue : sortedValues) {
+ builder.baselineValues(expectedValue);
+ }
+ builder.go();
+ }
- MessageType messageType = MessageTypeParser.parseMessageType(SCHEMA);
- GroupWriteSupport.setSchema(messageType, ParquetSimpleTestFileGenerator.conf);
- SimpleGroupFactory groupFactory = new SimpleGroupFactory(messageType);
- try (ParquetWriter<Group> writer = createParquetWriter(filePath)) {
- for (int i=0; i<TIME_MICROS_VALUES.length; i++) {
- writer.write(
- groupFactory.newGroup()
- .append("rowKey", i+1)
- .append("_TIME_MICROS_int64", TIME_MICROS_VALUES[i])
- .append("_TIMESTAMP_MICROS_int64", TIMESTAMP_MICROS_VALUES[i])
- );
- }
+ @Test
+ public void testOrderByDescTimestampColumnAsBigInt() throws Exception {
+ long[] sortedValues = Arrays.copyOf(TIMESTAMP_MICROS_SMALL_DIFF_VALUES, TIMESTAMP_MICROS_SMALL_DIFF_VALUES.length);
+ Arrays.sort(sortedValues);
+ String query = "select _TIMESTAMP_MICROS_int64 as t from %s ORDER BY t DESC";
+ TestBuilder builder = testBuilder()
+ .enableSessionOption(ExecConstants.PARQUET_READER_TIMESTAMP_MICROS_AS_INT64)
+ .sqlQuery(query, DATAFILE_SMALL_DIFF)
+ .ordered()
+ .baselineColumns("t");
+ for (int i=sortedValues.length-1; i>= 0; i--) {
+ builder.baselineValues(sortedValues[i]);
}
+ builder.go();
}
- private static ParquetWriter<Group> createParquetWriter(String filePath) throws IOException {
- return
- ExampleParquetWriter.builder(ParquetSimpleTestFileGenerator.initFile(filePath))
- .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
- .withCompressionCodec(CompressionCodecName.GZIP)
- .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
- .withConf(ParquetSimpleTestFileGenerator.conf)
- .build();
+ private void executeFilterQuery(String whereClause, long expectedCount) throws Exception {
+ String query = "select count(*) as c from %s where " + whereClause;
+ testBuilder()
+ .sqlQuery(query, DATAFILE)
+ .unOrdered()
+ .baselineColumns("c")
+ .baselineValues(expectedCount)
+ .go();
}
@@ -321,13 +494,23 @@ public class TestMicrosecondColumns extends ClusterTest {
}
+ private static String createToTimestampFragment(long micros, ZoneId timeZone) {
+ return String.format(TO_TIMESTAMP_TEMPLATE, TIMESTAMP_FORMATTER.format(toLocalDateTime(micros, timeZone)));
+ }
+
+
private static LocalTime toLocalTime(long micros) {
return LocalTime.ofNanoOfDay((micros/1000L) * 1000_000L);
}
private static LocalDateTime toLocalDateTime(long micros) {
- return LocalDateTime.ofInstant(Instant.ofEpochMilli(micros/1000L), ZoneOffset.ofHours(0));
+ return toLocalDateTime(micros, ZoneOffset.ofHours(0));
+ }
+
+
+ private static LocalDateTime toLocalDateTime(long micros, ZoneId timeZone) {
+ return LocalDateTime.ofInstant(Instant.ofEpochMilli(micros/1000L), timeZone);
}
diff --git a/exec/java-exec/src/test/resources/parquet/microseconds.parquet b/exec/java-exec/src/test/resources/parquet/microseconds.parquet
index 7bac7aa26d..3faef5335f 100644
Binary files a/exec/java-exec/src/test/resources/parquet/microseconds.parquet and b/exec/java-exec/src/test/resources/parquet/microseconds.parquet differ
diff --git a/exec/java-exec/src/test/resources/parquet/microseconds_small_diff.parquet b/exec/java-exec/src/test/resources/parquet/microseconds_small_diff.parquet
new file mode 100644
index 0000000000..089ae70a29
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/microseconds_small_diff.parquet differ