This is an automated email from the ASF dual-hosted git repository.

dzamo pushed a commit to branch 1.21
in repository https://gitbox.apache.org/repos/asf/drill.git
commit fa22d67022b49b47d16e84bfc0bd26b97e58fb11
Author:     Peter Franzen <pe...@myire.org>
AuthorDate: Tue Apr 18 09:43:45 2023 +0200

    DRILL-8421: Parquet microsecond columns (#2793)

    * Read parquet TIME_MICROS columns as 64-bit values before truncating to 32 bits

    * Truncate parquet min and max metadata values for microsecond columns to milliseconds

    * Express parquet TIME_MICROS metadata as Integer values
---
 .../store/parquet/ParquetTableMetadataUtils.java   |   3 +-
 .../NullableFixedByteAlignedReaders.java           |   2 +-
 .../ParquetFixedWidthDictionaryReaders.java        |   6 +-
 .../parquet/metadata/FileMetadataCollector.java    |  18 ++
 .../exec/store/parquet/TestMicrosecondColumns.java | 355 +++++++++++++++++++++
 .../test/resources/parquet/microseconds.parquet    | Bin 0 -> 871 bytes
 6 files changed, 379 insertions(+), 5 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java
index d1505d1120..a25b1fb5c1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java
@@ -344,10 +344,11 @@ public class ParquetTableMetadataUtils {
       case INT64:
         if (originalType == OriginalType.DECIMAL) {
           return BigInteger.valueOf(getLong(value));
+        } else if (originalType == OriginalType.TIME_MICROS) {
+          return getInt(value);
         } else {
           return getLong(value);
         }
-
       case FLOAT:
         return getFloat(value);
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
index 9b82620c3b..b678bb8b61 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
@@ -193,7 +193,7 @@ public class NullableFixedByteAlignedReaders {
     protected void readField(long recordsToReadInThisPass) {
       ValuesReader valReader = usingDictionary ? pageReader.getDictionaryValueReader() : pageReader.getValueReader();
       for (int i = 0; i < recordsToReadInThisPass; i++) {
-        valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, valReader.readInteger() / 1000);
+        valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, (int) (valReader.readLong() / 1000));
       }
     }
   }

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java
index ea13a4c42d..2a916ba007 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java
@@ -168,13 +168,13 @@ public class ParquetFixedWidthDictionaryReaders {
       if (recordsRequireDecoding()) {
         ValuesReader valReader = usingDictionary ? pageReader.getDictionaryValueReader() : pageReader.getValueReader();
         for (int i = 0; i < recordsReadInThisIteration; i++) {
-          valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, valReader.readInteger() / 1000);
+          valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, (int) (valReader.readLong() / 1000));
         }
       } else {
         int dataTypeLengthInBytes = (int) Math.ceil(dataTypeLengthInBits / 8.0);
         for (int i = 0; i < recordsReadInThisIteration; i++) {
-          int value = pageReader.pageData.getInt((int) readStartInBytes + i * dataTypeLengthInBytes);
-          valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, value / 1000);
+          long value = pageReader.pageData.getLong((int) readStartInBytes + i * dataTypeLengthInBytes);
+          valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, (int) (value / 1000));
         }
       }
     }

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/FileMetadataCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/FileMetadataCollector.java
index c79996bc6a..9950171f97 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/FileMetadataCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/FileMetadataCollector.java
@@ -208,6 +208,12 @@ public class FileMetadataCollector {
         minValue = ParquetReaderUtility.autoCorrectCorruptedDate((Integer) minValue);
         maxValue = ParquetReaderUtility.autoCorrectCorruptedDate((Integer) maxValue);
       }
