[ https://issues.apache.org/jira/browse/DRILL-4517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16904672#comment-16904672 ]

ASF GitHub Bot commented on DRILL-4517:
---------------------------------------

vvysotskyi commented on pull request #1839: DRILL-4517: Support reading empty Parquet files
URL: https://github.com/apache/drill/pull/1839#discussion_r312741644
 
 

 ##########
 File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/FileMetadataCollector.java
 ##########
 @@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.metadata;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.store.parquet.ParquetReaderConfig;
+import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Collects file metadata for the given Parquet file. For an empty Parquet
+ * file, it will generate fake row group metadata based on the file schema.
+ */
+public class FileMetadataCollector {
+
+  private static final Logger logger = LoggerFactory.getLogger(FileMetadataCollector.class);
+
+  private final ParquetMetadata metadata;
+  private final FileStatus file;
+  private final FileSystem fs;
+  private final boolean allColumnsInteresting;
+  private final boolean skipNonInteresting;
+  private final Set<String> columnSet;
+
+  private final MessageType schema;
+  private final ParquetReaderUtility.DateCorruptionStatus containsCorruptDates;
+  private final Map<SchemaPath, ColTypeInfo> colTypeInfoMap;
+
+  private final Map<Metadata_V4.ColumnTypeMetadata_v4.Key, Long> totalNullCountMap = new HashMap<>();
+  private final Map<Metadata_V4.ColumnTypeMetadata_v4.Key, Metadata_V4.ColumnTypeMetadata_v4> columnTypeInfo = new HashMap<>();
+
+  private Metadata_V4.ParquetFileAndRowCountMetadata fileMetadata;
+
+  public FileMetadataCollector(ParquetMetadata metadata,
+                               FileStatus file,
+                               FileSystem fs,
+                               boolean allColumnsInteresting,
+                               boolean skipNonInteresting,
+                               Set<String> columnSet,
+                               ParquetReaderConfig readerConfig) throws IOException {
+    this.metadata = metadata;
+    this.file = file;
+    this.fs = fs;
+    this.allColumnsInteresting = allColumnsInteresting;
+    this.skipNonInteresting = skipNonInteresting;
+    this.columnSet = columnSet;
+
+    this.schema = metadata.getFileMetaData().getSchema();
+
+    this.containsCorruptDates =
+      ParquetReaderUtility.detectCorruptDates(metadata, Collections.singletonList(SchemaPath.STAR_COLUMN),
+        readerConfig.autoCorrectCorruptedDates());
+    logger.debug("Contains corrupt dates: {}.", containsCorruptDates);
+
+    this.colTypeInfoMap = new HashMap<>();
+    for (String[] path : schema.getPaths()) {
+      colTypeInfoMap.put(SchemaPath.getCompoundPath(path), ColTypeInfo.of(schema, schema, path, 0));
+    }
+
+    init();
+  }
+
+  public Metadata_V4.ParquetFileAndRowCountMetadata getFileMetadata() {
+    return fileMetadata;
+  }
+
+  public Map<Metadata_V4.ColumnTypeMetadata_v4.Key, Metadata_V4.ColumnTypeMetadata_v4> getColumnTypeInfo() {
+    return columnTypeInfo;
+  }
+
+  private void init() throws IOException {
+    long totalRowCount = 0;
+
+    List<Metadata_V4.RowGroupMetadata_v4> rowGroupMetadataList = new ArrayList<>();
+
+    for (BlockMetaData rowGroup : metadata.getBlocks()) {
+      List<Metadata_V4.ColumnMetadata_v4> columnMetadataList = new ArrayList<>();
+      long length = 0;
+      totalRowCount = totalRowCount + rowGroup.getRowCount();
+      for (ColumnChunkMetaData col : rowGroup.getColumns()) {
+        String[] columnName = col.getPath().toArray();
+        Statistics<?> stats = col.getStatistics();
+        PrimitiveType.PrimitiveTypeName primitiveTypeName = col.getPrimitiveType().getPrimitiveTypeName();
+        addColumnMetadata(columnName, stats, primitiveTypeName, columnMetadataList);
+        length += col.getTotalSize();
+      }
+
+      // DRILL-5009: Skip the RowGroup if it is empty
+      // Note we still read the schema even if there are no values in the RowGroup
+      if (rowGroup.getRowCount() == 0) {
+        continue;
+      }
+
+      Metadata_V4.RowGroupMetadata_v4 rowGroupMeta = new Metadata_V4.RowGroupMetadata_v4(rowGroup.getStartingPos(), length, rowGroup.getRowCount(),
+        getHostAffinity(rowGroup.getStartingPos(), length), columnMetadataList);
+
+      rowGroupMetadataList.add(rowGroupMeta);
+    }
+
+    // add a fake row group based on the file schema when the file is empty or all row groups are empty
+    if (rowGroupMetadataList.isEmpty()) {
+      List<Metadata_V4.ColumnMetadata_v4> columnMetadataList = new ArrayList<>();
+      for (ColumnDescriptor columnDescriptor : schema.getColumns()) {
+
+        Statistics<?> stats = Statistics.getBuilderForReading(columnDescriptor.getPrimitiveType())
+          .withMax(null)
+          .withMin(null)
+          .withNumNulls(0)
+          .build();
+
+        addColumnMetadata(columnDescriptor.getPath(), stats,
+          columnDescriptor.getPrimitiveType().getPrimitiveTypeName(), columnMetadataList);
+      }
+
+      Metadata_V4.RowGroupMetadata_v4 rowGroupMeta = new Metadata_V4.RowGroupMetadata_v4(0L, 0L,
+        0L, getHostAffinity(0, 0L), columnMetadataList);
+      rowGroupMetadataList.add(rowGroupMeta);
+    }
+
+    Path path = Path.getPathWithoutSchemeAndAuthority(file.getPath());
+    Metadata_V4.ParquetFileMetadata_v4 parquetFileMetadata_v4 = new Metadata_V4.ParquetFileMetadata_v4(path, file.getLen(), rowGroupMetadataList);
+    this.fileMetadata = new Metadata_V4.ParquetFileAndRowCountMetadata(parquetFileMetadata_v4, totalNullCountMap, totalRowCount);
+  }
+
+  private void addColumnMetadata(String[] columnName,
+                                 Statistics<?> stats,
+                                 PrimitiveType.PrimitiveTypeName primitiveTypeName,
+                                 List<Metadata_V4.ColumnMetadata_v4> columnMetadataList) {
+    SchemaPath columnSchemaName = SchemaPath.getCompoundPath(columnName);
+    boolean thisColumnIsInteresting = allColumnsInteresting || columnSet == null || columnSet.contains(columnSchemaName.getRootSegmentPath());
+
+    if (skipNonInteresting && !thisColumnIsInteresting) {
+      return;
+    }
+
+    ColTypeInfo colTypeInfo = colTypeInfoMap.get(columnSchemaName);
+    long totalNullCount = stats.getNumNulls();
+    Metadata_V4.ColumnTypeMetadata_v4 columnTypeMetadata = new Metadata_V4.ColumnTypeMetadata_v4(
+      columnName, primitiveTypeName,
+      colTypeInfo.originalType, colTypeInfo.precision, colTypeInfo.scale,
+      colTypeInfo.repetitionLevel, colTypeInfo.definitionLevel, 0, false);
+    Metadata_V4.ColumnTypeMetadata_v4.Key columnTypeMetadataKey = new Metadata_V4.ColumnTypeMetadata_v4.Key(columnTypeMetadata.name);
+
+    totalNullCountMap.putIfAbsent(columnTypeMetadataKey, Metadata.DEFAULT_NULL_COUNT);
+    if (totalNullCountMap.get(columnTypeMetadataKey) < 0 || totalNullCount < 0) {
+      totalNullCountMap.put(columnTypeMetadataKey, Metadata.NULL_COUNT_NOT_EXISTS);
+    } else {
+      long nullCount = totalNullCountMap.get(columnTypeMetadataKey) + totalNullCount;
+      totalNullCountMap.put(columnTypeMetadataKey, nullCount);
+    }
+    if (thisColumnIsInteresting) {
+      // Save the column schema info. We'll merge it into one list
+      Object minValue = null;
+      Object maxValue = null;
+      boolean statsAvailable = !stats.isEmpty();
+      if (statsAvailable) {
+        if (stats.hasNonNullValue()) {
 
 Review comment:
   Could you please combine these two if statements?
   
   P.S. Since this code was moved from another place, addressing this comment is optional, but either way, I've written it :)
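   
   A minimal sketch of the combined check being suggested; the body of the inner if is truncated in this hunk, so what runs inside it is an assumption:
   
   ```java
   // Hypothetical combined form of the two nested checks above.
   if (!stats.isEmpty() && stats.hasNonNullValue()) {
     // ... read min/max values from the column statistics (elided in the hunk above) ...
   }
   ```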
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Reading empty Parquet file fails with java.lang.IllegalArgumentException
> -------------------------------------------------------------------------
>
>                 Key: DRILL-4517
>                 URL: https://issues.apache.org/jira/browse/DRILL-4517
>             Project: Apache Drill
>          Issue Type: Improvement
>          Components:  Server
>            Reporter: Tobias
>            Assignee: Arina Ielchiieva
>            Priority: Major
>              Labels: doc-impacting
>             Fix For: 1.17.0
>
>         Attachments: empty.parquet, no_rows.parquet
>
>
> When querying a Parquet file that has a schema but no rows, the Drill server 
> will fail with the error below.
> This looks similar to DRILL-3557.
> {noformat}
> ParquetMetaData{FileMetaData{schema: message TRANSACTION_REPORT {
>   required int64 MEMBER_ACCOUNT_ID;
>   required int64 TIMESTAMP_IN_HOUR;
>   optional int64 APPLICATION_ID;
> }
> , metadata: {}}, blocks: []}
> {noformat}
> {noformat}
> Caused by: java.lang.IllegalArgumentException: MinorFragmentId 0 has no read entries assigned
>         at com.google.common.base.Preconditions.checkArgument(Preconditions.java:92) ~[guava-14.0.1.jar:na]
>         at org.apache.drill.exec.store.parquet.ParquetGroupScan.getSpecificScan(ParquetGroupScan.java:707) ~[drill-java-exec-1.5.0.jar:1.5.0]
>         at org.apache.drill.exec.store.parquet.ParquetGroupScan.getSpecificScan(ParquetGroupScan.java:105) ~[drill-java-exec-1.5.0.jar:1.5.0]
>         at org.apache.drill.exec.planner.fragment.Materializer.visitGroupScan(Materializer.java:68) ~[drill-java-exec-1.5.0.jar:1.5.0]
>         at org.apache.drill.exec.planner.fragment.Materializer.visitGroupScan(Materializer.java:35) ~[drill-java-exec-1.5.0.jar:1.5.0]
>         at org.apache.drill.exec.physical.base.AbstractGroupScan.accept(AbstractGroupScan.java:60) ~[drill-java-exec-1.5.0.jar:1.5.0]
>         at org.apache.drill.exec.planner.fragment.Materializer.visitOp(Materializer.java:102) ~[drill-java-exec-1.5.0.jar:1.5.0]
>         at org.apache.drill.exec.planner.fragment.Materializer.visitOp(Materializer.java:35) ~[drill-java-exec-1.5.0.jar:1.5.0]
>         at org.apache.drill.exec.physical.base.AbstractPhysicalVisitor.visitProject(AbstractPhysicalVisitor.java:77) ~[drill-java-exec-1.5.0.jar:1.5.0]
>         at org.apache.drill.exec.physical.config.Project.accept(Project.java:51) ~[drill-java-exec-1.5.0.jar:1.5.0]
>         at org.apache.drill.exec.planner.fragment.Materializer.visitStore(Materializer.java:82) ~[drill-java-exec-1.5.0.jar:1.5.0]
>         at org.apache.drill.exec.planner.fragment.Materializer.visitStore(Materializer.java:35) ~[drill-java-exec-1.5.0.jar:1.5.0]
>         at org.apache.drill.exec.physical.base.AbstractPhysicalVisitor.visitScreen(AbstractPhysicalVisitor.java:195) ~[drill-java-exec-1.5.0.jar:1.5.0]
>         at org.apache.drill.exec.physical.config.Screen.accept(Screen.java:97) ~[drill-java-exec-1.5.0.jar:1.5.0]
>         at org.apache.drill.exec.planner.fragment.SimpleParallelizer.generateWorkUnit(SimpleParallelizer.java:355) ~[drill-java-exec-1.5.0.jar:1.5.0]
>         at org.apache.drill.exec.planner.fragment.SimpleParallelizer.getFragments(SimpleParallelizer.java:134) ~[drill-java-exec-1.5.0.jar:1.5.0]
>         at org.apache.drill.exec.work.foreman.Foreman.getQueryWorkUnit(Foreman.java:518) [drill-java-exec-1.5.0.jar:1.5.0]
>         at org.apache.drill.exec.work.foreman.Foreman.runPhysicalPlan(Foreman.java:405) [drill-java-exec-1.5.0.jar:1.5.0]
>         at org.apache.drill.exec.work.foreman.Foreman.runSQL(Foreman.java:926) [drill-java-exec-1.5.0.jar:1.5.0]
> {noformat}
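
A hypothetical sketch (not from the original report) of producing such a schema-only file with parquet-mr's example writer; the schema mirrors the one quoted above, while the class name and output path are placeholders:

{noformat}
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class EmptyParquetRepro {
  public static void main(String[] args) throws Exception {
    // Schema copied from the report; any schema reproduces the issue.
    MessageType schema = MessageTypeParser.parseMessageType(
        "message TRANSACTION_REPORT {"
            + " required int64 MEMBER_ACCOUNT_ID;"
            + " required int64 TIMESTAMP_IN_HOUR;"
            + " optional int64 APPLICATION_ID;"
            + " }");
    // Closing the writer without writing any groups yields a file with
    // file-level schema metadata but zero row groups (blocks: []).
    try (ParquetWriter<Group> writer = ExampleParquetWriter
        .builder(new Path("/tmp/empty.parquet")) // hypothetical output path
        .withType(schema)
        .build()) {
      // intentionally write no rows
    }
  }
}
{noformat}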



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
