This is an automated email from the ASF dual-hosted git repository.

dzamo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b7569d0a7 DRILL-8492: Read Parquet Microsecond Columns as Bigint (#2907)
1b7569d0a7 is described below

commit 1b7569d0a7ddc804ffd697fe83708c03cddfc958
Author: Peter Franzen <[email protected]>
AuthorDate: Wed Jul 31 08:15:49 2024 +0200

    DRILL-8492: Read Parquet Microsecond Columns as Bigint (#2907)
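
    The two new options added by this commit let Drill expose Parquet
    TIME_MICROS and TIMESTAMP_MICROS columns as BIGINT microsecond values
    instead of SQL TIME/TIMESTAMP values truncated to milliseconds. A minimal
    usage sketch in Drill SQL (the file path and column names here are
    illustrative, not part of this commit):

        ALTER SESSION SET `store.parquet.reader.time_micros_as_int64` = true;
        ALTER SESSION SET `store.parquet.reader.timestamp_micros_as_int64` = true;
        -- hypothetical file and columns; both microsecond columns now
        -- arrive as BIGINT with full precision
        SELECT t_micros, ts_micros FROM dfs.`/data/events.parquet`;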
---
 .../java/org/apache/drill/exec/ExecConstants.java  |   6 +
 .../exec/server/options/SystemOptionManager.java   |   2 +
 .../exec/store/parquet/ParquetReaderConfig.java    |  26 +-
 .../parquet/columnreaders/ColumnReaderFactory.java |  35 ++-
 .../columnreaders/ParquetToDrillTypeConverter.java |  13 +-
 .../parquet/metadata/FileMetadataCollector.java    |  13 +-
 .../store/parquet2/DrillParquetGroupConverter.java |  20 +-
 .../java-exec/src/main/resources/drill-module.conf |   2 +
 .../parquet/ParquetSimpleTestFileGenerator.java    |  41 ++-
 .../exec/store/parquet/TestMicrosecondColumns.java | 289 +++++++++++++++++----
 .../test/resources/parquet/microseconds.parquet    | Bin 871 -> 820 bytes
 .../parquet/microseconds_small_diff.parquet        | Bin 0 -> 848 bytes
 12 files changed, 373 insertions(+), 74 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index c13d7a32b8..77da909203 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -415,6 +415,12 @@ public final class ExecConstants {
   public static final String PARQUET_READER_INT96_AS_TIMESTAMP = "store.parquet.reader.int96_as_timestamp";
   public static final OptionValidator PARQUET_READER_INT96_AS_TIMESTAMP_VALIDATOR = new BooleanValidator(PARQUET_READER_INT96_AS_TIMESTAMP,
       new OptionDescription("Enables Drill to implicitly interpret the INT96 timestamp data type in Parquet files."));
+  public static final String PARQUET_READER_TIME_MICROS_AS_INT64 = "store.parquet.reader.time_micros_as_int64";
+  public static final OptionValidator PARQUET_READER_TIME_MICROS_AS_INT64_VALIDATOR = new BooleanValidator(PARQUET_READER_TIME_MICROS_AS_INT64,
+      new OptionDescription("Enables Drill to implicitly interpret the TIME_MICROS data type in Parquet files as 64-bit integers instead of SQL times truncated to milliseconds."));
+  public static final String PARQUET_READER_TIMESTAMP_MICROS_AS_INT64 = "store.parquet.reader.timestamp_micros_as_int64";
+  public static final OptionValidator PARQUET_READER_TIMESTAMP_MICROS_AS_INT64_VALIDATOR = new BooleanValidator(PARQUET_READER_TIMESTAMP_MICROS_AS_INT64,
+      new OptionDescription("Enables Drill to implicitly interpret the TIMESTAMP_MICROS data type in Parquet files as 64-bit integers instead of SQL timestamps truncated to milliseconds."));
 
   public static final String PARQUET_READER_STRINGS_SIGNED_MIN_MAX = "store.parquet.reader.strings_signed_min_max";
   public static final StringValidator PARQUET_READER_STRINGS_SIGNED_MIN_MAX_VALIDATOR = new EnumeratedStringValidator(PARQUET_READER_STRINGS_SIGNED_MIN_MAX,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 25e1651a76..5e98a4decf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -183,6 +183,8 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
       new OptionDefinition(ExecConstants.PARQUET_PAGEREADER_BUFFER_SIZE_VALIDATOR),
       new OptionDefinition(ExecConstants.PARQUET_PAGEREADER_USE_FADVISE_VALIDATOR),
       new OptionDefinition(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP_VALIDATOR),
+      new OptionDefinition(ExecConstants.PARQUET_READER_TIME_MICROS_AS_INT64_VALIDATOR),
+      new OptionDefinition(ExecConstants.PARQUET_READER_TIMESTAMP_MICROS_AS_INT64_VALIDATOR),
       new OptionDefinition(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX_VALIDATOR),
       new OptionDefinition(ExecConstants.PARQUET_FLAT_READER_BULK_VALIDATOR),
       new OptionDefinition(ExecConstants.PARQUET_FLAT_BATCH_NUM_RECORDS_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderConfig.java
index c7a3db59d3..da840910e9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderConfig.java
@@ -51,6 +51,8 @@ public class ParquetReaderConfig {
   private boolean enableTimeReadCounter = false;
   private boolean autoCorrectCorruptedDates = true;
   private boolean enableStringsSignedMinMax = false;
+  private boolean readTimeMicrosAsInt64 = false;
+  private boolean readTimestampMicrosAsInt64 = false;
 
   public static ParquetReaderConfig.Builder builder() {
     return new ParquetReaderConfig.Builder();
@@ -100,6 +102,16 @@ public class ParquetReaderConfig {
     return enableStringsSignedMinMax;
   }
 
+  @JsonProperty("readTimeMicrosAsInt64")
+  public boolean readTimeMicrosAsInt64() {
+    return readTimeMicrosAsInt64;
+  }
+
+  @JsonProperty("readTimestampMicrosAsInt64")
+  public boolean readTimestampMicrosAsInt64() {
+    return readTimestampMicrosAsInt64;
+  }
+
   public ParquetReadOptions toReadOptions() {
     return ParquetReadOptions.builder()
       .useSignedStringMinMax(enableStringsSignedMinMax)
@@ -120,7 +132,9 @@ public class ParquetReaderConfig {
       enableBytesTotalCounter,
       enableTimeReadCounter,
       autoCorrectCorruptedDates,
-      enableStringsSignedMinMax);
+      enableStringsSignedMinMax,
+      readTimeMicrosAsInt64,
+      readTimestampMicrosAsInt64);
   }
 
   @Override
@@ -136,7 +150,9 @@ public class ParquetReaderConfig {
       && enableBytesTotalCounter == that.enableBytesTotalCounter
       && enableTimeReadCounter == that.enableTimeReadCounter
       && autoCorrectCorruptedDates == that.autoCorrectCorruptedDates
-      && enableStringsSignedMinMax == that.enableStringsSignedMinMax;
+      && enableStringsSignedMinMax == that.enableStringsSignedMinMax
+      && readTimeMicrosAsInt64 == that.readTimeMicrosAsInt64
+      && readTimestampMicrosAsInt64 == that.readTimestampMicrosAsInt64;
   }
 
   @Override
@@ -147,6 +163,8 @@ public class ParquetReaderConfig {
       + ", enableTimeReadCounter=" + enableTimeReadCounter
       + ", autoCorrectCorruptedDates=" + autoCorrectCorruptedDates
       + ", enableStringsSignedMinMax=" + enableStringsSignedMinMax
+      + ", readTimeMicrosAsInt64=" + readTimeMicrosAsInt64
+      + ", readTimestampMicrosAsInt64=" + readTimestampMicrosAsInt64
       + '}';
   }
 
@@ -195,6 +213,10 @@ public class ParquetReaderConfig {
         if (optVal != null && !optVal.isEmpty()) {
           readerConfig.enableStringsSignedMinMax = Boolean.valueOf(optVal);
         }
+
+        // The read*MicrosAsInt64 config values are set from any option scope.
+        readerConfig.readTimeMicrosAsInt64 = options.getBoolean(ExecConstants.PARQUET_READER_TIME_MICROS_AS_INT64);
+        readerConfig.readTimestampMicrosAsInt64 = options.getBoolean(ExecConstants.PARQUET_READER_TIMESTAMP_MICROS_AS_INT64);
       }
 
       return readerConfig;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
index 077ad92d01..d22c07d7bc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
@@ -180,11 +180,21 @@ public class ColumnReaderFactory {
            return new ParquetFixedWidthDictionaryReaders.DictionaryBigIntReader(recordReader, descriptor,
              columnChunkMetaData, fixedLength, (BigIntVector) v, schemaElement);
           case TIMESTAMP_MICROS:
-            return new ParquetFixedWidthDictionaryReaders.DictionaryTimeStampMicrosReader(recordReader, descriptor,
-                columnChunkMetaData, fixedLength, (TimeStampVector) v, schemaElement);
+            if (recordReader.getFragmentContext().getOptions().getBoolean(ExecConstants.PARQUET_READER_TIMESTAMP_MICROS_AS_INT64)) {
+              return new ParquetFixedWidthDictionaryReaders.DictionaryBigIntReader(recordReader, descriptor,
+                  columnChunkMetaData, fixedLength, (BigIntVector) v, schemaElement);
+            } else {
+              return new ParquetFixedWidthDictionaryReaders.DictionaryTimeStampMicrosReader(recordReader, descriptor,
+                  columnChunkMetaData, fixedLength, (TimeStampVector) v, schemaElement);
+            }
           case TIME_MICROS:
-            return new ParquetFixedWidthDictionaryReaders.DictionaryTimeMicrosReader(recordReader, descriptor,
-              columnChunkMetaData, fixedLength, (TimeVector) v, schemaElement);
+            if (recordReader.getFragmentContext().getOptions().getBoolean(ExecConstants.PARQUET_READER_TIME_MICROS_AS_INT64)) {
+              return new ParquetFixedWidthDictionaryReaders.DictionaryBigIntReader(recordReader, descriptor,
+                  columnChunkMetaData, fixedLength, (BigIntVector) v, schemaElement);
+            } else {
+              return new ParquetFixedWidthDictionaryReaders.DictionaryTimeMicrosReader(recordReader, descriptor,
+                  columnChunkMetaData, fixedLength, (TimeVector) v, schemaElement);
+            }
           case UINT_64:
             return new ParquetFixedWidthDictionaryReaders.DictionaryUInt8Reader(recordReader, descriptor,
                 columnChunkMetaData, fixedLength, (UInt8Vector) v, schemaElement);
@@ -299,10 +309,21 @@ public class ColumnReaderFactory {
          case TIMESTAMP_MILLIS:
            return new NullableFixedByteAlignedReaders.NullableDictionaryTimeStampReader(parentReader, columnDescriptor, columnChunkMetaData, fixedLength, (NullableTimeStampVector)valueVec, schemaElement);
          case TIME_MICROS:
-            return new NullableFixedByteAlignedReaders.NullableDictionaryTimeMicrosReader(parentReader, columnDescriptor, columnChunkMetaData, fixedLength, (NullableTimeVector) valueVec, schemaElement);
+            if (parentReader.getFragmentContext().getOptions().getBoolean(ExecConstants.PARQUET_READER_TIME_MICROS_AS_INT64)) {
+              return new NullableFixedByteAlignedReaders.NullableDictionaryBigIntReader(parentReader,
+                columnDescriptor, columnChunkMetaData, fixedLength, (NullableBigIntVector) valueVec, schemaElement);
+            } else {
+              return new NullableFixedByteAlignedReaders.NullableDictionaryTimeMicrosReader(parentReader,
+                columnDescriptor, columnChunkMetaData, fixedLength, (NullableTimeVector) valueVec, schemaElement);
+            }
          case TIMESTAMP_MICROS:
-            return new NullableFixedByteAlignedReaders.NullableDictionaryTimeStampMicrosReader(parentReader,
-              columnDescriptor, columnChunkMetaData, fixedLength, (NullableTimeStampVector) valueVec, schemaElement);
+            if (parentReader.getFragmentContext().getOptions().getBoolean(ExecConstants.PARQUET_READER_TIMESTAMP_MICROS_AS_INT64)) {
+              return new NullableFixedByteAlignedReaders.NullableDictionaryBigIntReader(parentReader,
+                columnDescriptor, columnChunkMetaData, fixedLength, (NullableBigIntVector) valueVec, schemaElement);
+            } else {
+              return new NullableFixedByteAlignedReaders.NullableDictionaryTimeStampMicrosReader(parentReader,
+                columnDescriptor, columnChunkMetaData, fixedLength, (NullableTimeStampVector) valueVec, schemaElement);
+            }
          case INT_64:
            return new NullableFixedByteAlignedReaders.NullableDictionaryBigIntReader(parentReader,
              columnDescriptor, columnChunkMetaData, fixedLength, (NullableBigIntVector) valueVec, schemaElement);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java
index 3ad906b4ec..008bedf808 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java
@@ -62,10 +62,19 @@ public class ParquetToDrillTypeConverter {
             ParquetReaderUtility.checkDecimalTypeEnabled(options);
             return TypeProtos.MinorType.VARDECIMAL;
           case TIMESTAMP_MILLIS:
-          case TIMESTAMP_MICROS:
             return TypeProtos.MinorType.TIMESTAMP;
+          case TIMESTAMP_MICROS:
+            if (options.getBoolean(ExecConstants.PARQUET_READER_TIMESTAMP_MICROS_AS_INT64)) {
+              return TypeProtos.MinorType.BIGINT;
+            } else {
+              return TypeProtos.MinorType.TIMESTAMP;
+            }
           case TIME_MICROS:
-            return TypeProtos.MinorType.TIME;
+            if (options.getBoolean(ExecConstants.PARQUET_READER_TIME_MICROS_AS_INT64)) {
+              return TypeProtos.MinorType.BIGINT;
+            } else {
+              return TypeProtos.MinorType.TIME;
+            }
           default:
             throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType));
         }
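
ParquetToDrillTypeConverter is where the options flip a column's Drill minor
type from TIMESTAMP/TIME to BIGINT. One way to observe the effect against the
microseconds.parquet test file shipped with this commit (a sketch; typeof()
is Drill's type-inspection function):

    ALTER SESSION SET `store.parquet.reader.timestamp_micros_as_int64` = true;
    SELECT typeof(_TIMESTAMP_MICROS_int64) AS drill_type
    FROM cp.`parquet/microseconds.parquet` LIMIT 1;
    -- should report BIGINT; with the option false it reports TIMESTAMP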
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/FileMetadataCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/FileMetadataCollector.java
index 9950171f97..4ad039902d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/FileMetadataCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/FileMetadataCollector.java
@@ -58,6 +58,8 @@ public class FileMetadataCollector {
   private final FileSystem fs;
   private final boolean allColumnsInteresting;
   private final boolean skipNonInteresting;
+  private final boolean truncateTimeMicros;
+  private final boolean truncateTimestampMicros;
   private final Set<SchemaPath> columnSet;
 
   private final MessageType schema;
@@ -90,6 +92,9 @@ public class FileMetadataCollector {
         readerConfig.autoCorrectCorruptedDates());
     logger.debug("Contains corrupt dates: {}.", containsCorruptDates);
 
+    this.truncateTimeMicros = !readerConfig.readTimeMicrosAsInt64();
+    this.truncateTimestampMicros = !readerConfig.readTimestampMicrosAsInt64();
+
     this.colTypeInfoMap = new HashMap<>();
     for (String[] path : schema.getPaths()) {
       colTypeInfoMap.put(SchemaPath.getCompoundPath(path), ColTypeInfo.of(schema, schema, path, 0, new ArrayList<>()));
@@ -208,7 +213,7 @@ public class FileMetadataCollector {
           minValue = ParquetReaderUtility.autoCorrectCorruptedDate((Integer) minValue);
           maxValue = ParquetReaderUtility.autoCorrectCorruptedDate((Integer) maxValue);
         }
-        if (isMicrosecondColumnType(columnTypeMetadata.originalType)) {
+        if (shouldTruncateMicros(columnTypeMetadata)) {
           // DRILL-8241: truncate the min/max of microsecond columns to milliseconds, otherwise the
           // initial scanning of files when filtering will compare to the wrong values.
           minValue = truncateMicros(minValue);
@@ -224,8 +229,10 @@ public class FileMetadataCollector {
     columnTypeInfo.put(columnTypeMetadataKey, columnTypeMetadata);
   }
 
-  private static boolean isMicrosecondColumnType(OriginalType columnType) {
-    return columnType == OriginalType.TIME_MICROS || columnType == OriginalType.TIMESTAMP_MICROS;
+  private boolean shouldTruncateMicros(Metadata_V4.ColumnTypeMetadata_v4 columnTypeMetadata) {
+    return (truncateTimeMicros && columnTypeMetadata.originalType == OriginalType.TIME_MICROS)
+           ||
+           (truncateTimestampMicros && columnTypeMetadata.originalType == OriginalType.TIMESTAMP_MICROS);
   }
 
   private static Object truncateMicros(Object microSeconds) {
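
When a read-as-int64 option is enabled, FileMetadataCollector now leaves that
column's min/max statistics at full microsecond precision instead of applying
the DRILL-8241 truncation, so filters can distinguish values that differ by
less than a millisecond. A sketch against the microseconds_small_diff.parquet
test file added by this commit (36672336587 is toMicrosecondTime(10, 11, 12,
336587), the second of its five _TIME_MICROS_int64 values; two rows exceed it):

    ALTER SESSION SET `store.parquet.reader.time_micros_as_int64` = true;
    SELECT count(*) FROM cp.`parquet/microseconds_small_diff.parquet`
    WHERE _TIME_MICROS_int64 > 36672336587;  -- expect 2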
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
index 55bd189152..d854055151 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
@@ -260,14 +260,22 @@ public class DrillParquetGroupConverter extends GroupConverter {
           case INT_64:
             return getBigIntConverter(name, type);
           case TIMESTAMP_MICROS: {
-            TimeStampWriter writer = getTimeStampWriter(name, type);
-            return new DrillTimeStampMicrosConverter(writer);
+            if (options.getBoolean(ExecConstants.PARQUET_READER_TIMESTAMP_MICROS_AS_INT64)) {
+              return getBigIntConverter(name, type);
+            } else {
+              TimeStampWriter writer = getTimeStampWriter(name, type);
+              return new DrillTimeStampMicrosConverter(writer);
+            }
           }
           case TIME_MICROS: {
-            TimeWriter writer = type.isRepetition(Repetition.REPEATED)
-              ? getWriter(name, (m, f) -> m.list(f).time(), l -> l.list().time())
-              : getWriter(name, MapWriter::time, ListWriter::time);
-            return new DrillTimeMicrosConverter(writer);
+            if (options.getBoolean(ExecConstants.PARQUET_READER_TIME_MICROS_AS_INT64)) {
+              return getBigIntConverter(name, type);
+            } else {
+              TimeWriter writer = type.isRepetition(Repetition.REPEATED)
+                  ? getWriter(name, (m, f) -> m.list(f).time(), l -> l.list().time())
+                  : getWriter(name, MapWriter::time, ListWriter::time);
+              return new DrillTimeMicrosConverter(writer);
+            }
           }
           case DECIMAL: {
             ParquetReaderUtility.checkDecimalTypeEnabled(options);
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index cdbf03ab7d..7541a99e2d 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -747,6 +747,8 @@ drill.exec.options: {
     store.parquet.reader.columnreader.async: false,
     store.parquet.reader.enable_map_support: true,
     store.parquet.reader.int96_as_timestamp: false,
+    store.parquet.reader.time_micros_as_int64: false,
+    store.parquet.reader.timestamp_micros_as_int64: false,
     store.parquet.reader.pagereader.async: true,
     store.parquet.reader.pagereader.bufferedread: true,
     store.parquet.reader.pagereader.buffersize: 1048576,
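
The drill-module.conf entries above only establish system defaults. Like the
neighbouring store.parquet.reader options, both can be overridden at system or
session scope and inspected through the sys.options table (a sketch):

    ALTER SYSTEM SET `store.parquet.reader.timestamp_micros_as_int64` = true;
    ALTER SESSION SET `store.parquet.reader.time_micros_as_int64` = true;
    SELECT * FROM sys.options
    WHERE name LIKE 'store.parquet.reader.%micros_as_int64';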
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetSimpleTestFileGenerator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetSimpleTestFileGenerator.java
index efd1b4fd17..c185c9820d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetSimpleTestFileGenerator.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetSimpleTestFileGenerator.java
@@ -211,12 +211,19 @@ public class ParquetSimpleTestFileGenerator {
           "  required int32 rowKey; \n" +
           "  repeated int32 repeatedInt ( INTEGER(32,true) ) ; \n" +
           "} \n";
+  public static final String microsecondColumnsSchemaMsg =
+      "message ParquetMicrosecondDataTypes { \n" +
+          "  required int32 rowKey; \n" +
+          "  required int64 _TIME_MICROS_int64  ( TIME_MICROS ) ; \n" +
+          "  required int64 _TIMESTAMP_MICROS_int64  ( TIMESTAMP_MICROS ) ; \n" +
+          "} \n";
 
   public static MessageType simpleSchema = MessageTypeParser.parseMessageType(simpleSchemaMsg);
   public static MessageType complexSchema = MessageTypeParser.parseMessageType(complexSchemaMsg);
   public static MessageType simpleNullableSchema = MessageTypeParser.parseMessageType(simpleNullableSchemaMsg);
   public static MessageType complexNullableSchema = MessageTypeParser.parseMessageType(complexNullableSchemaMsg);
   public static MessageType repeatedIntSchema = MessageTypeParser.parseMessageType(repeatedIntSchemaMsg);
+  public static MessageType microsecondColumnsSchema = MessageTypeParser.parseMessageType(microsecondColumnsSchemaMsg);
 
 
   public static Path initFile(String fileName) {
@@ -488,6 +495,24 @@ public class ParquetSimpleTestFileGenerator {
     }
   }
 
+  public static void writeMicrosecondValues(
+      SimpleGroupFactory groupFactory,
+      ParquetWriter<Group> writer,
+      long[] timeMicrosValues,
+      long[] timestampMicrosValues) throws IOException {
+
+    int numValues = Math.min(timeMicrosValues.length, timestampMicrosValues.length);
+    for (int i = 0; i < numValues; i++) {
+
+      writer.write(
+          groupFactory.newGroup()
+              .append("rowKey", i + 1)
+              .append("_TIME_MICROS_int64", timeMicrosValues[i])
+              .append("_TIMESTAMP_MICROS_int64", timestampMicrosValues[i])
+      );
+    }
+  }
+
   public static void main(String[] args) throws IOException {
 
     SimpleGroupFactory sgf = new SimpleGroupFactory(simpleSchema);
@@ -495,6 +520,7 @@ public class ParquetSimpleTestFileGenerator {
     SimpleGroupFactory sngf = new SimpleGroupFactory(simpleNullableSchema);
     GroupFactory ngf = new SimpleGroupFactory(complexNullableSchema);
     SimpleGroupFactory repeatedIntGroupFactory = new SimpleGroupFactory(repeatedIntSchema);
+    SimpleGroupFactory microsecondGroupFactory = new SimpleGroupFactory(microsecondColumnsSchema);
 
     // Generate files with dictionary encoding enabled and disabled
     ParquetWriter<Group> simpleWriter = initWriter(simpleSchema, "drill/parquet_test_file_simple", true);
@@ -506,6 +532,8 @@ public class ParquetSimpleTestFileGenerator {
     ParquetWriter<Group> simpleNullableNoDictWriter = initWriter(simpleNullableSchema, "drill/parquet_test_file_simple_nullable_nodict", false);
     ParquetWriter<Group> complexNullableNoDictWriter = initWriter(complexNullableSchema, "drill/parquet_test_file_complex_nullable_nodict", false);
     ParquetWriter<Group> repeatedIntV2Writer = initWriter(repeatedIntSchema, "drill/parquet_v2_repeated_int.parquet", ParquetProperties.WriterVersion.PARQUET_2_0, true);
+    ParquetWriter<Group> microsecondWriter = initWriter(microsecondColumnsSchema, "drill/microseconds.parquet", false);
+    ParquetWriter<Group> microsecondSmallDiffWriter = initWriter(microsecondColumnsSchema, "drill/microseconds_small_diff.parquet", false);
 
     ParquetSimpleTestFileGenerator.writeSimpleValues(sgf, simpleWriter, false);
     ParquetSimpleTestFileGenerator.writeSimpleValues(sngf, simpleNullableWriter, true);
@@ -516,6 +544,16 @@ public class ParquetSimpleTestFileGenerator {
     ParquetSimpleTestFileGenerator.writeComplexValues(gf, complexNoDictWriter, false);
     ParquetSimpleTestFileGenerator.writeComplexValues(ngf, complexNullableNoDictWriter, true);
     ParquetSimpleTestFileGenerator.writeRepeatedIntValues(repeatedIntGroupFactory, repeatedIntV2Writer, 100);
+    ParquetSimpleTestFileGenerator.writeMicrosecondValues(
+        microsecondGroupFactory,
+        microsecondWriter,
+        TestMicrosecondColumns.TIME_MICROS_VALUES,
+        TestMicrosecondColumns.TIMESTAMP_MICROS_VALUES);
+    ParquetSimpleTestFileGenerator.writeMicrosecondValues(
+        microsecondGroupFactory,
+        microsecondSmallDiffWriter,
+        TestMicrosecondColumns.TIME_MICROS_SMALL_DIFF_VALUES,
+        TestMicrosecondColumns.TIMESTAMP_MICROS_SMALL_DIFF_VALUES);
 
     simpleWriter.close();
     complexWriter.close();
@@ -526,7 +564,8 @@ public class ParquetSimpleTestFileGenerator {
     simpleNullableNoDictWriter.close();
     complexNullableNoDictWriter.close();
     repeatedIntV2Writer.close();
-
+    microsecondWriter.close();
+    microsecondSmallDiffWriter.close();
   }
 
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestMicrosecondColumns.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestMicrosecondColumns.java
index 01832b446e..a81cdbf370 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestMicrosecondColumns.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestMicrosecondColumns.java
@@ -17,29 +17,22 @@
  */
 package org.apache.drill.exec.store.parquet;
 
-import java.io.IOException;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
+import java.time.ZoneId;
 import java.time.ZoneOffset;
 import java.time.format.DateTimeFormatter;
+import java.util.Arrays;
 
 import org.apache.drill.categories.ParquetTest;
 import org.apache.drill.categories.UnlikelyTest;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.test.ClusterFixture;
 import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.TestBuilder;
 
-import org.apache.parquet.column.ParquetProperties;
-import org.apache.parquet.example.data.Group;
-import org.apache.parquet.example.data.simple.SimpleGroupFactory;
-import org.apache.parquet.hadoop.ParquetFileWriter;
-import org.apache.parquet.hadoop.ParquetWriter;
-import org.apache.parquet.hadoop.example.ExampleParquetWriter;
-import org.apache.parquet.hadoop.example.GroupWriteSupport;
-import org.apache.parquet.hadoop.metadata.CompressionCodecName;
-import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.MessageTypeParser;
-
+import org.junit.After;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -55,33 +48,45 @@ public class TestMicrosecondColumns extends ClusterTest {
   private static final String TO_TIMESTAMP_TEMPLATE = "TO_TIMESTAMP('%s', 'yyy-MM-dd''T''HH:mm:ss.SSS')";
   private static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormatter.ofPattern(TIMESTAMP_FORMAT);
 
-  // The parquet file used in the test cases, can be generated by calling createParquetTestFile().
+  // The parquet files used in the test cases, created by ParquetSimpleTestFileGenerator.
   private static final String DATAFILE = "cp.`parquet/microseconds.parquet`";
+  private static final String DATAFILE_SMALL_DIFF = "cp.`parquet/microseconds_small_diff.parquet`";
 
-  // Schema used to generate the parquet test file.
-  private static final String SCHEMA =
-      "message ParquetMicrosecondDataTypes { \n" +
-          "  required int32 rowKey; \n" +
-          "  required int64 _TIME_MICROS_int64  ( TIME_MICROS ) ; \n" +
-          "  required int64 _TIMESTAMP_MICROS_int64  ( TIMESTAMP_MICROS ) ; \n" +
-          "} \n";
-
-  // Test values for the _TIME_MICROS_int64 field. Will be written to the test parquet file when
-  // calling createParquetTestFile().
-  private static final long[] TIME_MICROS_VALUES = {
+  // Test values for the _TIME_MICROS_int64 field. Will be written to the test parquet file by
+  // ParquetSimpleTestFileGenerator.
+   static final long[] TIME_MICROS_VALUES = {
       toMicrosecondTime(0, 32, 58, 174711),
       toMicrosecondTime(9, 0, 22, 654321),
       toMicrosecondTime(22, 12, 41, 123456)
   };
 
-  // Test values for the _TIMESTAMP_MICROS_int64 field. Will be written to the test parquet file
-  // when calling createParquetTestFile().
-  private static final long[] TIMESTAMP_MICROS_VALUES = {
+  // Test values for the _TIMESTAMP_MICROS_int64 field. Will be written to the test parquet file by
+  // ParquetSimpleTestFileGenerator.
+   static final long[] TIMESTAMP_MICROS_VALUES = {
       toMicrosecondTimestamp(2021, 8, 1, 22, 12, 41, 123456),
       toMicrosecondTimestamp(2022, 5, 6, 9, 0, 22, 654321),
       toMicrosecondTimestamp(2023, 2, 10, 0, 32, 58, 174711)
   };
 
+  // Test values with small differences (less than a millisecond) for the _TIME_MICROS_int64 field.
+  // Used for testing ORDER BY. Written to the test parquet file by ParquetSimpleTestFileGenerator.
+  static final long[] TIME_MICROS_SMALL_DIFF_VALUES = {
+      toMicrosecondTime(10, 11, 12, 336804),
+      toMicrosecondTime(10, 11, 12, 336587),
+      toMicrosecondTime(10, 11, 12, 336172),
+      toMicrosecondTime(10, 11, 12, 336991),
+      toMicrosecondTime(10, 11, 12, 336336)
+  };
+
+  // Test values with small differences (less than a millisecond) for the _TIMESTAMP_MICROS_int64
+  // field. Used for testing ORDER BY. Written to the test parquet file by ParquetSimpleTestFileGenerator.
+  static final long[] TIMESTAMP_MICROS_SMALL_DIFF_VALUES = {
+      toMicrosecondTimestamp(2024, 3, 16, 19, 1, 54, 182665),
+      toMicrosecondTimestamp(2024, 3, 16, 19, 1, 54, 182429),
+      toMicrosecondTimestamp(2024, 3, 16, 19, 1, 54, 182707),
+      toMicrosecondTimestamp(2024, 3, 16, 19, 1, 54, 182003),
+      toMicrosecondTimestamp(2024, 3, 16, 19, 1, 54, 182860)
+  };
 
   @BeforeClass
   public static void setUp() throws Exception {
@@ -89,6 +94,13 @@ public class TestMicrosecondColumns extends ClusterTest {
   }
 
 
+  @After
+  public void restoreSession() {
+    client.alterSession(ExecConstants.PARQUET_READER_TIME_MICROS_AS_INT64, false);
+    client.alterSession(ExecConstants.PARQUET_READER_TIMESTAMP_MICROS_AS_INT64, false);
+  }
+
+
   @Test
   public void testSelectTimeColumns() throws Exception {
     // DRILL-8423
@@ -170,6 +182,100 @@ public class TestMicrosecondColumns extends ClusterTest {
   }
 
 
+  @Test
+  public void testSelectTimeColumnAsBigInt() throws Exception {
+    String query = "select _TIME_MICROS_int64 as t from %s";
+    testBuilder()
+        .enableSessionOption(ExecConstants.PARQUET_READER_TIME_MICROS_AS_INT64)
+        .sqlQuery(query, DATAFILE)
+        .unOrdered()
+        .baselineColumns("t")
+        .baselineValues(TIME_MICROS_VALUES[0])
+        .baselineValues(TIME_MICROS_VALUES[1])
+        .baselineValues(TIME_MICROS_VALUES[2])
+        .go();
+  }
+
+
+  @Test
+  public void testSelectStarTimeColumnAsBigInt() throws Exception {
+    // PARQUET_READER_TIME_MICROS_AS_INT64 should only affect time_micros columns, not
+    // timestamp_micros columns.
+    String query = "select * from %s";
+    testBuilder()
+        .enableSessionOption(ExecConstants.PARQUET_READER_TIME_MICROS_AS_INT64)
+        .sqlQuery(query, DATAFILE)
+        .unOrdered()
+        .baselineColumns("rowKey","_TIME_MICROS_int64","_TIMESTAMP_MICROS_int64")
+        .baselineValues(1, TIME_MICROS_VALUES[0], toLocalDateTime(TIMESTAMP_MICROS_VALUES[0]))
+        .baselineValues(2, TIME_MICROS_VALUES[1], toLocalDateTime(TIMESTAMP_MICROS_VALUES[1]))
+        .baselineValues(3, TIME_MICROS_VALUES[2], toLocalDateTime(TIMESTAMP_MICROS_VALUES[2]))
+        .go();
+  }
+
+
+  @Test
+  public void testSelectTimeColumnAsBigIntWithBigIntFilter() throws Exception {
+    String query = "select _TIME_MICROS_int64 as t from %s where _TIME_MICROS_int64 > " + TIME_MICROS_VALUES[0];
+    testBuilder()
+        .enableSessionOption(ExecConstants.PARQUET_READER_TIME_MICROS_AS_INT64)
+        .sqlQuery(query, DATAFILE)
+        .unOrdered()
+        .baselineColumns("t")
+        .baselineValues(TIME_MICROS_VALUES[1])
+        .baselineValues(TIME_MICROS_VALUES[2])
+        .go();
+  }
+
+
+  @Test
+  public void testSelectTimeColumnAsBigIntWithTimeFilter() throws Exception {
+    String query = "select _TIME_MICROS_int64 as t from %s where TO_TIME(_TIME_MICROS_int64/1000) > " + createToTimeFragment(TIME_MICROS_VALUES[0]);
+    testBuilder()
+        .enableSessionOption(ExecConstants.PARQUET_READER_TIME_MICROS_AS_INT64)
+        .sqlQuery(query, DATAFILE)
+        .unOrdered()
+        .baselineColumns("t")
+        .baselineValues(TIME_MICROS_VALUES[1])
+        .baselineValues(TIME_MICROS_VALUES[2])
+        .go();
+  }
+
+
+  @Test
+  public void testOrderByTimeColumnAsBigInt() throws Exception {
+    long[] sortedValues = Arrays.copyOf(TIME_MICROS_SMALL_DIFF_VALUES, TIME_MICROS_SMALL_DIFF_VALUES.length);
+    Arrays.sort(sortedValues);
+    String query = "select _TIME_MICROS_int64 as t from %s ORDER BY t";
+    TestBuilder builder = testBuilder()
+        .enableSessionOption(ExecConstants.PARQUET_READER_TIME_MICROS_AS_INT64)
+        .sqlQuery(query, DATAFILE_SMALL_DIFF)
+        .ordered()
+        .baselineColumns("t");
+    for (long expectedValue : sortedValues) {
+      builder.baselineValues(expectedValue);
+    }
+    builder.go();
+  }
+
+
+  @Test
+  public void testOrderByDescTimeColumnAsBigInt() throws Exception {
+    long[] sortedValues = Arrays.copyOf(TIME_MICROS_SMALL_DIFF_VALUES, TIME_MICROS_SMALL_DIFF_VALUES.length);
+    Arrays.sort(sortedValues);
+    String query = "select _TIME_MICROS_int64 as t from %s ORDER BY t DESC";
+    TestBuilder builder = testBuilder()
+        .enableSessionOption(ExecConstants.PARQUET_READER_TIME_MICROS_AS_INT64)
+        .sqlQuery(query, DATAFILE_SMALL_DIFF)
+        .ordered()
+        .baselineColumns("t");
+    for (int i=sortedValues.length-1; i>= 0; i--) {
+      builder.baselineValues(sortedValues[i]);
+    }
+    builder.go();
+  }
+
+
   @Test
   public void testSelectTimestampColumns() throws Exception {
     String query = "select _TIMESTAMP_MICROS_int64 as t from %s";
@@ -270,44 +376,111 @@ public class TestMicrosecondColumns extends ClusterTest {
   }
 
 
-  private void executeFilterQuery(String whereClause, long expectedCount) throws Exception {
-    String query = "select count(*) as c from %s where " + whereClause;
+  @Test
+  public void testSelectTimestampColumnAsBigInt() throws Exception {
+    String query = "select _TIMESTAMP_MICROS_int64 as t from %s";
     testBuilder()
+        .enableSessionOption(ExecConstants.PARQUET_READER_TIMESTAMP_MICROS_AS_INT64)
         .sqlQuery(query, DATAFILE)
         .unOrdered()
-        .baselineColumns("c")
-        .baselineValues(expectedCount)
+        .baselineColumns("t")
+        .baselineValues(TIMESTAMP_MICROS_VALUES[0])
+        .baselineValues(TIMESTAMP_MICROS_VALUES[1])
+        .baselineValues(TIMESTAMP_MICROS_VALUES[2])
+        .go();
+  }
+
+
+  @Test
+  public void testSelectStarTimestampColumnAsBigInt() throws Exception {
+    // PARQUET_READER_TIMESTAMP_MICROS_AS_INT64 should only affect timestamp_micros columns, not
+    // time_micros columns.
+    String query = "select * from %s";
+    testBuilder()
+        .enableSessionOption(ExecConstants.PARQUET_READER_TIMESTAMP_MICROS_AS_INT64)
+        .sqlQuery(query, DATAFILE)
+        .unOrdered()
+        .baselineColumns("rowKey","_TIME_MICROS_int64","_TIMESTAMP_MICROS_int64")
+        .baselineValues(1, toLocalTime(TIME_MICROS_VALUES[0]), TIMESTAMP_MICROS_VALUES[0])
+        .baselineValues(2, toLocalTime(TIME_MICROS_VALUES[1]), TIMESTAMP_MICROS_VALUES[1])
+        .baselineValues(3, toLocalTime(TIME_MICROS_VALUES[2]), TIMESTAMP_MICROS_VALUES[2])
+        .go();
+  }
+
+
+  @Test
+  public void testSelectTimestampColumnAsBigIntWithBigIntFilter() throws Exception {
+    String query = "select _TIMESTAMP_MICROS_int64 as t from %s where _TIMESTAMP_MICROS_int64 > " + TIMESTAMP_MICROS_VALUES[0];
+    testBuilder()
+        .enableSessionOption(ExecConstants.PARQUET_READER_TIMESTAMP_MICROS_AS_INT64)
+        .sqlQuery(query, DATAFILE)
+        .unOrdered()
+        .baselineColumns("t")
+        .baselineValues(TIMESTAMP_MICROS_VALUES[1])
+        .baselineValues(TIMESTAMP_MICROS_VALUES[2])
+        .go();
+  }
+
+
+  @Test
+  public void testSelectTimestampColumnAsBigIntWithTimestampFilter() throws Exception {
+    // TO_TIMESTAMP(double) creates a timestamp in the system default timezone, must compare to
+    // a TO_TIMESTAMP(string ,format) in the same timezone.
+    String toTimestampTerm = createToTimestampFragment(TIMESTAMP_MICROS_VALUES[0], ZoneId.systemDefault());
+    String query = "select _TIMESTAMP_MICROS_int64 as t from %s where TO_TIMESTAMP(_TIMESTAMP_MICROS_int64/1000000) > " + toTimestampTerm;
+    testBuilder()
+        .enableSessionOption(ExecConstants.PARQUET_READER_TIMESTAMP_MICROS_AS_INT64)
+        .sqlQuery(query, DATAFILE)
+        .unOrdered()
+        .baselineColumns("t")
+        .baselineValues(TIMESTAMP_MICROS_VALUES[1])
+        .baselineValues(TIMESTAMP_MICROS_VALUES[2])
         .go();
   }
 
 
-  public static void createParquetTestFile(String filePath) throws IOException {
+  @Test
+  public void testOrderByTimestampColumnAsBigInt() throws Exception {
+    long[] sortedValues = Arrays.copyOf(TIMESTAMP_MICROS_SMALL_DIFF_VALUES, TIMESTAMP_MICROS_SMALL_DIFF_VALUES.length);
+    Arrays.sort(sortedValues);
+    String query = "select _TIMESTAMP_MICROS_int64 as t from %s ORDER BY t";
+    TestBuilder builder = testBuilder()
+        .enableSessionOption(ExecConstants.PARQUET_READER_TIMESTAMP_MICROS_AS_INT64)
+        .sqlQuery(query, DATAFILE_SMALL_DIFF)
+        .ordered()
+        .baselineColumns("t");
+    for (long expectedValue : sortedValues) {
+      builder.baselineValues(expectedValue);
+    }
+    builder.go();
+  }
 
-    MessageType messageType = MessageTypeParser.parseMessageType(SCHEMA);
-    GroupWriteSupport.setSchema(messageType, ParquetSimpleTestFileGenerator.conf);
-    SimpleGroupFactory groupFactory = new SimpleGroupFactory(messageType);
 
-    try (ParquetWriter<Group> writer = createParquetWriter(filePath)) {
-      for (int i=0; i<TIME_MICROS_VALUES.length; i++) {
-        writer.write(
-            groupFactory.newGroup()
-                .append("rowKey", i+1)
-                .append("_TIME_MICROS_int64", TIME_MICROS_VALUES[i])
-                .append("_TIMESTAMP_MICROS_int64", TIMESTAMP_MICROS_VALUES[i])
-        );
-      }
+  @Test
+  public void testOrderByDescTimestampColumnAsBigInt() throws Exception {
+    long[] sortedValues = Arrays.copyOf(TIMESTAMP_MICROS_SMALL_DIFF_VALUES, TIMESTAMP_MICROS_SMALL_DIFF_VALUES.length);
+    Arrays.sort(sortedValues);
+    String query = "select _TIMESTAMP_MICROS_int64 as t from %s ORDER BY t DESC";
+    TestBuilder builder = testBuilder()
+        .enableSessionOption(ExecConstants.PARQUET_READER_TIMESTAMP_MICROS_AS_INT64)
+        .sqlQuery(query, DATAFILE_SMALL_DIFF)
+        .ordered()
+        .baselineColumns("t");
+    for (int i=sortedValues.length-1; i>= 0; i--) {
+      builder.baselineValues(sortedValues[i]);
     }
+    builder.go();
   }
 
 
-  private static ParquetWriter<Group> createParquetWriter(String filePath) throws IOException {
-    return
-        ExampleParquetWriter.builder(ParquetSimpleTestFileGenerator.initFile(filePath))
-            .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
-            .withCompressionCodec(CompressionCodecName.GZIP)
-            .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
-            .withConf(ParquetSimpleTestFileGenerator.conf)
-            .build();
+  private void executeFilterQuery(String whereClause, long expectedCount) throws Exception {
+    String query = "select count(*) as c from %s where " + whereClause;
+    testBuilder()
+        .sqlQuery(query, DATAFILE)
+        .unOrdered()
+        .baselineColumns("c")
+        .baselineValues(expectedCount)
+        .go();
   }
 
 
@@ -321,13 +494,23 @@ public class TestMicrosecondColumns extends ClusterTest {
   }
 
 
+  private static String createToTimestampFragment(long micros, ZoneId timeZone) {
+    return String.format(TO_TIMESTAMP_TEMPLATE, TIMESTAMP_FORMATTER.format(toLocalDateTime(micros, timeZone)));
+  }
+
+
   private static LocalTime toLocalTime(long micros) {
     return LocalTime.ofNanoOfDay((micros/1000L) * 1000_000L);
   }
 
 
   private static LocalDateTime toLocalDateTime(long micros) {
-    return LocalDateTime.ofInstant(Instant.ofEpochMilli(micros/1000L), ZoneOffset.ofHours(0));
+    return toLocalDateTime(micros, ZoneOffset.ofHours(0));
+  }
+
+
+  private static LocalDateTime toLocalDateTime(long micros, ZoneId timeZone) {
+    return LocalDateTime.ofInstant(Instant.ofEpochMilli(micros/1000L), timeZone);
   }
 
 
diff --git a/exec/java-exec/src/test/resources/parquet/microseconds.parquet b/exec/java-exec/src/test/resources/parquet/microseconds.parquet
index 7bac7aa26d..3faef5335f 100644
Binary files a/exec/java-exec/src/test/resources/parquet/microseconds.parquet and b/exec/java-exec/src/test/resources/parquet/microseconds.parquet differ
diff --git a/exec/java-exec/src/test/resources/parquet/microseconds_small_diff.parquet b/exec/java-exec/src/test/resources/parquet/microseconds_small_diff.parquet
new file mode 100644
index 0000000000..089ae70a29
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/microseconds_small_diff.parquet differ

