http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java index f1fb1e9..3ccdad0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java @@ -17,7 +17,6 @@ */ package org.apache.drill.exec.store.parquet; -import java.util.Iterator; import java.util.LinkedList; import java.util.List; @@ -26,12 +25,9 @@ import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.logical.FormatPluginConfig; import org.apache.drill.common.logical.StoragePluginConfig; -import org.apache.drill.exec.physical.base.AbstractBase; -import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.physical.base.PhysicalOperator; -import org.apache.drill.exec.physical.base.PhysicalVisitor; -import org.apache.drill.exec.physical.base.SubScan; import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType; +import org.apache.drill.exec.store.ColumnExplorer; import org.apache.drill.exec.store.StoragePluginRegistry; import com.fasterxml.jackson.annotation.JacksonInject; @@ -40,31 +36,26 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.base.Preconditions; -import com.google.common.collect.Iterators; +import org.apache.hadoop.conf.Configuration; -// Class containing information for reading a single parquet row group form HDFS +// Class containing information for reading 
a single parquet row group from HDFS @JsonTypeName("parquet-row-group-scan") -public class ParquetRowGroupScan extends AbstractBase implements SubScan { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRowGroupScan.class); +public class ParquetRowGroupScan extends AbstractParquetRowGroupScan { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRowGroupScan.class); - private final ParquetFormatConfig formatConfig; private final ParquetFormatPlugin formatPlugin; - private final List<RowGroupReadEntry> rowGroupReadEntries; - private final List<SchemaPath> columns; - private LogicalExpression filter; - private String selectionRoot; + private final ParquetFormatConfig formatConfig; + private final String selectionRoot; @JsonCreator - public ParquetRowGroupScan( // - @JacksonInject StoragePluginRegistry registry, // - @JsonProperty("userName") String userName, // - @JsonProperty("storage") StoragePluginConfig storageConfig, // - @JsonProperty("format") FormatPluginConfig formatConfig, // - @JsonProperty("entries") LinkedList<RowGroupReadEntry> rowGroupReadEntries, // - @JsonProperty("columns") List<SchemaPath> columns, // - @JsonProperty("selectionRoot") String selectionRoot, // - @JsonProperty("filter") LogicalExpression filter - ) throws ExecutionSetupException { + public ParquetRowGroupScan(@JacksonInject StoragePluginRegistry registry, + @JsonProperty("userName") String userName, + @JsonProperty("storageConfig") StoragePluginConfig storageConfig, + @JsonProperty("formatConfig") FormatPluginConfig formatConfig, + @JsonProperty("rowGroupReadEntries") LinkedList<RowGroupReadEntry> rowGroupReadEntries, + @JsonProperty("columns") List<SchemaPath> columns, + @JsonProperty("selectionRoot") String selectionRoot, + @JsonProperty("filter") LogicalExpression filter) throws ExecutionSetupException { this(userName, (ParquetFormatPlugin) registry.getFormatPlugin(Preconditions.checkNotNull(storageConfig), 
Preconditions.checkNotNull(formatConfig)), rowGroupReadEntries, @@ -73,82 +64,72 @@ public class ParquetRowGroupScan extends AbstractBase implements SubScan { filter); } - public ParquetRowGroupScan( // - String userName, // - ParquetFormatPlugin formatPlugin, // - List<RowGroupReadEntry> rowGroupReadEntries, // - List<SchemaPath> columns, // - String selectionRoot, // - LogicalExpression filter - ) { - super(userName); + public ParquetRowGroupScan(String userName, + ParquetFormatPlugin formatPlugin, + List<RowGroupReadEntry> rowGroupReadEntries, + List<SchemaPath> columns, + String selectionRoot, + LogicalExpression filter) { + super(userName, rowGroupReadEntries, columns, filter); this.formatPlugin = Preconditions.checkNotNull(formatPlugin, "Could not find format config for the given configuration"); this.formatConfig = formatPlugin.getConfig(); - this.rowGroupReadEntries = rowGroupReadEntries; - this.columns = columns == null ? GroupScan.ALL_COLUMNS : columns; this.selectionRoot = selectionRoot; - this.filter = filter; - } - - @JsonProperty("entries") - public List<RowGroupReadEntry> getRowGroupReadEntries() { - return rowGroupReadEntries; } - @JsonProperty("storage") - public StoragePluginConfig getEngineConfig() { + @JsonProperty + public StoragePluginConfig getStorageConfig() { return formatPlugin.getStorageConfig(); } - /** - * @return Parquet plugin format config - */ - @JsonProperty("format") + @JsonProperty public ParquetFormatConfig getFormatConfig() { return formatConfig; } + @JsonProperty public String getSelectionRoot() { return selectionRoot; } - @Override - public boolean isExecutable() { - return false; - } - @JsonIgnore public ParquetFormatPlugin getStorageEngine() { return formatPlugin; } @Override - public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E { - return physicalVisitor.visitSubScan(this, value); + public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) { + 
Preconditions.checkArgument(children.isEmpty()); + return new ParquetRowGroupScan(getUserName(), formatPlugin, rowGroupReadEntries, columns, selectionRoot, filter); } @Override - public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException { - Preconditions.checkArgument(children.isEmpty()); - return new ParquetRowGroupScan(getUserName(), formatPlugin, rowGroupReadEntries, columns, selectionRoot, filter); + public int getOperatorType() { + return CoreOperatorType.PARQUET_ROW_GROUP_SCAN_VALUE; } @Override - public Iterator<PhysicalOperator> iterator() { - return Iterators.emptyIterator(); + public AbstractParquetRowGroupScan copy(List<SchemaPath> columns) { + return new ParquetRowGroupScan(getUserName(), formatPlugin, rowGroupReadEntries, columns, selectionRoot, filter); } - public List<SchemaPath> getColumns() { - return columns; + @Override + public boolean areCorruptDatesAutoCorrected() { + return formatConfig.areCorruptDatesAutoCorrected(); } - public LogicalExpression getFilter() { - return filter; + @Override + public Configuration getFsConf(RowGroupReadEntry rowGroupReadEntry) { + return formatPlugin.getFsConf(); } @Override - public int getOperatorType() { - return CoreOperatorType.PARQUET_ROW_GROUP_SCAN_VALUE; + public boolean supportsFileImplicitColumns() { + return selectionRoot != null; } + @Override + public List<String> getPartitionValues(RowGroupReadEntry rowGroupReadEntry) { + return ColumnExplorer.listPartitionValues(rowGroupReadEntry.getPath(), selectionRoot); + } } +
http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java index e928ebb..c71e3c3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java @@ -17,15 +17,7 @@ */ package org.apache.drill.exec.store.parquet; -import java.io.IOException; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import com.google.common.base.Functions; -import com.google.common.base.Stopwatch; -import com.google.common.collect.Maps; +import com.google.common.base.Preconditions; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.ops.ExecutorFragmentContext; @@ -33,144 +25,52 @@ import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.physical.impl.ScanBatch; import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.store.ColumnExplorer; -import org.apache.drill.exec.store.RecordReader; +import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.store.dfs.DrillFileSystem; -import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader; -import org.apache.drill.exec.store.parquet2.DrillParquetReader; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.parquet.column.ColumnDescriptor; -import org.apache.parquet.hadoop.CodecFactory; -import 
org.apache.parquet.hadoop.ParquetFileReader; -import org.apache.parquet.hadoop.metadata.ParquetMetadata; -import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.Type; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; - -public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan>{ - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetScanBatchCreator.class); +import java.io.IOException; +import java.util.List; - private static final String ENABLE_BYTES_READ_COUNTER = "parquet.benchmark.bytes.read"; - private static final String ENABLE_BYTES_TOTAL_COUNTER = "parquet.benchmark.bytes.total"; - private static final String ENABLE_TIME_READ_COUNTER = "parquet.benchmark.time.read"; +public class ParquetScanBatchCreator extends AbstractParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan> { - @SuppressWarnings("resource") @Override - public ScanBatch getBatch(ExecutorFragmentContext context, ParquetRowGroupScan rowGroupScan, List<RecordBatch> children) - throws ExecutionSetupException { + public ScanBatch getBatch(ExecutorFragmentContext context, ParquetRowGroupScan rowGroupScan, List<RecordBatch> children) throws ExecutionSetupException { Preconditions.checkArgument(children.isEmpty()); OperatorContext oContext = context.newOperatorContext(rowGroupScan); + return getBatch(context, rowGroupScan, oContext); + } - final ColumnExplorer columnExplorer = new ColumnExplorer(context.getOptions(), rowGroupScan.getColumns()); - - if (!columnExplorer.isStarQuery()) { - rowGroupScan = new ParquetRowGroupScan(rowGroupScan.getUserName(), rowGroupScan.getStorageEngine(), - rowGroupScan.getRowGroupReadEntries(), columnExplorer.getTableColumns(), rowGroupScan.getSelectionRoot(), - rowGroupScan.getFilter()); - rowGroupScan.setOperatorId(rowGroupScan.getOperatorId()); - } - - DrillFileSystem fs; - try { - boolean useAsyncPageReader = - 
context.getOptions().getOption(ExecConstants.PARQUET_PAGEREADER_ASYNC).bool_val; - if (useAsyncPageReader) { - fs = oContext.newNonTrackingFileSystem(rowGroupScan.getStorageEngine().getFsConf()); - } else { - fs = oContext.newFileSystem(rowGroupScan.getStorageEngine().getFsConf()); - } - } catch (IOException e) { - throw new ExecutionSetupException( - String.format("Failed to create DrillFileSystem: %s", e.getMessage()), e); - } - - Configuration conf = new Configuration(fs.getConf()); - conf.setBoolean(ENABLE_BYTES_READ_COUNTER, false); - conf.setBoolean(ENABLE_BYTES_TOTAL_COUNTER, false); - conf.setBoolean(ENABLE_TIME_READ_COUNTER, false); + @Override + protected AbstractDrillFileSystemManager getDrillFileSystemCreator(OperatorContext operatorContext, OptionManager optionManager) { + return new ParquetDrillFileSystemManager(operatorContext, optionManager.getOption(ExecConstants.PARQUET_PAGEREADER_ASYNC).bool_val); + } - // keep footers in a map to avoid re-reading them - Map<String, ParquetMetadata> footers = Maps.newHashMap(); - List<RecordReader> readers = new LinkedList<>(); - List<Map<String, String>> implicitColumns = Lists.newArrayList(); - Map<String, String> mapWithMaxColumns = Maps.newLinkedHashMap(); - for(RowGroupReadEntry e : rowGroupScan.getRowGroupReadEntries()){ - /* - Here we could store a map from file names to footers, to prevent re-reading the footer for each row group in a file - TODO - to prevent reading the footer again in the parquet record reader (it is read earlier in the ParquetStorageEngine) - we should add more information to the RowGroupInfo that will be populated upon the first read to - provide the reader with all of th file meta-data it needs - These fields will be added to the constructor below - */ - try { - Stopwatch timer = Stopwatch.createUnstarted(); - if (!footers.containsKey(e.getPath())){ - timer.start(); - ParquetMetadata footer = ParquetFileReader.readFooter(conf, new Path(e.getPath())); - long timeToRead = 
timer.elapsed(TimeUnit.MICROSECONDS); - logger.trace("ParquetTrace,Read Footer,{},{},{},{},{},{},{}", "", e.getPath(), "", 0, 0, 0, timeToRead); - footers.put(e.getPath(), footer ); - } - boolean autoCorrectCorruptDates = rowGroupScan.getFormatConfig().areCorruptDatesAutoCorrected(); - ParquetReaderUtility.DateCorruptionStatus containsCorruptDates = ParquetReaderUtility.detectCorruptDates(footers.get(e.getPath()), rowGroupScan.getColumns(), - autoCorrectCorruptDates); - if (logger.isDebugEnabled()) { - logger.debug(containsCorruptDates.toString()); - } - if (!context.getOptions().getBoolean(ExecConstants.PARQUET_NEW_RECORD_READER) && !isComplex(footers.get(e.getPath()))) { - readers.add( - new ParquetRecordReader( - context, e.getPath(), e.getRowGroupIndex(), e.getNumRecordsToRead(), fs, - CodecFactory.createDirectCodecFactory( - fs.getConf(), - new ParquetDirectByteBufferAllocator(oContext.getAllocator()), 0), - footers.get(e.getPath()), - rowGroupScan.getColumns(), - containsCorruptDates - ) - ); - } else { - ParquetMetadata footer = footers.get(e.getPath()); - readers.add(new DrillParquetReader(context, footer, e, columnExplorer.getTableColumns(), fs, containsCorruptDates)); - } - Map<String, String> implicitValues = columnExplorer.populateImplicitColumns(e, rowGroupScan.getSelectionRoot()); - implicitColumns.add(implicitValues); - if (implicitValues.size() > mapWithMaxColumns.size()) { - mapWithMaxColumns = implicitValues; - } + /** + * Creates file system only if it was not created before, otherwise returns already created instance. 
+ */ + private class ParquetDrillFileSystemManager extends AbstractDrillFileSystemManager { - } catch (IOException e1) { - throw new ExecutionSetupException(e1); - } - } + private final boolean useAsyncPageReader; + private DrillFileSystem fs; - // all readers should have the same number of implicit columns, add missing ones with value null - Map<String, String> diff = Maps.transformValues(mapWithMaxColumns, Functions.constant((String) null)); - for (Map<String, String> map : implicitColumns) { - map.putAll(Maps.difference(map, diff).entriesOnlyOnRight()); + ParquetDrillFileSystemManager(OperatorContext operatorContext, boolean useAsyncPageReader) { + super(operatorContext); + this.useAsyncPageReader = useAsyncPageReader; } - return new ScanBatch(context, oContext, readers, implicitColumns); - } - - private static boolean isComplex(ParquetMetadata footer) { - MessageType schema = footer.getFileMetaData().getSchema(); - - for (Type type : schema.getFields()) { - if (!type.isPrimitive()) { - return true; - } - } - for (ColumnDescriptor col : schema.getColumns()) { - if (col.getMaxRepetitionLevel() > 0) { - return true; + @Override + protected DrillFileSystem get(Configuration config, String path) throws ExecutionSetupException { + if (fs == null) { + try { + fs = useAsyncPageReader ? 
operatorContext.newNonTrackingFileSystem(config) : operatorContext.newFileSystem(config); + } catch (IOException e) { + throw new ExecutionSetupException(String.format("Failed to create DrillFileSystem: %s", e.getMessage()), e); + } } + return fs; } - return false; } } http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupInfo.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupInfo.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupInfo.java new file mode 100644 index 0000000..af436d8 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupInfo.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.parquet; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.drill.exec.store.dfs.ReadEntryFromHDFS; +import org.apache.drill.exec.store.dfs.easy.FileWork; +import org.apache.drill.exec.store.schedule.CompleteWork; +import org.apache.drill.exec.store.schedule.EndpointByteMap; + +import java.util.List; + +import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ColumnMetadata; + +public class RowGroupInfo extends ReadEntryFromHDFS implements CompleteWork, FileWork { + + private EndpointByteMap byteMap; + private int rowGroupIndex; + private List<? extends ColumnMetadata> columns; + private long rowCount; // rowCount = -1 indicates to include all rows. + private long numRecordsToRead; + + @JsonCreator + public RowGroupInfo(@JsonProperty("path") String path, + @JsonProperty("start") long start, + @JsonProperty("length") long length, + @JsonProperty("rowGroupIndex") int rowGroupIndex, + long rowCount) { + super(path, start, length); + this.rowGroupIndex = rowGroupIndex; + this.rowCount = rowCount; + this.numRecordsToRead = rowCount; + } + + public RowGroupReadEntry getRowGroupReadEntry() { + return new RowGroupReadEntry(this.getPath(), this.getStart(), this.getLength(), this.rowGroupIndex, this.getNumRecordsToRead()); + } + + public int getRowGroupIndex() { + return this.rowGroupIndex; + } + + @Override + public int compareTo(CompleteWork o) { + return Long.compare(getTotalBytes(), o.getTotalBytes()); + } + + @Override + public long getTotalBytes() { + return this.getLength(); + } + + @Override + public EndpointByteMap getByteMap() { + return byteMap; + } + + public long getNumRecordsToRead() { + return numRecordsToRead; + } + + public void setNumRecordsToRead(long numRecords) { + numRecordsToRead = numRecords; + } + + public void setEndpointByteMap(EndpointByteMap byteMap) { + this.byteMap = byteMap; + } + + public long getRowCount() { 
+ return rowCount; + } + + public List<? extends ColumnMetadata> getColumns() { + return columns; + } + + public void setColumns(List<? extends ColumnMetadata> columns) { + this.columns = columns; + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java new file mode 100644 index 0000000..cdb28c2 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java @@ -0,0 +1,694 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.parquet.metadata; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator.Feature; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.fasterxml.jackson.module.afterburner.AfterburnerModule; +import com.google.common.base.Stopwatch; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.util.DrillVersionInfo; +import org.apache.drill.exec.store.TimedRunnable; +import org.apache.drill.exec.store.dfs.MetadataContext; +import org.apache.drill.exec.store.parquet.ParquetFormatConfig; +import org.apache.drill.exec.store.parquet.ParquetReaderUtility; +import org.apache.drill.exec.util.DrillFileSystemUtil; +import org.apache.drill.exec.util.ImpersonationUtil; +import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.parquet.column.statistics.Statistics; +import org.apache.parquet.format.converter.ParquetMetadataConverter; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; + 
+import javax.annotation.Nullable; +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetFileMetadata; +import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetTableMetadataBase; +import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.RowGroupMetadata; +import static org.apache.drill.exec.store.parquet.metadata.MetadataVersion.Constants.SUPPORTED_VERSIONS; +import static org.apache.drill.exec.store.parquet.metadata.Metadata_V3.ColumnMetadata_v3; +import static org.apache.drill.exec.store.parquet.metadata.Metadata_V3.ColumnTypeMetadata_v3; +import static org.apache.drill.exec.store.parquet.metadata.Metadata_V3.ParquetFileMetadata_v3; +import static org.apache.drill.exec.store.parquet.metadata.Metadata_V3.ParquetTableMetadata_v3; +import static org.apache.drill.exec.store.parquet.metadata.Metadata_V3.RowGroupMetadata_v3; + +public class Metadata { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Metadata.class); + + public static final String[] OLD_METADATA_FILENAMES = {".drill.parquet_metadata.v2"}; + public static final String METADATA_FILENAME = ".drill.parquet_metadata"; + public static final String METADATA_DIRECTORIES_FILENAME = ".drill.parquet_metadata_directories"; + + private final ParquetFormatConfig formatConfig; + + private ParquetTableMetadataBase parquetTableMetadata; + private ParquetTableMetadataDirs parquetTableMetadataDirs; + + + private Metadata(ParquetFormatConfig formatConfig) { + this.formatConfig = formatConfig; + } + + /** + * Create the parquet metadata file for the 
directory at the given path, and for any subdirectories. + * + * @param fs file system + * @param path path + */ + public static void createMeta(FileSystem fs, String path, ParquetFormatConfig formatConfig) throws IOException { + Metadata metadata = new Metadata(formatConfig); + metadata.createMetaFilesRecursively(path, fs); + } + + /** + * Get the parquet metadata for the parquet files in the given directory, including those in subdirectories. + * + * @param fs file system + * @param path path + * @return parquet table metadata + */ + public static ParquetTableMetadata_v3 getParquetTableMetadata(FileSystem fs, String path, ParquetFormatConfig formatConfig) + throws IOException { + Metadata metadata = new Metadata(formatConfig); + return metadata.getParquetTableMetadata(path, fs); + } + + /** + * Get the parquet metadata for a list of parquet files. + * + * @param fileStatusMap file statuses and corresponding file systems + * @param formatConfig parquet format config + * @return parquet table metadata + */ + public static ParquetTableMetadata_v3 getParquetTableMetadata(Map<FileStatus, FileSystem> fileStatusMap, + ParquetFormatConfig formatConfig) throws IOException { + Metadata metadata = new Metadata(formatConfig); + return metadata.getParquetTableMetadata(fileStatusMap); + } + + /** + * Get the parquet metadata for the table by reading the metadata file + * + * @param fs current file system + * @param path The path to the metadata file, located in the directory that contains the parquet files + * @param metaContext metadata context + * @param formatConfig parquet format plugin configs + * @return parquet table metadata. 
Null if metadata cache is missing, unsupported or corrupted + */ + public static @Nullable ParquetTableMetadataBase readBlockMeta(FileSystem fs, Path path, MetadataContext metaContext, + ParquetFormatConfig formatConfig) { + if (ignoreReadingMetadata(metaContext, path)) { + return null; + } + Metadata metadata = new Metadata(formatConfig); + metadata.readBlockMeta(path, false, metaContext, fs); + return metadata.parquetTableMetadata; + } + + /** + * Get the parquet metadata for all subdirectories by reading the metadata file + * + * @param fs current file system + * @param path The path to the metadata file, located in the directory that contains the parquet files + * @param metaContext metadata context + * @param formatConfig parquet format plugin configs + * @return parquet metadata for a directory. Null if metadata cache is missing, unsupported or corrupted + */ + public static @Nullable ParquetTableMetadataDirs readMetadataDirs(FileSystem fs, Path path, + MetadataContext metaContext, ParquetFormatConfig formatConfig) { + if (ignoreReadingMetadata(metaContext, path)) { + return null; + } + Metadata metadata = new Metadata(formatConfig); + metadata.readBlockMeta(path, true, metaContext, fs); + return metadata.parquetTableMetadataDirs; + } + + /** + * Ignore reading metadata files, if metadata is missing, unsupported or corrupted + * + * @param metaContext Metadata context + * @param path The path to the metadata file, located in the directory that contains the parquet files + * @return true if parquet metadata is missing or corrupted, false otherwise + */ + private static boolean ignoreReadingMetadata(MetadataContext metaContext, Path path) { + if (metaContext.isMetadataCacheCorrupted()) { + logger.warn("Ignoring of reading '{}' metadata file. Parquet metadata cache files are unsupported or corrupted. " + + "Query performance may be slow. 
Make sure the cache files are up-to-date by running the 'REFRESH TABLE " + + "METADATA' command", path); + return true; + } + return false; + } + + /** + * Create the parquet metadata files for the directory at the given path and for any subdirectories. + * Metadata cache files written to the disk contain relative paths. Returned Pair of metadata contains absolute paths. + * + * @param path to the directory of the parquet table + * @param fs file system + * @return Pair of parquet metadata. The left one is a parquet metadata for the table. The right one of the Pair is + * a metadata for all subdirectories (if they are present and there are no any parquet files in the + * {@code path} directory). + * @throws IOException if parquet metadata can't be serialized and written to the json file + */ + private Pair<ParquetTableMetadata_v3, ParquetTableMetadataDirs> createMetaFilesRecursively(final String path, FileSystem fs) throws IOException { + Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null; + List<ParquetFileMetadata_v3> metaDataList = Lists.newArrayList(); + List<String> directoryList = Lists.newArrayList(); + ConcurrentHashMap<ColumnTypeMetadata_v3.Key, ColumnTypeMetadata_v3> columnTypeInfoSet = + new ConcurrentHashMap<>(); + Path p = new Path(path); + FileStatus fileStatus = fs.getFileStatus(p); + assert fileStatus.isDirectory() : "Expected directory"; + + final Map<FileStatus, FileSystem> childFiles = new LinkedHashMap<>(); + + for (final FileStatus file : DrillFileSystemUtil.listAll(fs, p, false)) { + if (file.isDirectory()) { + ParquetTableMetadata_v3 subTableMetadata = (createMetaFilesRecursively(file.getPath().toString(), fs)).getLeft(); + metaDataList.addAll(subTableMetadata.files); + directoryList.addAll(subTableMetadata.directories); + directoryList.add(file.getPath().toString()); + // Merge the schema from the child level into the current level + //TODO: We need a merge method that merges two columns with the same name but 
different types + columnTypeInfoSet.putAll(subTableMetadata.columnTypeInfo); + } else { + childFiles.put(file, fs); + } + } + ParquetTableMetadata_v3 parquetTableMetadata = new ParquetTableMetadata_v3(SUPPORTED_VERSIONS.last().toString(), + DrillVersionInfo.getVersion()); + if (childFiles.size() > 0) { + List<ParquetFileMetadata_v3 > childFilesMetadata = getParquetFileMetadata_v3(parquetTableMetadata, childFiles); + metaDataList.addAll(childFilesMetadata); + // Note that we do not need to merge the columnInfo at this point. The columnInfo is already added + // to the parquetTableMetadata. + } + + parquetTableMetadata.directories = directoryList; + parquetTableMetadata.files = metaDataList; + // TODO: We need a merge method that merges two columns with the same name but different types + if (parquetTableMetadata.columnTypeInfo == null) { + parquetTableMetadata.columnTypeInfo = new ConcurrentHashMap<>(); + } + parquetTableMetadata.columnTypeInfo.putAll(columnTypeInfoSet); + + for (String oldName : OLD_METADATA_FILENAMES) { + fs.delete(new Path(p, oldName), false); + } + // relative paths in the metadata are only necessary for meta cache files. 
+ ParquetTableMetadata_v3 metadataTableWithRelativePaths = + MetadataPathUtils.createMetadataWithRelativePaths(parquetTableMetadata, path); + writeFile(metadataTableWithRelativePaths, new Path(p, METADATA_FILENAME), fs); + + if (directoryList.size() > 0 && childFiles.size() == 0) { + ParquetTableMetadataDirs parquetTableMetadataDirsRelativePaths = + new ParquetTableMetadataDirs(metadataTableWithRelativePaths.directories); + writeFile(parquetTableMetadataDirsRelativePaths, new Path(p, METADATA_DIRECTORIES_FILENAME), fs); + if (timer != null) { + logger.debug("Creating metadata files recursively took {} ms", timer.elapsed(TimeUnit.MILLISECONDS)); + } + ParquetTableMetadataDirs parquetTableMetadataDirs = new ParquetTableMetadataDirs(directoryList); + return Pair.of(parquetTableMetadata, parquetTableMetadataDirs); + } + List<String> emptyDirList = Lists.newArrayList(); + if (timer != null) { + logger.debug("Creating metadata files recursively took {} ms", timer.elapsed(TimeUnit.MILLISECONDS)); + timer.stop(); + } + return Pair.of(parquetTableMetadata, new ParquetTableMetadataDirs(emptyDirList)); + } + + /** + * Get the parquet metadata for the parquet files in a directory. + * + * @param path the path of the directory + * @return metadata object for an entire parquet directory structure + * @throws IOException in case of problems during accessing files + */ + private ParquetTableMetadata_v3 getParquetTableMetadata(String path, FileSystem fs) throws IOException { + Path p = new Path(path); + FileStatus fileStatus = fs.getFileStatus(p); + Stopwatch watch = logger.isDebugEnabled() ? 
Stopwatch.createStarted() : null; + List<FileStatus> fileStatuses = new ArrayList<>(); + if (fileStatus.isFile()) { + fileStatuses.add(fileStatus); + } else { + fileStatuses.addAll(DrillFileSystemUtil.listFiles(fs, p, true)); + } + if (watch != null) { + logger.debug("Took {} ms to get file statuses", watch.elapsed(TimeUnit.MILLISECONDS)); + watch.reset(); + watch.start(); + } + + Map<FileStatus, FileSystem> fileStatusMap = fileStatuses.stream() + .collect( + Collectors.toMap( + Function.identity(), + s -> fs, + (oldFs, newFs) -> newFs, + LinkedHashMap::new)); + + ParquetTableMetadata_v3 metadata_v3 = getParquetTableMetadata(fileStatusMap); + if (watch != null) { + logger.debug("Took {} ms to read file metadata", watch.elapsed(TimeUnit.MILLISECONDS)); + watch.stop(); + } + return metadata_v3; + } + + /** + * Get the parquet metadata for a list of parquet files + * + * @param fileStatusMap file statuses and corresponding file systems + * @return parquet table metadata object + * @throws IOException if parquet file metadata can't be obtained + */ + private ParquetTableMetadata_v3 getParquetTableMetadata(Map<FileStatus, FileSystem> fileStatusMap) + throws IOException { + ParquetTableMetadata_v3 tableMetadata = new ParquetTableMetadata_v3(SUPPORTED_VERSIONS.last().toString(), + DrillVersionInfo.getVersion()); + tableMetadata.files = getParquetFileMetadata_v3(tableMetadata, fileStatusMap); + tableMetadata.directories = new ArrayList<>(); + return tableMetadata; + } + + /** + * Get a list of file metadata for a list of parquet files + * + * @param parquetTableMetadata_v3 can store column schema info from all the files and row groups + * @param fileStatusMap parquet files statuses and corresponding file systems + * + * @return list of the parquet file metadata with absolute paths + * @throws IOException is thrown in case of issues while executing the list of runnables + */ + private List<ParquetFileMetadata_v3> getParquetFileMetadata_v3( + ParquetTableMetadata_v3 
parquetTableMetadata_v3, Map<FileStatus, FileSystem> fileStatusMap) throws IOException { + + List<TimedRunnable<ParquetFileMetadata_v3>> gatherers = fileStatusMap.entrySet().stream() + .map(e -> new MetadataGatherer(parquetTableMetadata_v3, e.getKey(), e.getValue())) + .collect(Collectors.toList()); + + List<ParquetFileMetadata_v3> metaDataList = new ArrayList<>(); + metaDataList.addAll(TimedRunnable.run("Fetch parquet metadata", logger, gatherers, 16)); + return metaDataList; + } + + /** + * TimedRunnable that reads the footer from parquet and collects file metadata + */ + private class MetadataGatherer extends TimedRunnable<ParquetFileMetadata_v3> { + + private final ParquetTableMetadata_v3 parquetTableMetadata; + private final FileStatus fileStatus; + private final FileSystem fs; + + MetadataGatherer(ParquetTableMetadata_v3 parquetTableMetadata, FileStatus fileStatus, FileSystem fs) { + this.parquetTableMetadata = parquetTableMetadata; + this.fileStatus = fileStatus; + this.fs = fs; + } + + @Override + protected ParquetFileMetadata_v3 runInner() throws Exception { + return getParquetFileMetadata_v3(parquetTableMetadata, fileStatus, fs); + } + + @Override + protected IOException convertToIOException(Exception e) { + if (e instanceof IOException) { + return (IOException) e; + } else { + return new IOException(e); + } + } + } + + private ColTypeInfo getColTypeInfo(MessageType schema, Type type, String[] path, int depth) { + if (type.isPrimitive()) { + PrimitiveType primitiveType = (PrimitiveType) type; + int precision = 0; + int scale = 0; + if (primitiveType.getDecimalMetadata() != null) { + precision = primitiveType.getDecimalMetadata().getPrecision(); + scale = primitiveType.getDecimalMetadata().getScale(); + } + + int repetitionLevel = schema.getMaxRepetitionLevel(path); + int definitionLevel = schema.getMaxDefinitionLevel(path); + + return new ColTypeInfo(type.getOriginalType(), precision, scale, repetitionLevel, definitionLevel); + } + Type t = ((GroupType) 
type).getType(path[depth]); + return getColTypeInfo(schema, t, path, depth + 1); + } + + private class ColTypeInfo { + public OriginalType originalType; + public int precision; + public int scale; + public int repetitionLevel; + public int definitionLevel; + + ColTypeInfo(OriginalType originalType, int precision, int scale, int repetitionLevel, int definitionLevel) { + this.originalType = originalType; + this.precision = precision; + this.scale = scale; + this.repetitionLevel = repetitionLevel; + this.definitionLevel = definitionLevel; + } + } + + /** + * Get the metadata for a single file + */ + private ParquetFileMetadata_v3 getParquetFileMetadata_v3(ParquetTableMetadata_v3 parquetTableMetadata, + final FileStatus file, final FileSystem fs) throws IOException, InterruptedException { + final ParquetMetadata metadata; + final UserGroupInformation processUserUgi = ImpersonationUtil.getProcessUserUGI(); + try { + metadata = processUserUgi.doAs((PrivilegedExceptionAction<ParquetMetadata>) + () -> ParquetFileReader.readFooter(fs.getConf(), file, ParquetMetadataConverter.NO_FILTER)); + } catch(Exception e) { + logger.error("Exception while reading footer of parquet file [Details - path: {}, owner: {}] as process user {}", + file.getPath(), file.getOwner(), processUserUgi.getShortUserName(), e); + throw e; + } + + MessageType schema = metadata.getFileMetaData().getSchema(); + +// Map<SchemaPath, OriginalType> originalTypeMap = Maps.newHashMap(); + Map<SchemaPath, ColTypeInfo> colTypeInfoMap = Maps.newHashMap(); + schema.getPaths(); + for (String[] path : schema.getPaths()) { + colTypeInfoMap.put(SchemaPath.getCompoundPath(path), getColTypeInfo(schema, schema, path, 0)); + } + + List<RowGroupMetadata_v3> rowGroupMetadataList = Lists.newArrayList(); + + ArrayList<SchemaPath> ALL_COLS = new ArrayList<>(); + ALL_COLS.add(SchemaPath.STAR_COLUMN); + boolean autoCorrectCorruptDates = formatConfig.areCorruptDatesAutoCorrected(); + ParquetReaderUtility.DateCorruptionStatus 
containsCorruptDates = ParquetReaderUtility.detectCorruptDates(metadata, ALL_COLS, autoCorrectCorruptDates); + if (logger.isDebugEnabled()) { + logger.debug(containsCorruptDates.toString()); + } + for (BlockMetaData rowGroup : metadata.getBlocks()) { + List<ColumnMetadata_v3> columnMetadataList = Lists.newArrayList(); + long length = 0; + for (ColumnChunkMetaData col : rowGroup.getColumns()) { + ColumnMetadata_v3 columnMetadata; + + boolean statsAvailable = (col.getStatistics() != null && !col.getStatistics().isEmpty()); + + Statistics<?> stats = col.getStatistics(); + String[] columnName = col.getPath().toArray(); + SchemaPath columnSchemaName = SchemaPath.getCompoundPath(columnName); + ColTypeInfo colTypeInfo = colTypeInfoMap.get(columnSchemaName); + + ColumnTypeMetadata_v3 columnTypeMetadata = + new ColumnTypeMetadata_v3(columnName, col.getType(), colTypeInfo.originalType, + colTypeInfo.precision, colTypeInfo.scale, colTypeInfo.repetitionLevel, colTypeInfo.definitionLevel); + + if (parquetTableMetadata.columnTypeInfo == null) { + parquetTableMetadata.columnTypeInfo = new ConcurrentHashMap<>(); + } + // Save the column schema info. 
We'll merge it into one list + parquetTableMetadata.columnTypeInfo + .put(new ColumnTypeMetadata_v3.Key(columnTypeMetadata.name), columnTypeMetadata); + if (statsAvailable) { + // Write stats when they are not null + Object minValue = null; + Object maxValue = null; + if (stats.genericGetMax() != null && stats.genericGetMin() != null ) { + minValue = stats.genericGetMin(); + maxValue = stats.genericGetMax(); + if (containsCorruptDates == ParquetReaderUtility.DateCorruptionStatus.META_SHOWS_CORRUPTION + && columnTypeMetadata.originalType == OriginalType.DATE) { + minValue = ParquetReaderUtility.autoCorrectCorruptedDate((Integer) minValue); + maxValue = ParquetReaderUtility.autoCorrectCorruptedDate((Integer) maxValue); + } + + } + columnMetadata = + new ColumnMetadata_v3(columnTypeMetadata.name, col.getType(), minValue, maxValue, stats.getNumNulls()); + } else { + columnMetadata = new ColumnMetadata_v3(columnTypeMetadata.name, col.getType(), null, null, null); + } + columnMetadataList.add(columnMetadata); + length += col.getTotalSize(); + } + + // DRILL-5009: Skip the RowGroup if it is empty + // Note we still read the schema even if there are no values in the RowGroup + if (rowGroup.getRowCount() == 0) { + continue; + } + RowGroupMetadata_v3 rowGroupMeta = + new RowGroupMetadata_v3(rowGroup.getStartingPos(), length, rowGroup.getRowCount(), + getHostAffinity(file, fs, rowGroup.getStartingPos(), length), columnMetadataList); + + rowGroupMetadataList.add(rowGroupMeta); + } + String path = Path.getPathWithoutSchemeAndAuthority(file.getPath()).toString(); + + return new ParquetFileMetadata_v3(path, file.getLen(), rowGroupMetadataList); + } + + /** + * Get the host affinity for a row group. 
+ * + * @param fileStatus the parquet file + * @param start the start of the row group + * @param length the length of the row group + * @return host affinity for the row group + */ + private Map<String, Float> getHostAffinity(FileStatus fileStatus, FileSystem fs, long start, long length) + throws IOException { + BlockLocation[] blockLocations = fs.getFileBlockLocations(fileStatus, start, length); + Map<String, Float> hostAffinityMap = Maps.newHashMap(); + for (BlockLocation blockLocation : blockLocations) { + for (String host : blockLocation.getHosts()) { + Float currentAffinity = hostAffinityMap.get(host); + float blockStart = blockLocation.getOffset(); + float blockEnd = blockStart + blockLocation.getLength(); + float rowGroupEnd = start + length; + Float newAffinity = (blockLocation.getLength() - (blockStart < start ? start - blockStart : 0) - + (blockEnd > rowGroupEnd ? blockEnd - rowGroupEnd : 0)) / length; + if (currentAffinity != null) { + hostAffinityMap.put(host, currentAffinity + newAffinity); + } else { + hostAffinityMap.put(host, newAffinity); + } + } + } + return hostAffinityMap; + } + + /** + * Serialize parquet metadata to json and write to a file. 
+ * + * @param parquetTableMetadata parquet table metadata + * @param p file path + */ + private void writeFile(ParquetTableMetadata_v3 parquetTableMetadata, Path p, FileSystem fs) throws IOException { + JsonFactory jsonFactory = new JsonFactory(); + jsonFactory.configure(Feature.AUTO_CLOSE_TARGET, false); + jsonFactory.configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, false); + ObjectMapper mapper = new ObjectMapper(jsonFactory); + SimpleModule module = new SimpleModule(); + module.addSerializer(ColumnMetadata_v3.class, new ColumnMetadata_v3.Serializer()); + mapper.registerModule(module); + FSDataOutputStream os = fs.create(p); + mapper.writerWithDefaultPrettyPrinter().writeValue(os, parquetTableMetadata); + os.flush(); + os.close(); + } + + private void writeFile(ParquetTableMetadataDirs parquetTableMetadataDirs, Path p, FileSystem fs) throws IOException { + JsonFactory jsonFactory = new JsonFactory(); + jsonFactory.configure(Feature.AUTO_CLOSE_TARGET, false); + jsonFactory.configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, false); + ObjectMapper mapper = new ObjectMapper(jsonFactory); + SimpleModule module = new SimpleModule(); + mapper.registerModule(module); + FSDataOutputStream os = fs.create(p); + mapper.writerWithDefaultPrettyPrinter().writeValue(os, parquetTableMetadataDirs); + os.flush(); + os.close(); + } + + /** + * Read the parquet metadata from a file + * + * @param path to metadata file + * @param dirsOnly true for {@link Metadata#METADATA_DIRECTORIES_FILENAME} + * or false for {@link Metadata#METADATA_FILENAME} files reading + * @param metaContext current metadata context + */ + private void readBlockMeta(Path path, boolean dirsOnly, MetadataContext metaContext, FileSystem fs) { + Stopwatch timer = logger.isDebugEnabled() ? 
Stopwatch.createStarted() : null; + Path metadataParentDir = Path.getPathWithoutSchemeAndAuthority(path.getParent()); + String metadataParentDirPath = metadataParentDir.toUri().getPath(); + ObjectMapper mapper = new ObjectMapper(); + + final SimpleModule serialModule = new SimpleModule(); + serialModule.addDeserializer(SchemaPath.class, new SchemaPath.De()); + serialModule.addKeyDeserializer(Metadata_V2.ColumnTypeMetadata_v2.Key.class, new Metadata_V2.ColumnTypeMetadata_v2.Key.DeSerializer()); + serialModule.addKeyDeserializer(ColumnTypeMetadata_v3.Key.class, new ColumnTypeMetadata_v3.Key.DeSerializer()); + + AfterburnerModule module = new AfterburnerModule(); + module.setUseOptimizedBeanDeserializer(true); + + mapper.registerModule(serialModule); + mapper.registerModule(module); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + try (FSDataInputStream is = fs.open(path)) { + boolean alreadyCheckedModification; + boolean newMetadata = false; + alreadyCheckedModification = metaContext.getStatus(metadataParentDirPath); + + if (dirsOnly) { + parquetTableMetadataDirs = mapper.readValue(is, ParquetTableMetadataDirs.class); + if (timer != null) { + logger.debug("Took {} ms to read directories from directory cache file", timer.elapsed(TimeUnit.MILLISECONDS)); + timer.stop(); + } + parquetTableMetadataDirs.updateRelativePaths(metadataParentDirPath); + if (!alreadyCheckedModification && tableModified(parquetTableMetadataDirs.getDirectories(), path, metadataParentDir, metaContext, fs)) { + parquetTableMetadataDirs = + (createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(path.getParent()).toString(), fs)).getRight(); + newMetadata = true; + } + } else { + parquetTableMetadata = mapper.readValue(is, ParquetTableMetadataBase.class); + if (timer != null) { + logger.debug("Took {} ms to read metadata from cache file", timer.elapsed(TimeUnit.MILLISECONDS)); + timer.stop(); + } + if (new 
MetadataVersion(parquetTableMetadata.getMetadataVersion()).compareTo(new MetadataVersion(3, 0)) >= 0) { + ((ParquetTableMetadata_v3) parquetTableMetadata).updateRelativePaths(metadataParentDirPath); + } + if (!alreadyCheckedModification && tableModified(parquetTableMetadata.getDirectories(), path, metadataParentDir, metaContext, fs)) { + parquetTableMetadata = + (createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(path.getParent()).toString(), fs)).getLeft(); + newMetadata = true; + } + + // DRILL-5009: Remove the RowGroup if it is empty + List<? extends ParquetFileMetadata> files = parquetTableMetadata.getFiles(); + for (ParquetFileMetadata file : files) { + List<? extends RowGroupMetadata> rowGroups = file.getRowGroups(); + for (Iterator<? extends RowGroupMetadata> iter = rowGroups.iterator(); iter.hasNext(); ) { + RowGroupMetadata r = iter.next(); + if (r.getRowCount() == 0) { + iter.remove(); + } + } + } + + } + if (newMetadata) { + // if new metadata files were created, invalidate the existing metadata context + metaContext.clear(); + } + } catch (IOException e) { + logger.error("Failed to read '{}' metadata file", path, e); + metaContext.setMetadataCacheCorrupted(true); + } + } + + /** + * Check if the parquet metadata needs to be updated by comparing the modification time of the directories with + * the modification time of the metadata file + * + * @param directories List of directories + * @param metaFilePath path of parquet metadata cache file + * @return true if metadata needs to be updated, false otherwise + * @throws IOException if some resources are not accessible + */ + private boolean tableModified(List<String> directories, Path metaFilePath, Path parentDir, MetadataContext metaContext, FileSystem fs) throws IOException { + Stopwatch timer = logger.isDebugEnabled() ? 
Stopwatch.createStarted() : null; + metaContext.setStatus(parentDir.toUri().getPath()); + long metaFileModifyTime = fs.getFileStatus(metaFilePath).getModificationTime(); + FileStatus directoryStatus = fs.getFileStatus(parentDir); + int numDirs = 1; + if (directoryStatus.getModificationTime() > metaFileModifyTime) { + if (timer != null) { + logger.debug("Directory {} was modified. Took {} ms to check modification time of {} directories", + directoryStatus.getPath().toString(), timer.elapsed(TimeUnit.MILLISECONDS), numDirs); + timer.stop(); + } + return true; + } + for (String directory : directories) { + numDirs++; + metaContext.setStatus(directory); + directoryStatus = fs.getFileStatus(new Path(directory)); + if (directoryStatus.getModificationTime() > metaFileModifyTime) { + if (timer != null) { + logger.debug("Directory {} was modified. Took {} ms to check modification time of {} directories", + directoryStatus.getPath().toString(), timer.elapsed(TimeUnit.MILLISECONDS), numDirs); + timer.stop(); + } + return true; + } + } + if (timer != null) { + logger.debug("No directories were modified. Took {} ms to check modification time of {} directories", + timer.elapsed(TimeUnit.MILLISECONDS), numDirs); + timer.stop(); + } + return false; + } + +} + http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataBase.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataBase.java new file mode 100644 index 0000000..d7d56c3 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataBase.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.parquet.metadata; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.PrimitiveType; + +import java.util.List; +import java.util.Map; + +import static org.apache.drill.exec.store.parquet.metadata.MetadataVersion.Constants.V1; +import static org.apache.drill.exec.store.parquet.metadata.MetadataVersion.Constants.V2; +import static org.apache.drill.exec.store.parquet.metadata.MetadataVersion.Constants.V3; +import static org.apache.drill.exec.store.parquet.metadata.MetadataVersion.Constants.V3_1; +import static org.apache.drill.exec.store.parquet.metadata.MetadataVersion.Constants.V3_2; +import static org.apache.drill.exec.store.parquet.metadata.MetadataVersion.Constants.V3_3; + +public class MetadataBase { + + /** + * Basic class for parquet metadata. 
Inheritors of this class are json serializable structures which contain + * different metadata versions for an entire parquet directory structure + * <p> + * If any new code changes affect the content of the metadata files, please update the metadata version as follows: + * Bump up metadata major version if metadata structure is changed. + * Bump up metadata minor version if only metadata content is changed, but metadata structure is the same. + * <p> + * Note: keep metadata versions synchronized with {@link MetadataVersion.Constants} + */ + @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, + include = JsonTypeInfo.As.PROPERTY, + property = "metadata_version", + visible = true) + @JsonSubTypes({ + @JsonSubTypes.Type(value = Metadata_V1.ParquetTableMetadata_v1.class, name = V1), + @JsonSubTypes.Type(value = Metadata_V2.ParquetTableMetadata_v2.class, name = V2), + @JsonSubTypes.Type(value = Metadata_V3.ParquetTableMetadata_v3.class, name = V3), + @JsonSubTypes.Type(value = Metadata_V3.ParquetTableMetadata_v3.class, name = V3_1), + @JsonSubTypes.Type(value = Metadata_V3.ParquetTableMetadata_v3.class, name = V3_2), + @JsonSubTypes.Type(value = Metadata_V3.ParquetTableMetadata_v3.class, name = V3_3) + }) + public static abstract class ParquetTableMetadataBase { + + @JsonIgnore + public abstract List<String> getDirectories(); + + @JsonIgnore public abstract List<? extends ParquetFileMetadata> getFiles(); + + @JsonIgnore public abstract void assignFiles(List<?
extends ParquetFileMetadata> newFiles); + + public abstract boolean hasColumnMetadata(); + + @JsonIgnore public abstract PrimitiveType.PrimitiveTypeName getPrimitiveType(String[] columnName); + + @JsonIgnore public abstract OriginalType getOriginalType(String[] columnName); + + @JsonIgnore public abstract Integer getRepetitionLevel(String[] columnName); + + @JsonIgnore public abstract Integer getDefinitionLevel(String[] columnName); + + @JsonIgnore public abstract boolean isRowGroupPrunable(); + + @JsonIgnore public abstract ParquetTableMetadataBase clone(); + + @JsonIgnore public abstract String getDrillVersion(); + + @JsonIgnore public abstract String getMetadataVersion(); + } + + public static abstract class ParquetFileMetadata { + @JsonIgnore public abstract String getPath(); + + @JsonIgnore public abstract Long getLength(); + + @JsonIgnore public abstract List<? extends RowGroupMetadata> getRowGroups(); + } + + + public static abstract class RowGroupMetadata { + @JsonIgnore public abstract Long getStart(); + + @JsonIgnore public abstract Long getLength(); + + @JsonIgnore public abstract Long getRowCount(); + + @JsonIgnore public abstract Map<String, Float> getHostAffinity(); + + @JsonIgnore public abstract List<? extends ColumnMetadata> getColumns(); + } + + + public static abstract class ColumnMetadata { + public abstract String[] getName(); + + public abstract Long getNulls(); + + public abstract boolean hasSingleValue(long rowCount); + + public abstract Object getMinValue(); + + public abstract Object getMaxValue(); + + /** + * Set the max value recorded in the parquet metadata statistics. + * + * This object would just be immutable, but due to Drill-4203 we need to correct + * date values that had been corrupted by earlier versions of Drill. + */ + public abstract void setMax(Object newMax); + + /** + * Set the min value recorded in the parquet metadata statistics. 
+ * + * This object would just be immutable, but due to Drill-4203 we need to correct + * date values that had been corrupted by earlier versions of Drill. + */ + public abstract void setMin(Object newMax); + + public abstract PrimitiveType.PrimitiveTypeName getPrimitiveType(); + + public abstract OriginalType getOriginalType(); + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataPathUtils.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataPathUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataPathUtils.java new file mode 100644 index 0000000..b9480e8 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataPathUtils.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.parquet.metadata; + +import com.google.common.collect.Lists; +import org.apache.drill.common.util.DrillVersionInfo; +import org.apache.hadoop.fs.Path; + +import java.util.List; + +import static org.apache.drill.exec.store.parquet.metadata.MetadataVersion.Constants.SUPPORTED_VERSIONS; +import static org.apache.drill.exec.store.parquet.metadata.Metadata_V3.ParquetFileMetadata_v3; +import static org.apache.drill.exec.store.parquet.metadata.Metadata_V3.ParquetTableMetadata_v3; + +/** + * Util class that contains helper methods for converting paths in the table and directory metadata structures + */ +public class MetadataPathUtils { + + /** + * Helper method that converts a list of relative paths to absolute ones + * + * @param paths list of relative paths + * @param baseDir base parent directory + * @return list of absolute paths + */ + public static List<String> convertToAbsolutePaths(List<String> paths, String baseDir) { + if (!paths.isEmpty()) { + List<String> absolutePaths = Lists.newArrayList(); + for (String relativePath : paths) { + String absolutePath = (new Path(relativePath).isAbsolute()) ? 
relativePath + : new Path(baseDir, relativePath).toUri().getPath(); + absolutePaths.add(absolutePath); + } + return absolutePaths; + } + return paths; + } + + /** + * Convert a list of files with relative paths to files with absolute ones + * + * @param files list of files with relative paths + * @param baseDir base parent directory + * @return list of files with absolute paths + */ + public static List<ParquetFileMetadata_v3> convertToFilesWithAbsolutePaths( + List<ParquetFileMetadata_v3> files, String baseDir) { + if (!files.isEmpty()) { + List<ParquetFileMetadata_v3> filesWithAbsolutePaths = Lists.newArrayList(); + for (ParquetFileMetadata_v3 file : files) { + Path relativePath = new Path(file.getPath()); + // create a new file if old one contains a relative path, otherwise use an old file + ParquetFileMetadata_v3 fileWithAbsolutePath = (relativePath.isAbsolute()) ? file + : new ParquetFileMetadata_v3(new Path(baseDir, relativePath).toUri().getPath(), file.length, file.rowGroups); + filesWithAbsolutePaths.add(fileWithAbsolutePath); + } + return filesWithAbsolutePaths; + } + return files; + } + + /** + * Creates a new parquet table metadata from the {@code tableMetadataWithAbsolutePaths} parquet table. + * A new parquet table will contain relative paths for the files and directories. 
+ * + * @param tableMetadataWithAbsolutePaths parquet table metadata with absolute paths for the files and directories + * @param baseDir base parent directory + * @return parquet table metadata with relative paths for the files and directories + */ + public static ParquetTableMetadata_v3 createMetadataWithRelativePaths( + ParquetTableMetadata_v3 tableMetadataWithAbsolutePaths, String baseDir) { + List<String> directoriesWithRelativePaths = Lists.newArrayList(); + for (String directory : tableMetadataWithAbsolutePaths.getDirectories()) { + directoriesWithRelativePaths.add(relativize(baseDir, directory)) ; + } + List<ParquetFileMetadata_v3> filesWithRelativePaths = Lists.newArrayList(); + for (ParquetFileMetadata_v3 file : tableMetadataWithAbsolutePaths.files) { + filesWithRelativePaths.add(new ParquetFileMetadata_v3( + relativize(baseDir, file.getPath()), file.length, file.rowGroups)); + } + return new ParquetTableMetadata_v3(SUPPORTED_VERSIONS.last().toString(), tableMetadataWithAbsolutePaths, + filesWithRelativePaths, directoriesWithRelativePaths, DrillVersionInfo.getVersion()); + } + + /** + * Constructs relative path from child full path and base path. 
Or returns the child path if it is already relative + * + * @param childPath full absolute path + * @param baseDir base path (the part of the Path, which should be cut off from child path) + * @return relative path + */ + public static String relativize(String baseDir, String childPath) { + Path fullPathWithoutSchemeAndAuthority = Path.getPathWithoutSchemeAndAuthority(new Path(childPath)); + Path basePathWithoutSchemeAndAuthority = Path.getPathWithoutSchemeAndAuthority(new Path(baseDir)); + + // Since Hadoop Path has no relativize() method, we use URI.relativize() to get the relative path + Path relativeFilePath = new Path(basePathWithoutSchemeAndAuthority.toUri() + .relativize(fullPathWithoutSchemeAndAuthority.toUri())); + if (relativeFilePath.isAbsolute()) { + throw new IllegalStateException(String.format("Path %s is not a subpath of %s.", + basePathWithoutSchemeAndAuthority.toUri().getPath(), fullPathWithoutSchemeAndAuthority.toUri().getPath())); + } + return relativeFilePath.toUri().getPath(); + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataVersion.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataVersion.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataVersion.java new file mode 100644 index 0000000..15b4b9d --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataVersion.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.parquet.metadata; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ComparisonChain; +import com.google.common.collect.ImmutableSortedSet; +import org.apache.drill.common.exceptions.DrillRuntimeException; + +import java.util.SortedSet; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class MetadataVersion implements Comparable<MetadataVersion> { + + private static final String FORMAT = "v?((?!0)\\d+)(\\.(\\d+))?"; + private static final Pattern PATTERN = Pattern.compile(FORMAT); + + private final int major; + private final int minor; + + public MetadataVersion(int major, int minor) { + this.major = major; + this.minor = minor; + } + + public MetadataVersion(String metadataVersion) { + Matcher matcher = PATTERN.matcher(metadataVersion); + if (!matcher.matches()) { + DrillRuntimeException.format("Could not parse metadata version '%s' using format '%s'", metadataVersion, FORMAT); + } + this.major = Integer.parseInt(matcher.group(1)); + this.minor = matcher.group(3) != null ? 
Integer.parseInt(matcher.group(3)) : 0; + } + + public int getMajor() { + return major; + } + + public int getMinor() { + return minor; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof MetadataVersion)) { + return false; + } + MetadataVersion that = (MetadataVersion) o; + return this.major == that.major + && this.minor == that.minor; + } + + @Override + public int hashCode() { + int result = major; + result = 31 * result + minor; + return result; + } + + /** + * @return string representation of the metadata file version, for example: "1", "10", "4.13" + * <p> + * String metadata version consists of the following characters:<p> + * major metadata version (any number of digits, except a single zero digit),<p> + * optional "." delimiter (used if minor metadata version is specified),<p> + * minor metadata version (not specified for "0" minor version) + */ + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append(major); + if (minor != 0) { + builder.append(".").append(minor); + } + return builder.toString(); + } + + @Override + public int compareTo(MetadataVersion o) { + Preconditions.checkNotNull(o); + return ComparisonChain.start() + .compare(this.major, o.major) + .compare(this.minor, o.minor) + .result(); + } + +/** + * Supported metadata versions. + * <p> + * Note: keep them synchronized with {@link Metadata.ParquetTableMetadataBase} versions + */ + public static class Constants { + /** + * Version 1: Introduces parquet file metadata caching.<br> + * See DRILL-2743 + */ + public static final String V1 = "v1"; + /** + * Version 2: Metadata cache file size is reduced.<br> + * See DRILL-4053 + */ + public static final String V2 = "v2"; + /** + * Version 3: Difference between v3 and v2 : min/max, type_length, precision, scale, repetitionLevel, definitionLevel.<br> + * Filter pushdown for Parquet is implemented. 
<br> + * See DRILL-1950 + */ + public static final String V3 = "v3"; + /** + * Version 3.1: Absolute paths of files and directories are replaced with relative ones. Metadata version value + * doesn't contain `v` letter<br> + * See DRILL-3867, DRILL-5660 + */ + public static final String V3_1 = "3.1"; + + /** + * Version 3.2: An array with the components of the field name in + * {@link Metadata.ColumnTypeMetadata_v3.Key} class is replaced by the SchemaPath.<br> + * See DRILL-4264 + */ + public static final String V3_2 = "3.2"; + + /** + * Version 3.3: Changed serialization of BINARY and FIXED_LEN_BYTE_ARRAY fields.<br> + * See DRILL-4139 + */ + public static final String V3_3 = "3.3"; + + /** + * All historical versions of the Drill metadata cache files. In case of introducing a new parquet metadata version + * please follow the {@link MetadataVersion#FORMAT}. + */ + public static final SortedSet<MetadataVersion> SUPPORTED_VERSIONS = ImmutableSortedSet.of( + new MetadataVersion(V1), + new MetadataVersion(V2), + new MetadataVersion(V3), + new MetadataVersion(V3_1), + new MetadataVersion(V3_2), + new MetadataVersion(V3_3) + ); + + /** + * @param metadataVersion string representation of the parquet metadata version + * @return true if metadata version is supported, false otherwise + */ + public static boolean isVersionSupported(String metadataVersion) { + return SUPPORTED_VERSIONS.contains(new MetadataVersion(metadataVersion)); + } + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V1.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V1.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V1.java new file mode 100644 index 0000000..92feb5f --- /dev/null +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V1.java @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.parquet.metadata; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.PrimitiveType; + +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import static org.apache.drill.exec.store.parquet.metadata.MetadataVersion.Constants.V1; +import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ColumnMetadata; +import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetFileMetadata; +import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetTableMetadataBase; +import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.RowGroupMetadata; + + +public class Metadata_V1 { + + @JsonTypeName(V1) + public static class ParquetTableMetadata_v1 extends 
ParquetTableMetadataBase { + @JsonProperty(value = "metadata_version", access = JsonProperty.Access.WRITE_ONLY) private String metadataVersion; + @JsonProperty + List<ParquetFileMetadata_v1> files; + @JsonProperty List<String> directories; + + public ParquetTableMetadata_v1() { + } + + public ParquetTableMetadata_v1(String metadataVersion, List<ParquetFileMetadata_v1> files, List<String> directories) { + this.metadataVersion = metadataVersion; + this.files = files; + this.directories = directories; + } + + @JsonIgnore + @Override public List<String> getDirectories() { + return directories; + } + + @JsonIgnore @Override public List<? extends ParquetFileMetadata> getFiles() { + return files; + } + + @JsonIgnore @Override public void assignFiles(List<? extends ParquetFileMetadata> newFiles) { + this.files = (List<ParquetFileMetadata_v1>) newFiles; + } + + @Override public boolean hasColumnMetadata() { + return false; + } + + @JsonIgnore @Override public PrimitiveType.PrimitiveTypeName getPrimitiveType(String[] columnName) { + return null; + } + + @JsonIgnore @Override public OriginalType getOriginalType(String[] columnName) { + return null; + } + + @JsonIgnore @Override + public Integer getRepetitionLevel(String[] columnName) { + return null; + } + + @JsonIgnore @Override + public Integer getDefinitionLevel(String[] columnName) { + return null; + } + + @JsonIgnore @Override + public boolean isRowGroupPrunable() { + return false; + } + + @JsonIgnore @Override public MetadataBase.ParquetTableMetadataBase clone() { + return new ParquetTableMetadata_v1(metadataVersion, files, directories); + } + + @JsonIgnore @Override + public String getDrillVersion() { + return null; + } + + @JsonIgnore @Override public String getMetadataVersion() { + return metadataVersion; + } + } + + + /** + * Struct which contains the metadata for a single parquet file + */ + public static class ParquetFileMetadata_v1 extends ParquetFileMetadata { + @JsonProperty + public String path; + 
@JsonProperty + public Long length; + @JsonProperty + public List<RowGroupMetadata_v1> rowGroups; + + public ParquetFileMetadata_v1() { + } + + public ParquetFileMetadata_v1(String path, Long length, List<RowGroupMetadata_v1> rowGroups) { + this.path = path; + this.length = length; + this.rowGroups = rowGroups; + } + + @Override + public String toString() { + return String.format("path: %s rowGroups: %s", path, rowGroups); + } + + @JsonIgnore @Override public String getPath() { + return path; + } + + @JsonIgnore @Override public Long getLength() { + return length; + } + + @JsonIgnore @Override public List<? extends RowGroupMetadata> getRowGroups() { + return rowGroups; + } + } + + + /** + * A struct that contains the metadata for a parquet row group + */ + public static class RowGroupMetadata_v1 extends RowGroupMetadata { + @JsonProperty + public Long start; + @JsonProperty + public Long length; + @JsonProperty + public Long rowCount; + @JsonProperty + public Map<String, Float> hostAffinity; + @JsonProperty + public List<ColumnMetadata_v1> columns; + + public RowGroupMetadata_v1() { + } + + public RowGroupMetadata_v1(Long start, Long length, Long rowCount, Map<String, Float> hostAffinity, + List<ColumnMetadata_v1> columns) { + this.start = start; + this.length = length; + this.rowCount = rowCount; + this.hostAffinity = hostAffinity; + this.columns = columns; + } + + @Override public Long getStart() { + return start; + } + + @Override public Long getLength() { + return length; + } + + @Override public Long getRowCount() { + return rowCount; + } + + @Override public Map<String, Float> getHostAffinity() { + return hostAffinity; + } + + @Override public List<? 
extends ColumnMetadata> getColumns() { + return columns; + } + } + + + /** + * A struct that contains the metadata for a column in a parquet file + */ + public static class ColumnMetadata_v1 extends ColumnMetadata { + @JsonProperty + public SchemaPath name; + @JsonProperty + public PrimitiveType.PrimitiveTypeName primitiveType; + @JsonProperty + public OriginalType originalType; + @JsonProperty + public Long nulls; + + // JsonProperty for these are associated with the getters and setters + public Object max; + public Object min; + + + public ColumnMetadata_v1() { + } + + public ColumnMetadata_v1(SchemaPath name, PrimitiveType.PrimitiveTypeName primitiveType, OriginalType originalType, + Object max, Object min, Long nulls) { + this.name = name; + this.primitiveType = primitiveType; + this.originalType = originalType; + this.max = max; + this.min = min; + this.nulls = nulls; + } + + @JsonProperty(value = "min") + public Object getMin() { + if (primitiveType == PrimitiveType.PrimitiveTypeName.BINARY && min != null) { + return new String(((Binary) min).getBytes()); + } + return min; + } + + @JsonProperty(value = "max") + public Object getMax() { + if (primitiveType == PrimitiveType.PrimitiveTypeName.BINARY && max != null) { + return new String(((Binary) max).getBytes()); + } + return max; + } + + @Override public PrimitiveType.PrimitiveTypeName getPrimitiveType() { + return primitiveType; + } + + @Override public OriginalType getOriginalType() { + return originalType; + } + + /** + * setter used during deserialization of the 'min' field of the metadata cache file. + * + * @param min + */ + @JsonProperty(value = "min") + public void setMin(Object min) { + this.min = min; + } + + /** + * setter used during deserialization of the 'max' field of the metadata cache file. 
+ * + * @param max + */ + @JsonProperty(value = "max") + public void setMax(Object max) { + this.max = max; + } + + @Override public String[] getName() { + String[] s = new String[1]; + String nameString = name.toString(); + // Strip out the surrounding backticks. + s[0]=nameString.substring(1, nameString.length()-1); + return s; + } + + @Override public Long getNulls() { + return nulls; + } + + /** + * Checks that the column chunk has a single value. + * Returns {@code true} if {@code min} and {@code max} are the same but not null + * and nulls count is 0 or equal to the rows count. + * <p> + * Returns {@code true} if {@code min} and {@code max} are null and the number of null values + * in the column chunk is equal to the rows count. + * <p> + * Comparison of nulls and rows count is needed for the cases: + * <ul> + * <li>column with primitive type has single value and null values</li> + * + * <li>column <b>with primitive type</b> has only null values, min/max couldn't be null, + * but column has single value</li> + * </ul> + * + * @param rowCount rows count in column chunk + * @return true if column has single value + */ + @Override + public boolean hasSingleValue(long rowCount) { + if (nulls != null) { + if (min != null) { + // Objects.deepEquals() is used here, since min and max may be byte arrays + return Objects.deepEquals(min, max) && (nulls == 0 || nulls == rowCount); + } else { + return nulls == rowCount && max == null; + } + } + return false; + } + + @Override public Object getMinValue() { + return min; + } + + @Override public Object getMaxValue() { + return max; + } + + } + +}