+      if (isMicrosecondColumnType(columnTypeMetadata.originalType)) {
+        // DRILL-8421: truncate the min/max of microsecond columns to milliseconds, otherwise the
+        // initial scanning of files when filtering will compare to the wrong values.
+        minValue = truncateMicros(minValue);
+        maxValue = truncateMicros(maxValue);
+      }
     }
     long numNulls = stats.getNumNulls();
     Metadata_V4.ColumnMetadata_v4 columnMetadata = new Metadata_V4.ColumnMetadata_v4(columnTypeMetadata.name,
@@ -218,6 +224,18 @@ public class FileMetadataCollector {
     columnTypeInfo.put(columnTypeMetadataKey, columnTypeMetadata);
   }
 
+  private static boolean isMicrosecondColumnType(OriginalType columnType) {
+    return columnType == OriginalType.TIME_MICROS || columnType == OriginalType.TIMESTAMP_MICROS;
+  }
+
+  private static Object truncateMicros(Object microSeconds) {
+    if (microSeconds instanceof Number) {
+      return Long.valueOf(((Number) microSeconds).longValue() / 1000);
+    } else {
+      return microSeconds;
+    }
+  }
+
   /**
    * Get the host affinity for a row group.
    *

diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestMicrosecondColumns.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestMicrosecondColumns.java
new file mode 100644
index 0000000000..01832b446e
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestMicrosecondColumns.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+
+import org.apache.drill.categories.ParquetTest;
+import org.apache.drill.categories.UnlikelyTest;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+
+@Category({ParquetTest.class, UnlikelyTest.class})
+public class TestMicrosecondColumns extends ClusterTest {
+
+  private static final String TIME_FORMAT = "HH:mm:ss.SSS";
+  private static final String TO_TIME_TEMPLATE = "TO_TIME('%s', 'HH:mm:ss.SSS')";
+  private static final DateTimeFormatter TIME_FORMATTER = DateTimeFormatter.ofPattern(TIME_FORMAT);
+  private static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS";
+  private static final String TO_TIMESTAMP_TEMPLATE = "TO_TIMESTAMP('%s', 'yyyy-MM-dd''T''HH:mm:ss.SSS')";
+  private static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormatter.ofPattern(TIMESTAMP_FORMAT);
+
+  // The parquet file used in the test cases; can be generated by calling createParquetTestFile().
+  private static final String DATAFILE = "cp.`parquet/microseconds.parquet`";
+
+  // Schema used to generate the parquet test file.
+  private static final String SCHEMA =
+      "message ParquetMicrosecondDataTypes { \n" +
+          " required int32 rowKey; \n" +
+          " required int64 _TIME_MICROS_int64 ( TIME_MICROS ) ; \n" +
+          " required int64 _TIMESTAMP_MICROS_int64 ( TIMESTAMP_MICROS ) ; \n" +
+          "} \n";
+
+  // Test values for the _TIME_MICROS_int64 field. Will be written to the test parquet file when
+  // calling createParquetTestFile().
+  private static final long[] TIME_MICROS_VALUES = {
+      toMicrosecondTime(0, 32, 58, 174711),
+      toMicrosecondTime(9, 0, 22, 654321),
+      toMicrosecondTime(22, 12, 41, 123456)
+  };
+
+  // Test values for the _TIMESTAMP_MICROS_int64 field. Will be written to the test parquet file
+  // when calling createParquetTestFile().
+  private static final long[] TIMESTAMP_MICROS_VALUES = {
+      toMicrosecondTimestamp(2021, 8, 1, 22, 12, 41, 123456),
+      toMicrosecondTimestamp(2022, 5, 6, 9, 0, 22, 654321),
+      toMicrosecondTimestamp(2023, 2, 10, 0, 32, 58, 174711)
+  };
+
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    startCluster(ClusterFixture.builder(dirTestWatcher));
+  }
+
+
+  @Test
+  public void testSelectTimeColumns() throws Exception {
+    // DRILL-8423
+    String query = "select _TIME_MICROS_int64 as t from %s";
+    testBuilder()
+        .sqlQuery(query, DATAFILE)
+        .unOrdered()
+        .baselineColumns("t")
+        .baselineValues(toLocalTime(TIME_MICROS_VALUES[0]))
+        .baselineValues(toLocalTime(TIME_MICROS_VALUES[1]))
+        .baselineValues(toLocalTime(TIME_MICROS_VALUES[2]))
+        .go();
+  }
+
+
+  @Test
+  public void testLessThanSmallestTime() throws Exception {
+    // No time values should be less than the smallest value in the parquet file
+    int expectedCount = 0;
+    String timeExpr = createToTimeFragment(TIME_MICROS_VALUES[0]);
+    executeFilterQuery("_TIME_MICROS_int64 < " + timeExpr, expectedCount);
+  }
+
+
+  @Test
+  public void testLessThanMidTime() throws Exception {
+    // The smallest time value should be less than the middle value in the parquet file
+    int expectedCount = 1;
+    String timeExpr = createToTimeFragment(TIME_MICROS_VALUES[1]);
+    executeFilterQuery("_TIME_MICROS_int64 < " + timeExpr, expectedCount);
+  }
+
+
+  @Test
+  public void testLessThanLargestTime() throws Exception {
+    // The smallest and middle time values should be less than the largest value in the parquet file
+    int expectedCount = 2;
+    String timeExpr = createToTimeFragment(TIME_MICROS_VALUES[2]);
+    executeFilterQuery("_TIME_MICROS_int64 < " + timeExpr, expectedCount);
+  }
+
+
+  @Test
+  public void testGreaterThanSmallestTime() throws Exception {
+    // The middle and largest time values should be greater than the smallest value in the parquet
+    // file
+    int expectedCount = 2;
+    String timeExpr = createToTimeFragment(TIME_MICROS_VALUES[0]);
+    executeFilterQuery("_TIME_MICROS_int64 > " + timeExpr, expectedCount);
+  }
+
+
+  @Test
+  public void testGreaterThanMidTime() throws Exception {
+    // The largest time value should be greater than the middle value in the parquet file
+    int expectedCount = 1;
+    String timeExpr = createToTimeFragment(TIME_MICROS_VALUES[1]);
+    executeFilterQuery("_TIME_MICROS_int64 > " + timeExpr, expectedCount);
+  }
+
+
+  @Test
+  public void testGreaterThanLargestTime() throws Exception {
+    // No time value should be greater than the largest value in the parquet file
+    int expectedCount = 0;
+    String timeExpr = createToTimeFragment(TIME_MICROS_VALUES[2]);
+    executeFilterQuery("_TIME_MICROS_int64 > " + timeExpr, expectedCount);
+  }
+
+
+  @Test
+  public void testTimeRange() throws Exception {
+    // The middle time test value should be greater than the smallest value and less than the
+    // largest in the parquet file
+    int expectedCount = 1;
+    String lower = createToTimeFragment(TIME_MICROS_VALUES[0]);
+    String upper = createToTimeFragment(TIME_MICROS_VALUES[2]);
+    executeFilterQuery("_TIME_MICROS_int64 > " + lower + " and _TIME_MICROS_int64 < " + upper, expectedCount);
+  }
+
+
+  @Test
+  public void testSelectTimestampColumns() throws Exception {
+    String query = "select _TIMESTAMP_MICROS_int64 as t from %s";
+    testBuilder()
+        .sqlQuery(query, DATAFILE)
+        .unOrdered()
+        .baselineColumns("t")
+        .baselineValues(toLocalDateTime(TIMESTAMP_MICROS_VALUES[0]))
+        .baselineValues(toLocalDateTime(TIMESTAMP_MICROS_VALUES[1]))
+        .baselineValues(toLocalDateTime(TIMESTAMP_MICROS_VALUES[2]))
+        .go();
+  }
+
+
+  @Test
+  public void testLessThanSmallestTimestamp() throws Exception {
+    // No timestamp values should be less than the smallest value in the parquet file
+    int expectedCount = 0;
+    String timestampExpr = createToTimestampFragment(TIMESTAMP_MICROS_VALUES[0]);
+    executeFilterQuery("_TIMESTAMP_MICROS_int64 < " + timestampExpr, expectedCount);
+  }
+
+
+  @Test
+  public void testLessThanMidTimestamp() throws Exception {
+    // The smallest timestamp value should be less than the middle value in the parquet file
+    int expectedCount = 1;
+    String timestampExpr = createToTimestampFragment(TIMESTAMP_MICROS_VALUES[1]);
+    executeFilterQuery("_TIMESTAMP_MICROS_int64 < " + timestampExpr, expectedCount);
+  }
+
+
+  @Test
+  public void testLessThanLargestTimestamp() throws Exception {
+    // The smallest and middle timestamp values should be less than the largest value in the parquet
+    // file
+    int expectedCount = 2;
+    String timestampExpr = createToTimestampFragment(TIMESTAMP_MICROS_VALUES[2]);
+    executeFilterQuery("_TIMESTAMP_MICROS_int64 < " + timestampExpr, expectedCount);
+  }
+
+
+  @Test
+  public void testLessThanTimestampLongIntoTheFuture() throws Exception {
+    // All test timestamps should be less than a timestamp several hundred years into the future
+    // See https://issues.apache.org/jira/browse/DRILL-8421
+    int expectedCount = 3;
+    String whereClause = "_TIMESTAMP_MICROS_int64 < TO_TIMESTAMP('2502-04-04 00:00:00', 'yyyy-MM-dd HH:mm:ss')";
+    executeFilterQuery(whereClause, expectedCount);
+  }
+
+
+  @Test
+  public void testGreaterThanSmallestTimestamp() throws Exception {
+    // The middle and largest timestamp values should be greater than the smallest value in the
+    // parquet file
+    int expectedCount = 2;
+    String timestampExpr = createToTimestampFragment(TIMESTAMP_MICROS_VALUES[0]);
+    executeFilterQuery("_TIMESTAMP_MICROS_int64 > " + timestampExpr, expectedCount);
+  }
+
+
+  @Test
+  public void testGreaterThanMidTimestamp() throws Exception {
+    // The largest timestamp value should be greater than the middle value in the parquet file
+    int expectedCount = 1;
+    String timestampExpr = createToTimestampFragment(TIMESTAMP_MICROS_VALUES[1]);
+    executeFilterQuery("_TIMESTAMP_MICROS_int64 > " + timestampExpr, expectedCount);
+  }
+
+
+  @Test
+  public void testGreaterThanLargestTimestamp() throws Exception {
+    // No timestamp values should be greater than the largest value in the parquet file
+    int expectedCount = 0;
+    String timestampExpr = createToTimestampFragment(TIMESTAMP_MICROS_VALUES[2]);
+    executeFilterQuery("_TIMESTAMP_MICROS_int64 > " + timestampExpr, expectedCount);
+  }
+
+
+  @Test
+  public void testGreaterThanTimestampLongIntoTheFuture() throws Exception {
+    // No test timestamps should be greater than a timestamp several hundred years into the future
+    // See https://issues.apache.org/jira/browse/DRILL-8421
+    int expectedCount = 0;
+    String whereClause = "_TIMESTAMP_MICROS_int64 > TO_TIMESTAMP('2502-04-04 00:00:00', 'yyyy-MM-dd HH:mm:ss')";
+    executeFilterQuery(whereClause, expectedCount);
+  }
+
+
+  @Test
+  public void testTimestampRange() throws Exception {
+    // The middle timestamp test value should be greater than the smallest value and less than the
+    // largest in the parquet file
+    String lower = createToTimestampFragment(TIMESTAMP_MICROS_VALUES[0]);
+    String upper = createToTimestampFragment(TIMESTAMP_MICROS_VALUES[2]);
+    executeFilterQuery("_TIMESTAMP_MICROS_int64 > " + lower + " and _TIMESTAMP_MICROS_int64 < " + upper, 1);
" + upper, 1); + } + + + private void executeFilterQuery(String whereClause, long expectedCount) throws Exception { + String query = "select count(*) as c from %s where " + whereClause; + testBuilder() + .sqlQuery(query, DATAFILE) + .unOrdered() + .baselineColumns("c") + .baselineValues(expectedCount) + .go(); + } + + + public static void createParquetTestFile(String filePath) throws IOException { + + MessageType messageType = MessageTypeParser.parseMessageType(SCHEMA); + GroupWriteSupport.setSchema(messageType, ParquetSimpleTestFileGenerator.conf); + SimpleGroupFactory groupFactory = new SimpleGroupFactory(messageType); + + try (ParquetWriter<Group> writer = createParquetWriter(filePath)) { + for (int i=0; i<TIME_MICROS_VALUES.length; i++) { + writer.write( + groupFactory.newGroup() + .append("rowKey", i+1) + .append("_TIME_MICROS_int64", TIME_MICROS_VALUES[i]) + .append("_TIMESTAMP_MICROS_int64", TIMESTAMP_MICROS_VALUES[i]) + ); + } + } + } + + + private static ParquetWriter<Group> createParquetWriter(String filePath) throws IOException { + return + ExampleParquetWriter.builder(ParquetSimpleTestFileGenerator.initFile(filePath)) + .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) + .withCompressionCodec(CompressionCodecName.GZIP) + .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0) + .withConf(ParquetSimpleTestFileGenerator.conf) + .build(); + } + + + private static String createToTimeFragment(long micros) { + return String.format(TO_TIME_TEMPLATE, TIME_FORMATTER.format(toLocalTime(micros))); + } + + + private static String createToTimestampFragment(long micros) { + return String.format(TO_TIMESTAMP_TEMPLATE, TIMESTAMP_FORMATTER.format(toLocalDateTime(micros))); + } + + + private static LocalTime toLocalTime(long micros) { + return LocalTime.ofNanoOfDay((micros/1000L) * 1000_000L); + } + + + private static LocalDateTime toLocalDateTime(long micros) { + return LocalDateTime.ofInstant(Instant.ofEpochMilli(micros/1000L), ZoneOffset.ofHours(0)); + } + + + private static long toMicrosecondTime(int hour, int minute, int second, int microOfSecond) { + return LocalTime.of(hour, minute, second, microOfSecond*1000).toNanoOfDay() / 1000L; + } + + + private static long toMicrosecondTimestamp( + int year, + int month, + int dayOfMonth, + int hour, + int minute, + int second, + int microOfSecond) { + + Instant instant = + LocalDateTime + .of(year, month, dayOfMonth, hour, minute, second, microOfSecond*1000) + .toInstant(ZoneOffset.ofHours(0)); + + return instant.getEpochSecond() * 1000_000L + instant.getNano() / 1000L; + } +} diff --git a/exec/java-exec/src/test/resources/parquet/microseconds.parquet b/exec/java-exec/src/test/resources/parquet/microseconds.parquet new file mode 100644 index 0000000000..7bac7aa26d Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/microseconds.parquet differ