[ https://issues.apache.org/jira/browse/DRILL-4517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16904672#comment-16904672 ]
ASF GitHub Bot commented on DRILL-4517: --------------------------------------- vvysotskyi commented on pull request #1839: DRILL-4517: Support reading empty Parquet files URL: https://github.com/apache/drill/pull/1839#discussion_r312741644 ########## File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/FileMetadataCollector.java ########## @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.parquet.metadata; + +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.store.parquet.ParquetReaderConfig; +import org.apache.drill.exec.store.parquet.ParquetReaderUtility; +import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.statistics.Statistics; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Collects file metadata for the given parquet file. For empty parquet file will + * generate fake row group metadata based on file schema. 
+ */ +public class FileMetadataCollector { + + private static final Logger logger = LoggerFactory.getLogger(FileMetadataCollector.class); + + private final ParquetMetadata metadata; + private final FileStatus file; + private final FileSystem fs; + private final boolean allColumnsInteresting; + private final boolean skipNonInteresting; + private final Set<String> columnSet; + + private final MessageType schema; + private final ParquetReaderUtility.DateCorruptionStatus containsCorruptDates; + private final Map<SchemaPath, ColTypeInfo> colTypeInfoMap; + + private final Map<Metadata_V4.ColumnTypeMetadata_v4.Key, Long> totalNullCountMap = new HashMap<>(); + private final Map<Metadata_V4.ColumnTypeMetadata_v4.Key, Metadata_V4.ColumnTypeMetadata_v4> columnTypeInfo = new HashMap<>(); + + private Metadata_V4.ParquetFileAndRowCountMetadata fileMetadata; + + public FileMetadataCollector(ParquetMetadata metadata, + FileStatus file, + FileSystem fs, + boolean allColumnsInteresting, + boolean skipNonInteresting, + Set<String> columnSet, + ParquetReaderConfig readerConfig) throws IOException { + this.metadata = metadata; + this.file = file; + this.fs = fs; + this.allColumnsInteresting = allColumnsInteresting; + this.skipNonInteresting = skipNonInteresting; + this.columnSet = columnSet; + + this.schema = metadata.getFileMetaData().getSchema(); + + this.containsCorruptDates = + ParquetReaderUtility.detectCorruptDates(metadata, Collections.singletonList(SchemaPath.STAR_COLUMN), + readerConfig.autoCorrectCorruptedDates()); + logger.debug("Contains corrupt dates: {}.", containsCorruptDates); + + this.colTypeInfoMap = new HashMap<>(); + for (String[] path : schema.getPaths()) { + colTypeInfoMap.put(SchemaPath.getCompoundPath(path), ColTypeInfo.of(schema, schema, path, 0)); + } + + init(); + } + + public Metadata_V4.ParquetFileAndRowCountMetadata getFileMetadata() { + return fileMetadata; + } + + public Map<Metadata_V4.ColumnTypeMetadata_v4.Key, Metadata_V4.ColumnTypeMetadata_v4> 
getColumnTypeInfo() { + return columnTypeInfo; + } + + private void init() throws IOException { + long totalRowCount = 0; + + List<Metadata_V4.RowGroupMetadata_v4> rowGroupMetadataList = new ArrayList<>(); + + for (BlockMetaData rowGroup : metadata.getBlocks()) { + List<Metadata_V4.ColumnMetadata_v4> columnMetadataList = new ArrayList<>(); + long length = 0; + totalRowCount = totalRowCount + rowGroup.getRowCount(); + for (ColumnChunkMetaData col : rowGroup.getColumns()) { + String[] columnName = col.getPath().toArray(); + Statistics<?> stats = col.getStatistics(); + PrimitiveType.PrimitiveTypeName primitiveTypeName = col.getPrimitiveType().getPrimitiveTypeName(); + addColumnMetadata(columnName, stats, primitiveTypeName, columnMetadataList); + length += col.getTotalSize(); + } + + // DRILL-5009: Skip the RowGroup if it is empty + // Note we still read the schema even if there are no values in the RowGroup + if (rowGroup.getRowCount() == 0) { + continue; + } + + Metadata_V4.RowGroupMetadata_v4 rowGroupMeta = new Metadata_V4.RowGroupMetadata_v4(rowGroup.getStartingPos(), length, rowGroup.getRowCount(), + getHostAffinity(rowGroup.getStartingPos(), length), columnMetadataList); + + rowGroupMetadataList.add(rowGroupMeta); + } + + // add fake row group based on file schema in case when file is empty or all row groups are empty + if (rowGroupMetadataList.isEmpty()) { + List<Metadata_V4.ColumnMetadata_v4> columnMetadataList = new ArrayList<>(); + for (ColumnDescriptor columnDescriptor : schema.getColumns()) { + + Statistics<?> stats = Statistics.getBuilderForReading(columnDescriptor.getPrimitiveType()) + .withMax(null) + .withMin(null) + .withNumNulls(0) + .build(); + + addColumnMetadata(columnDescriptor.getPath(), stats, + columnDescriptor.getPrimitiveType().getPrimitiveTypeName(), columnMetadataList); + } + + Metadata_V4.RowGroupMetadata_v4 rowGroupMeta = new Metadata_V4.RowGroupMetadata_v4(0L, 0L, + 0L, getHostAffinity(0, 0L), columnMetadataList); + 
rowGroupMetadataList.add(rowGroupMeta); + } + + Path path = Path.getPathWithoutSchemeAndAuthority(file.getPath()); + Metadata_V4.ParquetFileMetadata_v4 parquetFileMetadata_v4 = new Metadata_V4.ParquetFileMetadata_v4(path, file.getLen(), rowGroupMetadataList); + this.fileMetadata = new Metadata_V4.ParquetFileAndRowCountMetadata(parquetFileMetadata_v4, totalNullCountMap, totalRowCount); + } + + private void addColumnMetadata(String[] columnName, + Statistics<?> stats, + PrimitiveType.PrimitiveTypeName primitiveTypeName, + List<Metadata_V4.ColumnMetadata_v4> columnMetadataList) { + SchemaPath columnSchemaName = SchemaPath.getCompoundPath(columnName); + boolean thisColumnIsInteresting = allColumnsInteresting || columnSet == null || columnSet.contains(columnSchemaName.getRootSegmentPath()); + + if (skipNonInteresting && !thisColumnIsInteresting) { + return; + } + + ColTypeInfo colTypeInfo = colTypeInfoMap.get(columnSchemaName); + long totalNullCount = stats.getNumNulls(); + Metadata_V4.ColumnTypeMetadata_v4 columnTypeMetadata = new Metadata_V4.ColumnTypeMetadata_v4( + columnName, primitiveTypeName, + colTypeInfo.originalType, colTypeInfo.precision, colTypeInfo.scale, + colTypeInfo.repetitionLevel, colTypeInfo.definitionLevel, 0, false); + Metadata_V4.ColumnTypeMetadata_v4.Key columnTypeMetadataKey = new Metadata_V4.ColumnTypeMetadata_v4.Key(columnTypeMetadata.name); + + totalNullCountMap.putIfAbsent(columnTypeMetadataKey, Metadata.DEFAULT_NULL_COUNT); + if (totalNullCountMap.get(columnTypeMetadataKey) < 0 || totalNullCount < 0) { + totalNullCountMap.put(columnTypeMetadataKey, Metadata.NULL_COUNT_NOT_EXISTS); + } else { + long nullCount = totalNullCountMap.get(columnTypeMetadataKey) + totalNullCount; + totalNullCountMap.put(columnTypeMetadataKey, nullCount); + } + if (thisColumnIsInteresting) { + // Save the column schema info. 
We'll merge it into one list + Object minValue = null; + Object maxValue = null; + boolean statsAvailable = !stats.isEmpty(); + if (statsAvailable) { + if (stats.hasNonNullValue()) { Review comment: Could you please combine these two if statements? P.S. Since this code was moved from another place, it is optional to address this comment, but either way, I've written it :) ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Reading empty Parquet file fails with java.lang.IllegalArgumentException > ------------------------------------------------------------------------- > > Key: DRILL-4517 > URL: https://issues.apache.org/jira/browse/DRILL-4517 > Project: Apache Drill > Issue Type: Improvement > Components: Server > Reporter: Tobias > Assignee: Arina Ielchiieva > Priority: Major > Labels: doc-impacting > Fix For: 1.17.0 > > Attachments: empty.parquet, no_rows.parquet > > > When querying a Parquet file that has a schema but no rows the Drill Server > will fail with the error below > This looks similar to DRILL-3557 > {noformat} > {{ParquetMetaData{FileMetaData{schema: message TRANSACTION_REPORT { > required int64 MEMBER_ACCOUNT_ID; > required int64 TIMESTAMP_IN_HOUR; > optional int64 APPLICATION_ID; > } > , metadata: {}}}, blocks: []} > {noformat} > {noformat} > Caused by: java.lang.IllegalArgumentException: MinorFragmentId 0 has no read > entries assigned > at > com.google.common.base.Preconditions.checkArgument(Preconditions.java:92) > ~[guava-14.0.1.jar:na] > at > org.apache.drill.exec.store.parquet.ParquetGroupScan.getSpecificScan(ParquetGroupScan.java:707) > ~[drill-java-exec-1.5.0.jar:1.5.0] > at > org.apache.drill.exec.store.parquet.ParquetGroupScan.getSpecificScan(ParquetGroupScan.java:105) > 
~[drill-java-exec-1.5.0.jar:1.5.0] > at > org.apache.drill.exec.planner.fragment.Materializer.visitGroupScan(Materializer.java:68) > ~[drill-java-exec-1.5.0.jar:1.5.0] > at > org.apache.drill.exec.planner.fragment.Materializer.visitGroupScan(Materializer.java:35) > ~[drill-java-exec-1.5.0.jar:1.5.0] > at > org.apache.drill.exec.physical.base.AbstractGroupScan.accept(AbstractGroupScan.java:60) > ~[drill-java-exec-1.5.0.jar:1.5.0] > at > org.apache.drill.exec.planner.fragment.Materializer.visitOp(Materializer.java:102) > ~[drill-java-exec-1.5.0.jar:1.5.0] > at > org.apache.drill.exec.planner.fragment.Materializer.visitOp(Materializer.java:35) > ~[drill-java-exec-1.5.0.jar:1.5.0] > at > org.apache.drill.exec.physical.base.AbstractPhysicalVisitor.visitProject(AbstractPhysicalVisitor.java:77) > ~[drill-java-exec-1.5.0.jar:1.5.0] > at > org.apache.drill.exec.physical.config.Project.accept(Project.java:51) > ~[drill-java-exec-1.5.0.jar:1.5.0] > at > org.apache.drill.exec.planner.fragment.Materializer.visitStore(Materializer.java:82) > ~[drill-java-exec-1.5.0.jar:1.5.0] > at > org.apache.drill.exec.planner.fragment.Materializer.visitStore(Materializer.java:35) > ~[drill-java-exec-1.5.0.jar:1.5.0] > at > org.apache.drill.exec.physical.base.AbstractPhysicalVisitor.visitScreen(AbstractPhysicalVisitor.java:195) > ~[drill-java-exec-1.5.0.jar:1.5.0] > at > org.apache.drill.exec.physical.config.Screen.accept(Screen.java:97) > ~[drill-java-exec-1.5.0.jar:1.5.0] > at > org.apache.drill.exec.planner.fragment.SimpleParallelizer.generateWorkUnit(SimpleParallelizer.java:355) > ~[drill-java-exec-1.5.0.jar:1.5.0] > at > org.apache.drill.exec.planner.fragment.SimpleParallelizer.getFragments(SimpleParallelizer.java:134) > ~[drill-java-exec-1.5.0.jar:1.5.0] > at > org.apache.drill.exec.work.foreman.Foreman.getQueryWorkUnit(Foreman.java:518) > [drill-java-exec-1.5.0.jar:1.5.0] > at > org.apache.drill.exec.work.foreman.Foreman.runPhysicalPlan(Foreman.java:405) > 
[drill-java-exec-1.5.0.jar:1.5.0] > at > org.apache.drill.exec.work.foreman.Foreman.runSQL(Foreman.java:926) > [drill-java-exec-1.5.0.jar:1.5.0] > {noformat} -- This message was sent by Atlassian JIRA (v7.6.14#76016)