http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
index f1fb1e9..3ccdad0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.store.parquet;
 
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 
@@ -26,12 +25,9 @@ import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.exec.physical.base.AbstractBase;
-import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.base.PhysicalVisitor;
-import org.apache.drill.exec.physical.base.SubScan;
 import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
+import org.apache.drill.exec.store.ColumnExplorer;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 
 import com.fasterxml.jackson.annotation.JacksonInject;
@@ -40,31 +36,26 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterators;
+import org.apache.hadoop.conf.Configuration;
 
-// Class containing information for reading a single parquet row group form HDFS
+// Class containing information for reading a single parquet row group from HDFS
 @JsonTypeName("parquet-row-group-scan")
-public class ParquetRowGroupScan extends AbstractBase implements SubScan {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRowGroupScan.class);
+public class ParquetRowGroupScan extends AbstractParquetRowGroupScan {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRowGroupScan.class);
 
-  private final ParquetFormatConfig formatConfig;
   private final ParquetFormatPlugin formatPlugin;
-  private final List<RowGroupReadEntry> rowGroupReadEntries;
-  private final List<SchemaPath> columns;
-  private LogicalExpression filter;
-  private String selectionRoot;
+  private final ParquetFormatConfig formatConfig;
+  private final String selectionRoot;
 
   @JsonCreator
-  public ParquetRowGroupScan( //
-      @JacksonInject StoragePluginRegistry registry, //
-      @JsonProperty("userName") String userName, //
-      @JsonProperty("storage") StoragePluginConfig storageConfig, //
-      @JsonProperty("format") FormatPluginConfig formatConfig, //
-      @JsonProperty("entries") LinkedList<RowGroupReadEntry> rowGroupReadEntries, //
-      @JsonProperty("columns") List<SchemaPath> columns, //
-      @JsonProperty("selectionRoot") String selectionRoot, //
-      @JsonProperty("filter") LogicalExpression filter
-  ) throws ExecutionSetupException {
+  public ParquetRowGroupScan(@JacksonInject StoragePluginRegistry registry,
+                             @JsonProperty("userName") String userName,
+                             @JsonProperty("storageConfig") StoragePluginConfig storageConfig,
+                             @JsonProperty("formatConfig") FormatPluginConfig formatConfig,
+                             @JsonProperty("rowGroupReadEntries") LinkedList<RowGroupReadEntry> rowGroupReadEntries,
+                             @JsonProperty("columns") List<SchemaPath> columns,
+                             @JsonProperty("selectionRoot") String selectionRoot,
+                             @JsonProperty("filter") LogicalExpression filter) throws ExecutionSetupException {
     this(userName,
         (ParquetFormatPlugin) registry.getFormatPlugin(Preconditions.checkNotNull(storageConfig), Preconditions.checkNotNull(formatConfig)),
         rowGroupReadEntries,
@@ -73,82 +64,72 @@ public class ParquetRowGroupScan extends AbstractBase implements SubScan {
         filter);
   }
 
-  public ParquetRowGroupScan( //
-      String userName, //
-      ParquetFormatPlugin formatPlugin, //
-      List<RowGroupReadEntry> rowGroupReadEntries, //
-      List<SchemaPath> columns, //
-      String selectionRoot, //
-      LogicalExpression filter
-  ) {
-    super(userName);
+  public ParquetRowGroupScan(String userName,
+                             ParquetFormatPlugin formatPlugin,
+                             List<RowGroupReadEntry> rowGroupReadEntries,
+                             List<SchemaPath> columns,
+                             String selectionRoot,
+                             LogicalExpression filter) {
+    super(userName, rowGroupReadEntries, columns, filter);
     this.formatPlugin = Preconditions.checkNotNull(formatPlugin, "Could not find format config for the given configuration");
     this.formatConfig = formatPlugin.getConfig();
-    this.rowGroupReadEntries = rowGroupReadEntries;
-    this.columns = columns == null ? GroupScan.ALL_COLUMNS : columns;
     this.selectionRoot = selectionRoot;
-    this.filter = filter;
-  }
-
-  @JsonProperty("entries")
-  public List<RowGroupReadEntry> getRowGroupReadEntries() {
-    return rowGroupReadEntries;
   }
 
-  @JsonProperty("storage")
-  public StoragePluginConfig getEngineConfig() {
+  @JsonProperty
+  public StoragePluginConfig getStorageConfig() {
     return formatPlugin.getStorageConfig();
   }
 
-  /**
-   * @return Parquet plugin format config
-   */
-  @JsonProperty("format")
+  @JsonProperty
   public ParquetFormatConfig getFormatConfig() {
     return formatConfig;
   }
 
+  @JsonProperty
   public String getSelectionRoot() {
     return selectionRoot;
   }
 
-  @Override
-  public boolean isExecutable() {
-    return false;
-  }
-
   @JsonIgnore
   public ParquetFormatPlugin getStorageEngine() {
     return formatPlugin;
   }
 
   @Override
-  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
-    return physicalVisitor.visitSubScan(this, value);
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+    Preconditions.checkArgument(children.isEmpty());
+    return new ParquetRowGroupScan(getUserName(), formatPlugin, rowGroupReadEntries, columns, selectionRoot, filter);
   }
 
   @Override
-  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
-    Preconditions.checkArgument(children.isEmpty());
-    return new ParquetRowGroupScan(getUserName(), formatPlugin, rowGroupReadEntries, columns, selectionRoot, filter);
+  public int getOperatorType() {
+    return CoreOperatorType.PARQUET_ROW_GROUP_SCAN_VALUE;
   }
 
   @Override
-  public Iterator<PhysicalOperator> iterator() {
-    return Iterators.emptyIterator();
+  public AbstractParquetRowGroupScan copy(List<SchemaPath> columns) {
+    return new ParquetRowGroupScan(getUserName(), formatPlugin, rowGroupReadEntries, columns, selectionRoot, filter);
   }
 
-  public List<SchemaPath> getColumns() {
-    return columns;
+  @Override
+  public boolean areCorruptDatesAutoCorrected() {
+    return formatConfig.areCorruptDatesAutoCorrected();
   }
 
-  public LogicalExpression getFilter() {
-    return filter;
+  @Override
+  public Configuration getFsConf(RowGroupReadEntry rowGroupReadEntry) {
+    return formatPlugin.getFsConf();
   }
 
   @Override
-  public int getOperatorType() {
-    return CoreOperatorType.PARQUET_ROW_GROUP_SCAN_VALUE;
+  public boolean supportsFileImplicitColumns() {
+    return selectionRoot != null;
   }
 
+  @Override
+  public List<String> getPartitionValues(RowGroupReadEntry rowGroupReadEntry) {
+    return ColumnExplorer.listPartitionValues(rowGroupReadEntry.getPath(), selectionRoot);
+  }
 }
+
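
Editor's note on the change above: getPartitionValues now delegates to ColumnExplorer.listPartitionValues(path, selectionRoot), i.e. the directory levels between the selection root and the row group's file become the partition values, and supportsFileImplicitColumns holds only when a selection root exists. A minimal, hypothetical re-implementation of that lookup for illustration only (PartitionValuesSketch and its main are not part of this commit):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of what ColumnExplorer.listPartitionValues is expected to
// compute: each directory level between the selection root and the file becomes
// one partition value (dir0, dir1, ...).
public class PartitionValuesSketch {

  static List<String> listPartitionValues(String filePath, String selectionRoot) {
    List<String> values = new ArrayList<>();
    if (filePath == null || selectionRoot == null || !filePath.startsWith(selectionRoot)) {
      return values; // no selection root -> no partition columns
    }
    String relative = filePath.substring(selectionRoot.length());
    values.addAll(Arrays.asList(relative.split("/")));
    values.removeIf(String::isEmpty);
    if (!values.isEmpty()) {
      values.remove(values.size() - 1); // drop the file name, keep directories
    }
    return values;
  }

  public static void main(String[] args) {
    // /data/t/2017/Q1/0_0_0.parquet under root /data/t -> [2017, Q1]
    System.out.println(listPartitionValues("/data/t/2017/Q1/0_0_0.parquet", "/data/t"));
  }
}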

http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index e928ebb..c71e3c3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -17,15 +17,7 @@
  */
 package org.apache.drill.exec.store.parquet;
 
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.base.Functions;
-import com.google.common.base.Stopwatch;
-import com.google.common.collect.Maps;
+import com.google.common.base.Preconditions;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ops.ExecutorFragmentContext;
@@ -33,144 +25,52 @@ import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.physical.impl.ScanBatch;
 import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.store.ColumnExplorer;
-import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
-import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
-import org.apache.drill.exec.store.parquet2.DrillParquetReader;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.hadoop.CodecFactory;
-import org.apache.parquet.hadoop.ParquetFileReader;
-import org.apache.parquet.hadoop.metadata.ParquetMetadata;
-import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.Type;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan>{
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetScanBatchCreator.class);
+import java.io.IOException;
+import java.util.List;
 
-  private static final String ENABLE_BYTES_READ_COUNTER = "parquet.benchmark.bytes.read";
-  private static final String ENABLE_BYTES_TOTAL_COUNTER = "parquet.benchmark.bytes.total";
-  private static final String ENABLE_TIME_READ_COUNTER = "parquet.benchmark.time.read";
+public class ParquetScanBatchCreator extends AbstractParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan> {
 
-  @SuppressWarnings("resource")
   @Override
-  public ScanBatch getBatch(ExecutorFragmentContext context, ParquetRowGroupScan rowGroupScan, List<RecordBatch> children)
-      throws ExecutionSetupException {
+  public ScanBatch getBatch(ExecutorFragmentContext context, ParquetRowGroupScan rowGroupScan, List<RecordBatch> children) throws ExecutionSetupException {
     Preconditions.checkArgument(children.isEmpty());
     OperatorContext oContext = context.newOperatorContext(rowGroupScan);
+    return getBatch(context, rowGroupScan, oContext);
+  }
 
-    final ColumnExplorer columnExplorer = new ColumnExplorer(context.getOptions(), rowGroupScan.getColumns());
-
-    if (!columnExplorer.isStarQuery()) {
-      rowGroupScan = new ParquetRowGroupScan(rowGroupScan.getUserName(), rowGroupScan.getStorageEngine(),
-          rowGroupScan.getRowGroupReadEntries(), columnExplorer.getTableColumns(), rowGroupScan.getSelectionRoot(),
-          rowGroupScan.getFilter());
-      rowGroupScan.setOperatorId(rowGroupScan.getOperatorId());
-    }
-
-    DrillFileSystem fs;
-    try {
-      boolean useAsyncPageReader =
-          context.getOptions().getOption(ExecConstants.PARQUET_PAGEREADER_ASYNC).bool_val;
-      if (useAsyncPageReader) {
-        fs = oContext.newNonTrackingFileSystem(rowGroupScan.getStorageEngine().getFsConf());
-      } else {
-        fs = oContext.newFileSystem(rowGroupScan.getStorageEngine().getFsConf());
-      }
-    } catch (IOException e) {
-      throw new ExecutionSetupException(
-          String.format("Failed to create DrillFileSystem: %s", e.getMessage()), e);
-    }
-
-    Configuration conf = new Configuration(fs.getConf());
-    conf.setBoolean(ENABLE_BYTES_READ_COUNTER, false);
-    conf.setBoolean(ENABLE_BYTES_TOTAL_COUNTER, false);
-    conf.setBoolean(ENABLE_TIME_READ_COUNTER, false);
+  @Override
+  protected AbstractDrillFileSystemManager getDrillFileSystemCreator(OperatorContext operatorContext, OptionManager optionManager) {
+    return new ParquetDrillFileSystemManager(operatorContext, optionManager.getOption(ExecConstants.PARQUET_PAGEREADER_ASYNC).bool_val);
+  }
 
-    // keep footers in a map to avoid re-reading them
-    Map<String, ParquetMetadata> footers = Maps.newHashMap();
-    List<RecordReader> readers = new LinkedList<>();
-    List<Map<String, String>> implicitColumns = Lists.newArrayList();
-    Map<String, String> mapWithMaxColumns = Maps.newLinkedHashMap();
-    for(RowGroupReadEntry e : rowGroupScan.getRowGroupReadEntries()){
-      /*
-      Here we could store a map from file names to footers, to prevent re-reading the footer for each row group in a file
-      TODO - to prevent reading the footer again in the parquet record reader (it is read earlier in the ParquetStorageEngine)
-      we should add more information to the RowGroupInfo that will be populated upon the first read to
-      provide the reader with all of th file meta-data it needs
-      These fields will be added to the constructor below
-      */
-      try {
-        Stopwatch timer = Stopwatch.createUnstarted();
-        if (!footers.containsKey(e.getPath())){
-          timer.start();
-          ParquetMetadata footer = ParquetFileReader.readFooter(conf, new Path(e.getPath()));
-          long timeToRead = timer.elapsed(TimeUnit.MICROSECONDS);
-          logger.trace("ParquetTrace,Read Footer,{},{},{},{},{},{},{}", "", e.getPath(), "", 0, 0, 0, timeToRead);
-          footers.put(e.getPath(), footer );
-        }
-        boolean autoCorrectCorruptDates = rowGroupScan.getFormatConfig().areCorruptDatesAutoCorrected();
-        ParquetReaderUtility.DateCorruptionStatus containsCorruptDates = ParquetReaderUtility.detectCorruptDates(footers.get(e.getPath()), rowGroupScan.getColumns(),
-                autoCorrectCorruptDates);
-        if (logger.isDebugEnabled()) {
-          logger.debug(containsCorruptDates.toString());
-        }
-        if (!context.getOptions().getBoolean(ExecConstants.PARQUET_NEW_RECORD_READER) && !isComplex(footers.get(e.getPath()))) {
-          readers.add(
-              new ParquetRecordReader(
-                  context, e.getPath(), e.getRowGroupIndex(), e.getNumRecordsToRead(), fs,
-                  CodecFactory.createDirectCodecFactory(
-                  fs.getConf(),
-                  new ParquetDirectByteBufferAllocator(oContext.getAllocator()), 0),
-                  footers.get(e.getPath()),
-                  rowGroupScan.getColumns(),
-                  containsCorruptDates
-              )
-          );
-        } else {
-          ParquetMetadata footer = footers.get(e.getPath());
-          readers.add(new DrillParquetReader(context, footer, e, columnExplorer.getTableColumns(), fs, containsCorruptDates));
-        }
 
-        Map<String, String> implicitValues = columnExplorer.populateImplicitColumns(e, rowGroupScan.getSelectionRoot());
-        implicitColumns.add(implicitValues);
-        if (implicitValues.size() > mapWithMaxColumns.size()) {
-          mapWithMaxColumns = implicitValues;
-        }
+  /**
+   * Creates file system only if it was not created before, otherwise returns already created instance.
+   */
+  private class ParquetDrillFileSystemManager extends AbstractDrillFileSystemManager {
 
-      } catch (IOException e1) {
-        throw new ExecutionSetupException(e1);
-      }
-    }
+    private final boolean useAsyncPageReader;
+    private DrillFileSystem fs;
 
-    // all readers should have the same number of implicit columns, add missing ones with value null
-    Map<String, String> diff = Maps.transformValues(mapWithMaxColumns, Functions.constant((String) null));
-    for (Map<String, String> map : implicitColumns) {
-      map.putAll(Maps.difference(map, diff).entriesOnlyOnRight());
+    ParquetDrillFileSystemManager(OperatorContext operatorContext, boolean useAsyncPageReader) {
+      super(operatorContext);
+      this.useAsyncPageReader = useAsyncPageReader;
     }
 
-    return new ScanBatch(context, oContext, readers, implicitColumns);
-  }
-
-  private static boolean isComplex(ParquetMetadata footer) {
-    MessageType schema = footer.getFileMetaData().getSchema();
-
-    for (Type type : schema.getFields()) {
-      if (!type.isPrimitive()) {
-        return true;
-      }
-    }
-    for (ColumnDescriptor col : schema.getColumns()) {
-      if (col.getMaxRepetitionLevel() > 0) {
-        return true;
+    @Override
+    protected DrillFileSystem get(Configuration config, String path) throws ExecutionSetupException {
+      if (fs == null) {
+        try {
+          fs =  useAsyncPageReader ? operatorContext.newNonTrackingFileSystem(config) : operatorContext.newFileSystem(config);
+        } catch (IOException e) {
+          throw new ExecutionSetupException(String.format("Failed to create DrillFileSystem: %s", e.getMessage()), e);
+        }
       }
+      return fs;
     }
-    return false;
-  }
 
 }
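
The new ParquetDrillFileSystemManager above replaces the inline DrillFileSystem setup that the old getBatch performed: the file system is now created lazily on first get() (tracking or non-tracking depending on the async page reader option) and reused afterwards. A self-contained sketch of that create-once-then-reuse shape, using a plain Supplier instead of Drill's OperatorContext (names here are illustrative, not from the commit):

import java.util.function.Supplier;

// Illustrative holder mirroring ParquetDrillFileSystemManager's behavior:
// the factory runs once, later calls return the cached instance.
class LazyHolder<T> {
  private final Supplier<T> factory; // e.g. () -> newFileSystem(config)
  private T instance;

  LazyHolder(Supplier<T> factory) {
    this.factory = factory;
  }

  T get() {
    if (instance == null) {     // first call: create
      instance = factory.get();
    }
    return instance;            // later calls: reuse
  }
}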

http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupInfo.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupInfo.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupInfo.java
new file mode 100644
index 0000000..af436d8
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupInfo.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.drill.exec.store.dfs.ReadEntryFromHDFS;
+import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.drill.exec.store.schedule.CompleteWork;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+
+import java.util.List;
+
+import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ColumnMetadata;
+
+public class RowGroupInfo extends ReadEntryFromHDFS implements CompleteWork, FileWork {
+
+  private EndpointByteMap byteMap;
+  private int rowGroupIndex;
+  private List<? extends ColumnMetadata> columns;
+  private long rowCount;  // rowCount = -1 indicates to include all rows.
+  private long numRecordsToRead;
+
+  @JsonCreator
+  public RowGroupInfo(@JsonProperty("path") String path,
+                      @JsonProperty("start") long start,
+                      @JsonProperty("length") long length,
+                      @JsonProperty("rowGroupIndex") int rowGroupIndex,
+                      long rowCount) {
+    super(path, start, length);
+    this.rowGroupIndex = rowGroupIndex;
+    this.rowCount = rowCount;
+    this.numRecordsToRead = rowCount;
+  }
+
+  public RowGroupReadEntry getRowGroupReadEntry() {
+    return new RowGroupReadEntry(this.getPath(), this.getStart(), this.getLength(), this.rowGroupIndex, this.getNumRecordsToRead());
+  }
+
+  public int getRowGroupIndex() {
+    return this.rowGroupIndex;
+  }
+
+  @Override
+  public int compareTo(CompleteWork o) {
+    return Long.compare(getTotalBytes(), o.getTotalBytes());
+  }
+
+  @Override
+  public long getTotalBytes() {
+    return this.getLength();
+  }
+
+  @Override
+  public EndpointByteMap getByteMap() {
+    return byteMap;
+  }
+
+  public long getNumRecordsToRead() {
+    return numRecordsToRead;
+  }
+
+  public void setNumRecordsToRead(long numRecords) {
+    numRecordsToRead = numRecords;
+  }
+
+  public void setEndpointByteMap(EndpointByteMap byteMap) {
+    this.byteMap = byteMap;
+  }
+
+  public long getRowCount() {
+    return rowCount;
+  }
+
+  public List<? extends ColumnMetadata> getColumns() {
+    return columns;
+  }
+
+  public void setColumns(List<? extends ColumnMetadata> columns) {
+    this.columns = columns;
+  }
+
+}
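
RowGroupInfo's compareTo orders work units by getTotalBytes(), which lets Drill's scheduling code sort row groups by size when assigning them across fragments. A hedged illustration of that ordering with plain values (WorkUnit is a stand-in, not a Drill class):

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Stand-in mimicking RowGroupInfo's size-based natural ordering.
class WorkUnit implements Comparable<WorkUnit> {
  final String path;
  final long totalBytes;

  WorkUnit(String path, long totalBytes) {
    this.path = path;
    this.totalBytes = totalBytes;
  }

  @Override
  public int compareTo(WorkUnit o) {
    return Long.compare(totalBytes, o.totalBytes); // same rule as RowGroupInfo.compareTo
  }

  public static void main(String[] args) {
    List<WorkUnit> work = new ArrayList<>();
    work.add(new WorkUnit("a.parquet", 128L << 20));
    work.add(new WorkUnit("b.parquet", 512L << 20));
    work.sort(Comparator.reverseOrder()); // largest row groups first
    work.forEach(w -> System.out.println(w.path + " " + w.totalBytes));
  }
}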

http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
new file mode 100644
index 0000000..cdb28c2
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
@@ -0,0 +1,694 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.metadata;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator.Feature;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.fasterxml.jackson.module.afterburner.AfterburnerModule;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.util.DrillVersionInfo;
+import org.apache.drill.exec.store.TimedRunnable;
+import org.apache.drill.exec.store.dfs.MetadataContext;
+import org.apache.drill.exec.store.parquet.ParquetFormatConfig;
+import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
+import org.apache.drill.exec.util.DrillFileSystemUtil;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetFileMetadata;
+import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetTableMetadataBase;
+import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.RowGroupMetadata;
+import static org.apache.drill.exec.store.parquet.metadata.MetadataVersion.Constants.SUPPORTED_VERSIONS;
+import static org.apache.drill.exec.store.parquet.metadata.Metadata_V3.ColumnMetadata_v3;
+import static org.apache.drill.exec.store.parquet.metadata.Metadata_V3.ColumnTypeMetadata_v3;
+import static org.apache.drill.exec.store.parquet.metadata.Metadata_V3.ParquetFileMetadata_v3;
+import static org.apache.drill.exec.store.parquet.metadata.Metadata_V3.ParquetTableMetadata_v3;
+import static org.apache.drill.exec.store.parquet.metadata.Metadata_V3.RowGroupMetadata_v3;
+
+public class Metadata {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Metadata.class);
+
+  public static final String[] OLD_METADATA_FILENAMES = {".drill.parquet_metadata.v2"};
+  public static final String METADATA_FILENAME = ".drill.parquet_metadata";
+  public static final String METADATA_DIRECTORIES_FILENAME = ".drill.parquet_metadata_directories";
+
+  private final ParquetFormatConfig formatConfig;
+
+  private ParquetTableMetadataBase parquetTableMetadata;
+  private ParquetTableMetadataDirs parquetTableMetadataDirs;
+
+
+  private Metadata(ParquetFormatConfig formatConfig) {
+    this.formatConfig = formatConfig;
+  }
+
+  /**
+   * Create the parquet metadata file for the directory at the given path, and for any subdirectories.
+   *
+   * @param fs file system
+   * @param path path
+   */
+  public static void createMeta(FileSystem fs, String path, ParquetFormatConfig formatConfig) throws IOException {
+    Metadata metadata = new Metadata(formatConfig);
+    metadata.createMetaFilesRecursively(path, fs);
+  }
+
+  /**
+   * Get the parquet metadata for the parquet files in the given directory, including those in subdirectories.
+   *
+   * @param fs file system
+   * @param path path
+   * @return parquet table metadata
+   */
+  public static ParquetTableMetadata_v3 getParquetTableMetadata(FileSystem fs, String path, ParquetFormatConfig formatConfig)
+      throws IOException {
+    Metadata metadata = new Metadata(formatConfig);
+    return metadata.getParquetTableMetadata(path, fs);
+  }
+
+  /**
+   * Get the parquet metadata for a list of parquet files.
+   *
+   * @param fileStatusMap file statuses and corresponding file systems
+   * @param formatConfig parquet format config
+   * @return parquet table metadata
+   */
+  public static ParquetTableMetadata_v3 getParquetTableMetadata(Map<FileStatus, FileSystem> fileStatusMap,
+                                                                ParquetFormatConfig formatConfig) throws IOException {
+    Metadata metadata = new Metadata(formatConfig);
+    return metadata.getParquetTableMetadata(fileStatusMap);
+  }
+
+  /**
+   * Get the parquet metadata for the table by reading the metadata file
+   *
+   * @param fs current file system
+   * @param path The path to the metadata file, located in the directory that contains the parquet files
+   * @param metaContext metadata context
+   * @param formatConfig parquet format plugin configs
+   * @return parquet table metadata. Null if metadata cache is missing, unsupported or corrupted
+   */
+  public static @Nullable ParquetTableMetadataBase readBlockMeta(FileSystem fs, Path path, MetadataContext metaContext,
+      ParquetFormatConfig formatConfig) {
+    if (ignoreReadingMetadata(metaContext, path)) {
+      return null;
+    }
+    Metadata metadata = new Metadata(formatConfig);
+    metadata.readBlockMeta(path, false, metaContext, fs);
+    return metadata.parquetTableMetadata;
+  }
+
+  /**
+   * Get the parquet metadata for all subdirectories by reading the metadata file
+   *
+   * @param fs current file system
+   * @param path The path to the metadata file, located in the directory that contains the parquet files
+   * @param metaContext metadata context
+   * @param formatConfig parquet format plugin configs
+   * @return parquet metadata for a directory. Null if metadata cache is missing, unsupported or corrupted
+   */
+  public static @Nullable ParquetTableMetadataDirs readMetadataDirs(FileSystem fs, Path path,
+      MetadataContext metaContext, ParquetFormatConfig formatConfig) {
+    if (ignoreReadingMetadata(metaContext, path)) {
+      return null;
+    }
+    Metadata metadata = new Metadata(formatConfig);
+    metadata.readBlockMeta(path, true, metaContext, fs);
+    return metadata.parquetTableMetadataDirs;
+  }
+
+  /**
+   * Ignore reading metadata files, if metadata is missing, unsupported or corrupted
+   *
+   * @param metaContext Metadata context
+   * @param path The path to the metadata file, located in the directory that contains the parquet files
+   * @return true if parquet metadata is missing or corrupted, false otherwise
+   */
+  private static boolean ignoreReadingMetadata(MetadataContext metaContext, Path path) {
+    if (metaContext.isMetadataCacheCorrupted()) {
+      logger.warn("Ignoring of reading '{}' metadata file. Parquet metadata cache files are unsupported or corrupted. " +
+          "Query performance may be slow. Make sure the cache files are up-to-date by running the 'REFRESH TABLE " +
+          "METADATA' command", path);
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Create the parquet metadata files for the directory at the given path and for any subdirectories.
+   * Metadata cache files written to the disk contain relative paths. Returned Pair of metadata contains absolute paths.
+   *
+   * @param path to the directory of the parquet table
+   * @param fs file system
+   * @return Pair of parquet metadata. The left one is a parquet metadata for the table. The right one of the Pair is
+   *         a metadata for all subdirectories (if they are present and there are no any parquet files in the
+   *         {@code path} directory).
+   * @throws IOException if parquet metadata can't be serialized and written to the json file
+   */
+  private Pair<ParquetTableMetadata_v3, ParquetTableMetadataDirs> createMetaFilesRecursively(final String path, FileSystem fs) throws IOException {
+    Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
+    List<ParquetFileMetadata_v3> metaDataList = Lists.newArrayList();
+    List<String> directoryList = Lists.newArrayList();
+    ConcurrentHashMap<ColumnTypeMetadata_v3.Key, ColumnTypeMetadata_v3> columnTypeInfoSet =
+        new ConcurrentHashMap<>();
+    Path p = new Path(path);
+    FileStatus fileStatus = fs.getFileStatus(p);
+    assert fileStatus.isDirectory() : "Expected directory";
+
+    final Map<FileStatus, FileSystem> childFiles = new LinkedHashMap<>();
+
+    for (final FileStatus file : DrillFileSystemUtil.listAll(fs, p, false)) {
+      if (file.isDirectory()) {
+        ParquetTableMetadata_v3 subTableMetadata = (createMetaFilesRecursively(file.getPath().toString(), fs)).getLeft();
+        metaDataList.addAll(subTableMetadata.files);
+        directoryList.addAll(subTableMetadata.directories);
+        directoryList.add(file.getPath().toString());
+        // Merge the schema from the child level into the current level
+        //TODO: We need a merge method that merges two columns with the same name but different types
+        columnTypeInfoSet.putAll(subTableMetadata.columnTypeInfo);
+      } else {
+        childFiles.put(file, fs);
+      }
+    }
+    ParquetTableMetadata_v3 parquetTableMetadata = new ParquetTableMetadata_v3(SUPPORTED_VERSIONS.last().toString(),
+                                                                               DrillVersionInfo.getVersion());
+    if (childFiles.size() > 0) {
+      List<ParquetFileMetadata_v3 > childFilesMetadata = getParquetFileMetadata_v3(parquetTableMetadata, childFiles);
+      metaDataList.addAll(childFilesMetadata);
+      // Note that we do not need to merge the columnInfo at this point. The columnInfo is already added
+      // to the parquetTableMetadata.
+    }
+
+    parquetTableMetadata.directories = directoryList;
+    parquetTableMetadata.files = metaDataList;
+    // TODO: We need a merge method that merges two columns with the same name but different types
+    if (parquetTableMetadata.columnTypeInfo == null) {
+      parquetTableMetadata.columnTypeInfo = new ConcurrentHashMap<>();
+    }
+    parquetTableMetadata.columnTypeInfo.putAll(columnTypeInfoSet);
+
+    for (String oldName : OLD_METADATA_FILENAMES) {
+      fs.delete(new Path(p, oldName), false);
+    }
+    //  relative paths in the metadata are only necessary for meta cache files.
+    ParquetTableMetadata_v3 metadataTableWithRelativePaths =
+        MetadataPathUtils.createMetadataWithRelativePaths(parquetTableMetadata, path);
+    writeFile(metadataTableWithRelativePaths, new Path(p, METADATA_FILENAME), fs);
+
+    if (directoryList.size() > 0 && childFiles.size() == 0) {
+      ParquetTableMetadataDirs parquetTableMetadataDirsRelativePaths =
+          new ParquetTableMetadataDirs(metadataTableWithRelativePaths.directories);
+      writeFile(parquetTableMetadataDirsRelativePaths, new Path(p, METADATA_DIRECTORIES_FILENAME), fs);
+      if (timer != null) {
+        logger.debug("Creating metadata files recursively took {} ms", timer.elapsed(TimeUnit.MILLISECONDS));
+      }
+      ParquetTableMetadataDirs parquetTableMetadataDirs = new ParquetTableMetadataDirs(directoryList);
+      return Pair.of(parquetTableMetadata, parquetTableMetadataDirs);
+    }
+    List<String> emptyDirList = Lists.newArrayList();
+    if (timer != null) {
+      logger.debug("Creating metadata files recursively took {} ms", 
timer.elapsed(TimeUnit.MILLISECONDS));
+      timer.stop();
+    }
+    return Pair.of(parquetTableMetadata, new 
ParquetTableMetadataDirs(emptyDirList));
+  }
+
+  /**
+   * Get the parquet metadata for the parquet files in a directory.
+   *
+   * @param path the path of the directory
+   * @return metadata object for an entire parquet directory structure
+   * @throws IOException in case of problems during accessing files
+   */
+  private ParquetTableMetadata_v3 getParquetTableMetadata(String path, FileSystem fs) throws IOException {
+    Path p = new Path(path);
+    FileStatus fileStatus = fs.getFileStatus(p);
+    Stopwatch watch = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
+    List<FileStatus> fileStatuses = new ArrayList<>();
+    if (fileStatus.isFile()) {
+      fileStatuses.add(fileStatus);
+    } else {
+      fileStatuses.addAll(DrillFileSystemUtil.listFiles(fs, p, true));
+    }
+    if (watch != null) {
+      logger.debug("Took {} ms to get file statuses", watch.elapsed(TimeUnit.MILLISECONDS));
+      watch.reset();
+      watch.start();
+    }
+
+    Map<FileStatus, FileSystem> fileStatusMap = fileStatuses.stream()
+        .collect(
+            Collectors.toMap(
+                Function.identity(),
+                s -> fs,
+                (oldFs, newFs) -> newFs,
+                LinkedHashMap::new));
+
+    ParquetTableMetadata_v3 metadata_v3 = getParquetTableMetadata(fileStatusMap);
+    if (watch != null) {
+      logger.debug("Took {} ms to read file metadata", watch.elapsed(TimeUnit.MILLISECONDS));
+      watch.stop();
+    }
+    return metadata_v3;
+  }
+
+  /**
+   * Get the parquet metadata for a list of parquet files
+   *
+   * @param fileStatusMap file statuses and corresponding file systems
+   * @return parquet table metadata object
+   * @throws IOException if parquet file metadata can't be obtained
+   */
+  private ParquetTableMetadata_v3 getParquetTableMetadata(Map<FileStatus, FileSystem> fileStatusMap)
+      throws IOException {
+    ParquetTableMetadata_v3 tableMetadata = new ParquetTableMetadata_v3(SUPPORTED_VERSIONS.last().toString(),
+                                                                        DrillVersionInfo.getVersion());
+    tableMetadata.files = getParquetFileMetadata_v3(tableMetadata, fileStatusMap);
+    tableMetadata.directories = new ArrayList<>();
+    return tableMetadata;
+  }
+
+  /**
+   * Get a list of file metadata for a list of parquet files
+   *
+   * @param parquetTableMetadata_v3 can store column schema info from all the files and row groups
+   * @param fileStatusMap parquet files statuses and corresponding file systems
+   *
+   * @return list of the parquet file metadata with absolute paths
+   * @throws IOException is thrown in case of issues while executing the list of runnables
+   */
+  private List<ParquetFileMetadata_v3> getParquetFileMetadata_v3(
+      ParquetTableMetadata_v3 parquetTableMetadata_v3, Map<FileStatus, FileSystem> fileStatusMap) throws IOException {
+
+    List<TimedRunnable<ParquetFileMetadata_v3>> gatherers = fileStatusMap.entrySet().stream()
+        .map(e -> new MetadataGatherer(parquetTableMetadata_v3, e.getKey(), e.getValue()))
+        .collect(Collectors.toList());
+
+    List<ParquetFileMetadata_v3> metaDataList = new ArrayList<>();
+    metaDataList.addAll(TimedRunnable.run("Fetch parquet metadata", logger, gatherers, 16));
+    return metaDataList;
+  }
+
+  /**
+   * TimedRunnable that reads the footer from parquet and collects file metadata
+   */
+  private class MetadataGatherer extends TimedRunnable<ParquetFileMetadata_v3> {
+
+    private final ParquetTableMetadata_v3 parquetTableMetadata;
+    private final FileStatus fileStatus;
+    private final FileSystem fs;
+
+    MetadataGatherer(ParquetTableMetadata_v3 parquetTableMetadata, FileStatus fileStatus, FileSystem fs) {
+      this.parquetTableMetadata = parquetTableMetadata;
+      this.fileStatus = fileStatus;
+      this.fs = fs;
+    }
+
+    @Override
+    protected ParquetFileMetadata_v3 runInner() throws Exception {
+      return getParquetFileMetadata_v3(parquetTableMetadata, fileStatus, fs);
+    }
+
+    @Override
+    protected IOException convertToIOException(Exception e) {
+      if (e instanceof IOException) {
+        return (IOException) e;
+      } else {
+        return new IOException(e);
+      }
+    }
+  }
+
+  private ColTypeInfo getColTypeInfo(MessageType schema, Type type, String[] path, int depth) {
+    if (type.isPrimitive()) {
+      PrimitiveType primitiveType = (PrimitiveType) type;
+      int precision = 0;
+      int scale = 0;
+      if (primitiveType.getDecimalMetadata() != null) {
+        precision = primitiveType.getDecimalMetadata().getPrecision();
+        scale = primitiveType.getDecimalMetadata().getScale();
+      }
+
+      int repetitionLevel = schema.getMaxRepetitionLevel(path);
+      int definitionLevel = schema.getMaxDefinitionLevel(path);
+
+      return new ColTypeInfo(type.getOriginalType(), precision, scale, repetitionLevel, definitionLevel);
+    }
+    Type t = ((GroupType) type).getType(path[depth]);
+    return getColTypeInfo(schema, t, path, depth + 1);
+  }
+
+  private class ColTypeInfo {
+    public OriginalType originalType;
+    public int precision;
+    public int scale;
+    public int repetitionLevel;
+    public int definitionLevel;
+
+    ColTypeInfo(OriginalType originalType, int precision, int scale, int repetitionLevel, int definitionLevel) {
+      this.originalType = originalType;
+      this.precision = precision;
+      this.scale = scale;
+      this.repetitionLevel = repetitionLevel;
+      this.definitionLevel = definitionLevel;
+    }
+  }
+
+  /**
+   * Get the metadata for a single file
+   */
+  private ParquetFileMetadata_v3 getParquetFileMetadata_v3(ParquetTableMetadata_v3 parquetTableMetadata,
+      final FileStatus file, final FileSystem fs) throws IOException, InterruptedException {
+    final ParquetMetadata metadata;
+    final UserGroupInformation processUserUgi = ImpersonationUtil.getProcessUserUGI();
+    try {
+      metadata = processUserUgi.doAs((PrivilegedExceptionAction<ParquetMetadata>)
+          () -> ParquetFileReader.readFooter(fs.getConf(), file, ParquetMetadataConverter.NO_FILTER));
+    } catch(Exception e) {
+      logger.error("Exception while reading footer of parquet file [Details - path: {}, owner: {}] as process user {}",
+        file.getPath(), file.getOwner(), processUserUgi.getShortUserName(), e);
+      throw e;
+    }
+
+    MessageType schema = metadata.getFileMetaData().getSchema();
+
+//    Map<SchemaPath, OriginalType> originalTypeMap = Maps.newHashMap();
+    Map<SchemaPath, ColTypeInfo> colTypeInfoMap = Maps.newHashMap();
+    schema.getPaths();
+    for (String[] path : schema.getPaths()) {
+      colTypeInfoMap.put(SchemaPath.getCompoundPath(path), getColTypeInfo(schema, schema, path, 0));
+    }
+
+    List<RowGroupMetadata_v3> rowGroupMetadataList = Lists.newArrayList();
+
+    ArrayList<SchemaPath> ALL_COLS = new ArrayList<>();
+    ALL_COLS.add(SchemaPath.STAR_COLUMN);
+    boolean autoCorrectCorruptDates = formatConfig.areCorruptDatesAutoCorrected();
+    ParquetReaderUtility.DateCorruptionStatus containsCorruptDates = ParquetReaderUtility.detectCorruptDates(metadata, ALL_COLS, autoCorrectCorruptDates);
+    if (logger.isDebugEnabled()) {
+      logger.debug(containsCorruptDates.toString());
+    }
+    for (BlockMetaData rowGroup : metadata.getBlocks()) {
+      List<ColumnMetadata_v3> columnMetadataList = Lists.newArrayList();
+      long length = 0;
+      for (ColumnChunkMetaData col : rowGroup.getColumns()) {
+        ColumnMetadata_v3 columnMetadata;
+
+        boolean statsAvailable = (col.getStatistics() != null && !col.getStatistics().isEmpty());
+
+        Statistics<?> stats = col.getStatistics();
+        String[] columnName = col.getPath().toArray();
+        SchemaPath columnSchemaName = SchemaPath.getCompoundPath(columnName);
+        ColTypeInfo colTypeInfo = colTypeInfoMap.get(columnSchemaName);
+
+        ColumnTypeMetadata_v3 columnTypeMetadata =
+            new ColumnTypeMetadata_v3(columnName, col.getType(), colTypeInfo.originalType,
+                colTypeInfo.precision, colTypeInfo.scale, colTypeInfo.repetitionLevel, colTypeInfo.definitionLevel);
+
+        if (parquetTableMetadata.columnTypeInfo == null) {
+          parquetTableMetadata.columnTypeInfo = new ConcurrentHashMap<>();
+        }
+        // Save the column schema info. We'll merge it into one list
+        parquetTableMetadata.columnTypeInfo
+            .put(new ColumnTypeMetadata_v3.Key(columnTypeMetadata.name), columnTypeMetadata);
+        if (statsAvailable) {
+          // Write stats when they are not null
+          Object minValue = null;
+          Object maxValue = null;
+          if (stats.genericGetMax() != null && stats.genericGetMin() != null ) {
+            minValue = stats.genericGetMin();
+            maxValue = stats.genericGetMax();
+            if (containsCorruptDates == ParquetReaderUtility.DateCorruptionStatus.META_SHOWS_CORRUPTION
+                && columnTypeMetadata.originalType == OriginalType.DATE) {
+              minValue = ParquetReaderUtility.autoCorrectCorruptedDate((Integer) minValue);
+              maxValue = ParquetReaderUtility.autoCorrectCorruptedDate((Integer) maxValue);
+            }
+
+          }
+          columnMetadata =
+              new ColumnMetadata_v3(columnTypeMetadata.name, col.getType(), minValue, maxValue, stats.getNumNulls());
+        } else {
+          columnMetadata = new ColumnMetadata_v3(columnTypeMetadata.name, col.getType(), null, null, null);
+        }
+        columnMetadataList.add(columnMetadata);
+        length += col.getTotalSize();
+      }
+
+      // DRILL-5009: Skip the RowGroup if it is empty
+      // Note we still read the schema even if there are no values in the RowGroup
+      if (rowGroup.getRowCount() == 0) {
+        continue;
+      }
+      RowGroupMetadata_v3 rowGroupMeta =
+          new RowGroupMetadata_v3(rowGroup.getStartingPos(), length, rowGroup.getRowCount(),
+              getHostAffinity(file, fs, rowGroup.getStartingPos(), length), columnMetadataList);
+
+      rowGroupMetadataList.add(rowGroupMeta);
+    }
+    String path = Path.getPathWithoutSchemeAndAuthority(file.getPath()).toString();
+
+    return new ParquetFileMetadata_v3(path, file.getLen(), rowGroupMetadataList);
+  }
+
+  /**
+   * Get the host affinity for a row group.
+   *
+   * @param fileStatus the parquet file
+   * @param start      the start of the row group
+   * @param length     the length of the row group
+   * @return host affinity for the row group
+   */
+  private Map<String, Float> getHostAffinity(FileStatus fileStatus, FileSystem fs, long start, long length)
+      throws IOException {
+    BlockLocation[] blockLocations = fs.getFileBlockLocations(fileStatus, start, length);
+    Map<String, Float> hostAffinityMap = Maps.newHashMap();
+    for (BlockLocation blockLocation : blockLocations) {
+      for (String host : blockLocation.getHosts()) {
+        Float currentAffinity = hostAffinityMap.get(host);
+        float blockStart = blockLocation.getOffset();
+        float blockEnd = blockStart + blockLocation.getLength();
+        float rowGroupEnd = start + length;
+        Float newAffinity = (blockLocation.getLength() - (blockStart < start ? start - blockStart : 0) -
+            (blockEnd > rowGroupEnd ? blockEnd - rowGroupEnd : 0)) / length;
+        if (currentAffinity != null) {
+          hostAffinityMap.put(host, currentAffinity + newAffinity);
+        } else {
+          hostAffinityMap.put(host, newAffinity);
+        }
+      }
+    }
+    return hostAffinityMap;
+  }
+
+  /**
+   * Serialize parquet metadata to json and write to a file.
+   *
+   * @param parquetTableMetadata parquet table metadata
+   * @param p file path
+   */
+  private void writeFile(ParquetTableMetadata_v3 parquetTableMetadata, Path p, FileSystem fs) throws IOException {
+    JsonFactory jsonFactory = new JsonFactory();
+    jsonFactory.configure(Feature.AUTO_CLOSE_TARGET, false);
+    jsonFactory.configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, false);
+    ObjectMapper mapper = new ObjectMapper(jsonFactory);
+    SimpleModule module = new SimpleModule();
+    module.addSerializer(ColumnMetadata_v3.class, new ColumnMetadata_v3.Serializer());
+    mapper.registerModule(module);
+    FSDataOutputStream os = fs.create(p);
+    mapper.writerWithDefaultPrettyPrinter().writeValue(os, parquetTableMetadata);
+    os.flush();
+    os.close();
+  }
+
+  private void writeFile(ParquetTableMetadataDirs parquetTableMetadataDirs, Path p, FileSystem fs) throws IOException {
+    JsonFactory jsonFactory = new JsonFactory();
+    jsonFactory.configure(Feature.AUTO_CLOSE_TARGET, false);
+    jsonFactory.configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, false);
+    ObjectMapper mapper = new ObjectMapper(jsonFactory);
+    SimpleModule module = new SimpleModule();
+    mapper.registerModule(module);
+    FSDataOutputStream os = fs.create(p);
+    mapper.writerWithDefaultPrettyPrinter().writeValue(os, parquetTableMetadataDirs);
+    os.flush();
+    os.close();
+  }
+
+  /**
+   * Read the parquet metadata from a file
+   *
+   * @param path to metadata file
+   * @param dirsOnly true for {@link Metadata#METADATA_DIRECTORIES_FILENAME}
+   *                 or false for {@link Metadata#METADATA_FILENAME} files reading
+   * @param metaContext current metadata context
+   */
+  private void readBlockMeta(Path path, boolean dirsOnly, MetadataContext metaContext, FileSystem fs) {
+    Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
+    Path metadataParentDir = Path.getPathWithoutSchemeAndAuthority(path.getParent());
+    String metadataParentDirPath = metadataParentDir.toUri().getPath();
+    ObjectMapper mapper = new ObjectMapper();
+
+    final SimpleModule serialModule = new SimpleModule();
+    serialModule.addDeserializer(SchemaPath.class, new SchemaPath.De());
+    serialModule.addKeyDeserializer(Metadata_V2.ColumnTypeMetadata_v2.Key.class, new Metadata_V2.ColumnTypeMetadata_v2.Key.DeSerializer());
+    serialModule.addKeyDeserializer(ColumnTypeMetadata_v3.Key.class, new ColumnTypeMetadata_v3.Key.DeSerializer());
+
+    AfterburnerModule module = new AfterburnerModule();
+    module.setUseOptimizedBeanDeserializer(true);
+
+    mapper.registerModule(serialModule);
+    mapper.registerModule(module);
+    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+    try (FSDataInputStream is = fs.open(path)) {
+      boolean alreadyCheckedModification;
+      boolean newMetadata = false;
+      alreadyCheckedModification = metaContext.getStatus(metadataParentDirPath);
+
+      if (dirsOnly) {
+        parquetTableMetadataDirs = mapper.readValue(is, ParquetTableMetadataDirs.class);
+        if (timer != null) {
+          logger.debug("Took {} ms to read directories from directory cache file", timer.elapsed(TimeUnit.MILLISECONDS));
+          timer.stop();
+        }
+        parquetTableMetadataDirs.updateRelativePaths(metadataParentDirPath);
+        if (!alreadyCheckedModification && tableModified(parquetTableMetadataDirs.getDirectories(), path, metadataParentDir, metaContext, fs)) {
+          parquetTableMetadataDirs =
+              (createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(path.getParent()).toString(), fs)).getRight();
+          newMetadata = true;
+        }
+      } else {
+        parquetTableMetadata = mapper.readValue(is, ParquetTableMetadataBase.class);
+        if (timer != null) {
+          logger.debug("Took {} ms to read metadata from cache file", timer.elapsed(TimeUnit.MILLISECONDS));
+          timer.stop();
+        }
+        if (new MetadataVersion(parquetTableMetadata.getMetadataVersion()).compareTo(new MetadataVersion(3, 0)) >= 0) {
+          ((ParquetTableMetadata_v3) parquetTableMetadata).updateRelativePaths(metadataParentDirPath);
+        }
+        if (!alreadyCheckedModification && tableModified(parquetTableMetadata.getDirectories(), path, metadataParentDir, metaContext, fs)) {
+          parquetTableMetadata =
+              (createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(path.getParent()).toString(), fs)).getLeft();
+          newMetadata = true;
+        }
+
+        // DRILL-5009: Remove the RowGroup if it is empty
+        List<? extends ParquetFileMetadata> files = parquetTableMetadata.getFiles();
+        for (ParquetFileMetadata file : files) {
+          List<? extends RowGroupMetadata> rowGroups = file.getRowGroups();
+          for (Iterator<? extends RowGroupMetadata> iter = rowGroups.iterator(); iter.hasNext(); ) {
+            RowGroupMetadata r = iter.next();
+            if (r.getRowCount() == 0) {
+              iter.remove();
+            }
+          }
+        }
+
+      }
+      if (newMetadata) {
+        // if new metadata files were created, invalidate the existing metadata context
+        metaContext.clear();
+      }
+    } catch (IOException e) {
+      logger.error("Failed to read '{}' metadata file", path, e);
+      metaContext.setMetadataCacheCorrupted(true);
+    }
+  }
+
+  /**
+   * Check if the parquet metadata needs to be updated by comparing the modification time of the directories with
+   * the modification time of the metadata file
+   *
+   * @param directories List of directories
+   * @param metaFilePath path of parquet metadata cache file
+   * @return true if metadata needs to be updated, false otherwise
+   * @throws IOException if some resources are not accessible
+   */
+  private boolean tableModified(List<String> directories, Path metaFilePath, Path parentDir, MetadataContext metaContext, FileSystem fs) throws IOException {
+    Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
+    metaContext.setStatus(parentDir.toUri().getPath());
+    long metaFileModifyTime = fs.getFileStatus(metaFilePath).getModificationTime();
+    FileStatus directoryStatus = fs.getFileStatus(parentDir);
+    int numDirs = 1;
+    if (directoryStatus.getModificationTime() > metaFileModifyTime) {
+      if (timer != null) {
+        logger.debug("Directory {} was modified. Took {} ms to check 
modification time of {} directories",
+            directoryStatus.getPath().toString(), 
timer.elapsed(TimeUnit.MILLISECONDS), numDirs);
+        timer.stop();
+      }
+      return true;
+    }
+    for (String directory : directories) {
+      numDirs++;
+      metaContext.setStatus(directory);
+      directoryStatus = fs.getFileStatus(new Path(directory));
+      if (directoryStatus.getModificationTime() > metaFileModifyTime) {
+        if (timer != null) {
+          logger.debug("Directory {} was modified. Took {} ms to check 
modification time of {} directories",
+              directoryStatus.getPath().toString(), 
timer.elapsed(TimeUnit.MILLISECONDS), numDirs);
+          timer.stop();
+        }
+        return true;
+      }
+    }
+    if (timer != null) {
+      logger.debug("No directories were modified. Took {} ms to check 
modification time of {} directories",
+          timer.elapsed(TimeUnit.MILLISECONDS), numDirs);
+      timer.stop();
+    }
+    return false;
+  }
+
+}
+
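
The getHostAffinity method above credits every host with the fraction of the row group covered by that host's HDFS blocks: the block length minus the parts clipped off before the row group start and after its end, divided by the row group length. The following standalone sketch re-derives the same arithmetic in an equivalent min/max overlap form (plain longs, no Hadoop types; not code from the commit):

import java.util.HashMap;
import java.util.Map;

// Equivalent form of getHostAffinity's formula:
// share(host) += overlap(block, rowGroup) / rowGroupLength for each of its blocks.
public class HostAffinitySketch {

  static Map<String, Float> affinity(long start, long length, long[][] blocks, String[][] hosts) {
    long rowGroupEnd = start + length;
    Map<String, Float> map = new HashMap<>();
    for (int i = 0; i < blocks.length; i++) {
      long blockStart = blocks[i][0];
      long blockEnd = blockStart + blocks[i][1];
      // Clip the block to the row group boundaries, then normalize.
      long overlap = Math.min(blockEnd, rowGroupEnd) - Math.max(blockStart, start);
      if (overlap <= 0) {
        continue; // block does not intersect this row group
      }
      float share = (float) overlap / length;
      for (String host : hosts[i]) {
        map.merge(host, share, Float::sum);
      }
    }
    return map;
  }

  public static void main(String[] args) {
    long mb = 1L << 20;
    // Row group [0, 256MB); block 0 on hostA covers the first half, block 1 on hostB the second.
    Map<String, Float> m = affinity(0, 256 * mb,
        new long[][]{{0, 128 * mb}, {128 * mb, 128 * mb}},
        new String[][]{{"hostA"}, {"hostB"}});
    System.out.println(m); // hostA=0.5, hostB=0.5 (map order may vary)
  }
}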

http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataBase.java
new file mode 100644
index 0000000..d7d56c3
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataBase.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.metadata;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.drill.exec.store.parquet.metadata.MetadataVersion.Constants.V1;
+import static org.apache.drill.exec.store.parquet.metadata.MetadataVersion.Constants.V2;
+import static org.apache.drill.exec.store.parquet.metadata.MetadataVersion.Constants.V3;
+import static org.apache.drill.exec.store.parquet.metadata.MetadataVersion.Constants.V3_1;
+import static org.apache.drill.exec.store.parquet.metadata.MetadataVersion.Constants.V3_2;
+import static org.apache.drill.exec.store.parquet.metadata.MetadataVersion.Constants.V3_3;
+
+public class MetadataBase {
+
+  /**
+   * Base class for parquet metadata. Subclasses of this class are JSON-serializable structures that contain
+   * different metadata versions for an entire parquet directory structure.
+   * <p>
+   * If any code change affects the content of the metadata files, please update the metadata version as follows:
+   * bump up the major metadata version if the metadata structure is changed;
+   * bump up the minor metadata version if only the metadata content is changed, but the metadata structure is the same.
+   * <p>
+   * Note: keep metadata versions synchronized with {@link MetadataVersion.Constants}
+   */
+  @JsonTypeInfo(use = JsonTypeInfo.Id.NAME,
+      include = JsonTypeInfo.As.PROPERTY,
+      property = "metadata_version",
+      visible = true)
+  @JsonSubTypes({
+      @JsonSubTypes.Type(value = Metadata_V1.ParquetTableMetadata_v1.class, name = V1),
+      @JsonSubTypes.Type(value = Metadata_V2.ParquetTableMetadata_v2.class, name = V2),
+      @JsonSubTypes.Type(value = Metadata_V3.ParquetTableMetadata_v3.class, name = V3),
+      @JsonSubTypes.Type(value = Metadata_V3.ParquetTableMetadata_v3.class, name = V3_1),
+      @JsonSubTypes.Type(value = Metadata_V3.ParquetTableMetadata_v3.class, name = V3_2),
+      @JsonSubTypes.Type(value = Metadata_V3.ParquetTableMetadata_v3.class, name = V3_3)
+  })
+  public static abstract class ParquetTableMetadataBase {
+
+    @JsonIgnore
+    public abstract List<String> getDirectories();
+
+    @JsonIgnore public abstract List<? extends ParquetFileMetadata> getFiles();
+
+    @JsonIgnore public abstract void assignFiles(List<? extends ParquetFileMetadata> newFiles);
+
+    public abstract boolean hasColumnMetadata();
+
+    @JsonIgnore public abstract PrimitiveType.PrimitiveTypeName getPrimitiveType(String[] columnName);
+
+    @JsonIgnore public abstract OriginalType getOriginalType(String[] columnName);
+
+    @JsonIgnore public abstract Integer getRepetitionLevel(String[] columnName);
+
+    @JsonIgnore public abstract Integer getDefinitionLevel(String[] columnName);
+
+    @JsonIgnore public abstract boolean isRowGroupPrunable();
+
+    @JsonIgnore public abstract ParquetTableMetadataBase clone();
+
+    @JsonIgnore public abstract String getDrillVersion();
+
+    @JsonIgnore public abstract String getMetadataVersion();
+  }
+
+  public static abstract class ParquetFileMetadata {
+    @JsonIgnore public abstract String getPath();
+
+    @JsonIgnore public abstract Long getLength();
+
+    @JsonIgnore public abstract List<? extends RowGroupMetadata> getRowGroups();
+  }
+
+
+  public static abstract class RowGroupMetadata {
+    @JsonIgnore public abstract Long getStart();
+
+    @JsonIgnore public abstract Long getLength();
+
+    @JsonIgnore public abstract Long getRowCount();
+
+    @JsonIgnore public abstract Map<String, Float> getHostAffinity();
+
+    @JsonIgnore public abstract List<? extends ColumnMetadata> getColumns();
+  }
+
+
+  public static abstract class ColumnMetadata {
+    public abstract String[] getName();
+
+    public abstract Long getNulls();
+
+    public abstract boolean hasSingleValue(long rowCount);
+
+    public abstract Object getMinValue();
+
+    public abstract Object getMaxValue();
+
+    /**
+     * Sets the max value recorded in the parquet metadata statistics.
+     * <p>
+     * This object would otherwise be immutable, but due to DRILL-4203 we need to correct
+     * date values that had been corrupted by earlier versions of Drill.
+     */
+    public abstract void setMax(Object newMax);
+
+    /**
+     * Sets the min value recorded in the parquet metadata statistics.
+     * <p>
+     * This object would otherwise be immutable, but due to DRILL-4203 we need to correct
+     * date values that had been corrupted by earlier versions of Drill.
+     */
+    public abstract void setMin(Object newMin);
+
+    public abstract PrimitiveType.PrimitiveTypeName getPrimitiveType();
+
+    public abstract OriginalType getOriginalType();
+  }
+
+}
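The @JsonTypeInfo/@JsonSubTypes pairing on ParquetTableMetadataBase is what lets a single ObjectMapper.readValue() call materialize the right metadata class for any historical cache file: Jackson inspects the visible "metadata_version" property and dispatches to the mapped subtype. A self-contained sketch of the same pattern, with hypothetical Cache* classes standing in for the Drill types:

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.ObjectMapper;

public class PolymorphicReadDemo {

  @JsonTypeInfo(use = JsonTypeInfo.Id.NAME,
      include = JsonTypeInfo.As.PROPERTY,
      property = "metadata_version",
      visible = true)
  @JsonSubTypes({
      @JsonSubTypes.Type(value = CacheV1.class, name = "v1"),
      @JsonSubTypes.Type(value = CacheV3.class, name = "3.1") // several names may map to one class
  })
  abstract static class CacheBase { }

  // visible = true re-delivers the type property to the POJO, so each subtype declares it
  static class CacheV1 extends CacheBase { public String metadata_version; }
  static class CacheV3 extends CacheBase { public String metadata_version; }

  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    // The value of "metadata_version" selects the concrete subclass to instantiate.
    CacheBase cache = mapper.readValue("{\"metadata_version\":\"v1\"}", CacheBase.class);
    System.out.println(cache.getClass().getSimpleName()); // prints CacheV1
  }
}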

http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataPathUtils.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataPathUtils.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataPathUtils.java
new file mode 100644
index 0000000..b9480e8
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataPathUtils.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.metadata;
+
+import com.google.common.collect.Lists;
+import org.apache.drill.common.util.DrillVersionInfo;
+import org.apache.hadoop.fs.Path;
+
+import java.util.List;
+
+import static org.apache.drill.exec.store.parquet.metadata.MetadataVersion.Constants.SUPPORTED_VERSIONS;
+import static org.apache.drill.exec.store.parquet.metadata.Metadata_V3.ParquetFileMetadata_v3;
+import static org.apache.drill.exec.store.parquet.metadata.Metadata_V3.ParquetTableMetadata_v3;
+
+/**
+ * Utility class that contains helper methods for converting paths in the table and directory metadata structures
+ */
+public class MetadataPathUtils {
+
+  /**
+   * Helper method that converts a list of relative paths to absolute ones
+   *
+   * @param paths list of relative paths
+   * @param baseDir base parent directory
+   * @return list of absolute paths
+   */
+  public static List<String> convertToAbsolutePaths(List<String> paths, String baseDir) {
+    if (!paths.isEmpty()) {
+      List<String> absolutePaths = Lists.newArrayList();
+      for (String relativePath : paths) {
+        String absolutePath = (new Path(relativePath).isAbsolute()) ? relativePath
+            : new Path(baseDir, relativePath).toUri().getPath();
+        absolutePaths.add(absolutePath);
+      }
+      return absolutePaths;
+    }
+    return paths;
+  }
+
+  /**
+   * Convert a list of files with relative paths to files with absolute ones
+   *
+   * @param files list of files with relative paths
+   * @param baseDir base parent directory
+   * @return list of files with absolute paths
+   */
+  public static List<ParquetFileMetadata_v3> convertToFilesWithAbsolutePaths(
+      List<ParquetFileMetadata_v3> files, String baseDir) {
+    if (!files.isEmpty()) {
+      List<ParquetFileMetadata_v3> filesWithAbsolutePaths = Lists.newArrayList();
+      for (ParquetFileMetadata_v3 file : files) {
+        Path relativePath = new Path(file.getPath());
+        // create a new file if the old one contains a relative path, otherwise reuse the old one
+        ParquetFileMetadata_v3 fileWithAbsolutePath = (relativePath.isAbsolute()) ? file
+            : new ParquetFileMetadata_v3(new Path(baseDir, relativePath).toUri().getPath(), file.length, file.rowGroups);
+        filesWithAbsolutePaths.add(fileWithAbsolutePath);
+      }
+      return filesWithAbsolutePaths;
+    }
+    return files;
+  }
+
+  /**
+   * Creates new parquet table metadata from the {@code tableMetadataWithAbsolutePaths} parquet table.
+   * The new parquet table metadata contains relative paths for the files and directories.
+   *
+   * @param tableMetadataWithAbsolutePaths parquet table metadata with absolute paths for the files and directories
+   * @param baseDir base parent directory
+   * @return parquet table metadata with relative paths for the files and directories
+   */
+  public static ParquetTableMetadata_v3 createMetadataWithRelativePaths(
+      ParquetTableMetadata_v3 tableMetadataWithAbsolutePaths, String baseDir) {
+    List<String> directoriesWithRelativePaths = Lists.newArrayList();
+    for (String directory : tableMetadataWithAbsolutePaths.getDirectories()) {
+      directoriesWithRelativePaths.add(relativize(baseDir, directory));
+    }
+    List<ParquetFileMetadata_v3> filesWithRelativePaths = Lists.newArrayList();
+    for (ParquetFileMetadata_v3 file : tableMetadataWithAbsolutePaths.files) {
+      filesWithRelativePaths.add(new ParquetFileMetadata_v3(
+          relativize(baseDir, file.getPath()), file.length, file.rowGroups));
+    }
+    return new ParquetTableMetadata_v3(SUPPORTED_VERSIONS.last().toString(), tableMetadataWithAbsolutePaths,
+        filesWithRelativePaths, directoriesWithRelativePaths, DrillVersionInfo.getVersion());
+  }
+
+  /**
+   * Constructs a relative path from the full child path and the base path,
+   * or returns the child path if it is already relative.
+   *
+   * @param childPath full absolute path
+   * @param baseDir base path (the part of the path which should be cut off from the child path)
+   * @return relative path
+   */
+  public static String relativize(String baseDir, String childPath) {
+    Path fullPathWithoutSchemeAndAuthority = Path.getPathWithoutSchemeAndAuthority(new Path(childPath));
+    Path basePathWithoutSchemeAndAuthority = Path.getPathWithoutSchemeAndAuthority(new Path(baseDir));
+
+    // Since Hadoop's Path has no relativize() method, we use URI.relativize() to get the relative path
+    Path relativeFilePath = new Path(basePathWithoutSchemeAndAuthority.toUri()
+        .relativize(fullPathWithoutSchemeAndAuthority.toUri()));
+    if (relativeFilePath.isAbsolute()) {
+      throw new IllegalStateException(String.format("Path %s is not a subpath of %s.",
+          fullPathWithoutSchemeAndAuthority.toUri().getPath(), basePathWithoutSchemeAndAuthority.toUri().getPath()));
+    }
+    return relativeFilePath.toUri().getPath();
+  }
+
+}
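Because Hadoop's Path offers no relativize() of its own, MetadataPathUtils.relativize() leans on java.net.URI semantics: URI.relativize() yields a relative URI only when the child path sits under the base, and hands the child back unchanged (still absolute) otherwise, which is what the IllegalStateException branch detects. A short sketch with illustrative paths:

import org.apache.hadoop.fs.Path;

public class RelativizeDemo {
  public static void main(String[] args) {
    Path base = Path.getPathWithoutSchemeAndAuthority(new Path("hdfs://nn/data/table"));
    Path child = Path.getPathWithoutSchemeAndAuthority(new Path("hdfs://nn/data/table/1990/Q1/f.parquet"));

    // URI.relativize() strips the base prefix when child is under base...
    Path relative = new Path(base.toUri().relativize(child.toUri()));
    System.out.println(relative); // prints 1990/Q1/f.parquet

    // ...and returns the child URI unchanged (still absolute) when it is not,
    // which relativize() above reports as an IllegalStateException.
  }
}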

http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataVersion.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataVersion.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataVersion.java
new file mode 100644
index 0000000..15b4b9d
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataVersion.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.metadata;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ComparisonChain;
+import com.google.common.collect.ImmutableSortedSet;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+
+import java.util.SortedSet;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class MetadataVersion implements Comparable<MetadataVersion> {
+
+  private static final String FORMAT = "v?((?!0)\\d+)(\\.(\\d+))?";
+  private static final Pattern PATTERN = Pattern.compile(FORMAT);
+
+  private final int major;
+  private final int minor;
+
+  public MetadataVersion(int major, int minor) {
+    this.major = major;
+    this.minor = minor;
+  }
+
+  public MetadataVersion(String metadataVersion) {
+    Matcher matcher = PATTERN.matcher(metadataVersion);
+    if (!matcher.matches()) {
+      // DrillRuntimeException.format() throws, so parsing stops here for malformed versions
+      DrillRuntimeException.format("Could not parse metadata version '%s' using format '%s'", metadataVersion, FORMAT);
+    }
+    this.major = Integer.parseInt(matcher.group(1));
+    this.minor = matcher.group(3) != null ? Integer.parseInt(matcher.group(3)) : 0;
+  }
+
+  public int getMajor() {
+    return major;
+  }
+
+  public int getMinor() {
+    return minor;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof MetadataVersion)) {
+      return false;
+    }
+    MetadataVersion that = (MetadataVersion) o;
+    return this.major == that.major
+        && this.minor == that.minor;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = major;
+    result = 31 * result + minor;
+    return result;
+  }
+
+  /**
+   * @return string representation of the metadata file version, for example: "1", "10", "4.13"
+   * <p>
+   * The string metadata version consists of the following parts:<p>
+   * major metadata version (any number of digits, except a single zero digit),<p>
+   * optional "." delimiter (used if a minor metadata version is specified),<p>
+   * minor metadata version (not specified for a "0" minor version)
+   */
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder();
+    builder.append(major);
+    if (minor != 0) {
+      builder.append(".").append(minor);
+    }
+    return builder.toString();
+  }
+
+  @Override
+  public int compareTo(MetadataVersion o) {
+    Preconditions.checkNotNull(o);
+    return ComparisonChain.start()
+        .compare(this.major, o.major)
+        .compare(this.minor, o.minor)
+        .result();
+  }
+
+  /**
+   * Supported metadata versions.
+   * <p>
+   * Note: keep them synchronized with {@link MetadataBase.ParquetTableMetadataBase} versions
+   */
+  public static class Constants {
+    /**
+     * Version 1: Introduces parquet file metadata caching.<br>
+     * See DRILL-2743
+     */
+    public static final String V1 = "v1";
+    /**
+     * Version 2: Metadata cache file size is reduced.<br>
+     * See DRILL-4053
+     */
+    public static final String V2 = "v2";
+    /**
+     * Version 3: Difference between v3 and v2: min/max, type_length, precision, scale, repetitionLevel, definitionLevel.<br>
+     * Filter pushdown for Parquet is implemented.<br>
+     * See DRILL-1950
+     */
+    public static final String V3 = "v3";
+    /**
+     * Version 3.1: Absolute paths of files and directories are replaced with relative ones. The metadata version
+     * value no longer contains the `v` prefix.<br>
+     * See DRILL-3867, DRILL-5660
+     */
+    public static final String V3_1 = "3.1";
+
+    /**
+     * Version 3.2: The array with the components of the field name in
+     * {@link Metadata_V3.ColumnTypeMetadata_v3.Key} is replaced by a SchemaPath.<br>
+     * See DRILL-4264
+     */
+    public static final String V3_2 = "3.2";
+
+    /**
+     * Version 3.3: Changed serialization of BINARY and FIXED_LEN_BYTE_ARRAY fields.<br>
+     * See DRILL-4139
+     */
+    public static final String V3_3 = "3.3";
+
+    /**
+     * All historical versions of the Drill metadata cache files. When introducing a new parquet metadata version,
+     * please follow the {@link MetadataVersion#FORMAT}.
+     */
+    public static final SortedSet<MetadataVersion> SUPPORTED_VERSIONS = ImmutableSortedSet.of(
+        new MetadataVersion(V1),
+        new MetadataVersion(V2),
+        new MetadataVersion(V3),
+        new MetadataVersion(V3_1),
+        new MetadataVersion(V3_2),
+        new MetadataVersion(V3_3)
+    );
+
+    /**
+     * @param metadataVersion string representation of the parquet metadata version
+     * @return true if metadata version is supported, false otherwise
+     */
+    public static boolean isVersionSupported(String metadataVersion) {
+      return SUPPORTED_VERSIONS.contains(new MetadataVersion(metadataVersion));
+    }
+  }
+}
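A few usage examples of the version grammar above, assuming the MetadataVersion class compiles as shown:

import org.apache.drill.exec.store.parquet.metadata.MetadataVersion;

public class VersionDemo {
  public static void main(String[] args) {
    // "v3" and "3" parse to the same (major=3, minor=0) version; the leading 'v' is optional.
    System.out.println(new MetadataVersion("v3").equals(new MetadataVersion("3"))); // true

    // Minor versions order after their major: 3 < 3.1 < 3.3
    System.out.println(new MetadataVersion("3.1").compareTo(new MetadataVersion("v3")) > 0); // true

    // A well-formed but unknown version parses fine; it is just not in SUPPORTED_VERSIONS.
    System.out.println(MetadataVersion.Constants.isVersionSupported("4.13")); // false
  }
}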

http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V1.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V1.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V1.java
new file mode 100644
index 0000000..92feb5f
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V1.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.metadata;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.drill.exec.store.parquet.metadata.MetadataVersion.Constants.V1;
+import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ColumnMetadata;
+import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetFileMetadata;
+import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetTableMetadataBase;
+import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.RowGroupMetadata;
+
+
+public class Metadata_V1 {
+
+  @JsonTypeName(V1)
+  public static class ParquetTableMetadata_v1 extends ParquetTableMetadataBase {
+    @JsonProperty(value = "metadata_version", access = JsonProperty.Access.WRITE_ONLY) private String metadataVersion;
+    @JsonProperty
+    List<ParquetFileMetadata_v1> files;
+    @JsonProperty List<String> directories;
+
+    public ParquetTableMetadata_v1() {
+    }
+
+    public ParquetTableMetadata_v1(String metadataVersion, List<ParquetFileMetadata_v1> files, List<String> directories) {
+      this.metadataVersion = metadataVersion;
+      this.files = files;
+      this.directories = directories;
+    }
+
+    @JsonIgnore
+    @Override public List<String> getDirectories() {
+      return directories;
+    }
+
+    @JsonIgnore @Override public List<? extends ParquetFileMetadata> getFiles() {
+      return files;
+    }
+
+    @JsonIgnore @Override public void assignFiles(List<? extends ParquetFileMetadata> newFiles) {
+      this.files = (List<ParquetFileMetadata_v1>) newFiles;
+    }
+
+    @Override public boolean hasColumnMetadata() {
+      return false;
+    }
+
+    @JsonIgnore @Override public PrimitiveType.PrimitiveTypeName getPrimitiveType(String[] columnName) {
+      return null;
+    }
+
+    @JsonIgnore @Override public OriginalType getOriginalType(String[] columnName) {
+      return null;
+    }
+
+    @JsonIgnore @Override
+    public Integer getRepetitionLevel(String[] columnName) {
+      return null;
+    }
+
+    @JsonIgnore @Override
+    public Integer getDefinitionLevel(String[] columnName) {
+      return null;
+    }
+
+    @JsonIgnore @Override
+    public boolean isRowGroupPrunable() {
+      return false;
+    }
+
+    @JsonIgnore @Override public MetadataBase.ParquetTableMetadataBase clone() {
+      return new ParquetTableMetadata_v1(metadataVersion, files, directories);
+    }
+
+    @JsonIgnore @Override
+    public String getDrillVersion() {
+      return null;
+    }
+
+    @JsonIgnore @Override public String getMetadataVersion() {
+      return metadataVersion;
+    }
+  }
+
+
+  /**
+   * Struct which contains the metadata for a single parquet file
+   */
+  public static class ParquetFileMetadata_v1 extends ParquetFileMetadata {
+    @JsonProperty
+    public String path;
+    @JsonProperty
+    public Long length;
+    @JsonProperty
+    public List<RowGroupMetadata_v1> rowGroups;
+
+    public ParquetFileMetadata_v1() {
+    }
+
+    public ParquetFileMetadata_v1(String path, Long length, List<RowGroupMetadata_v1> rowGroups) {
+      this.path = path;
+      this.length = length;
+      this.rowGroups = rowGroups;
+    }
+
+    @Override
+    public String toString() {
+      return String.format("path: %s rowGroups: %s", path, rowGroups);
+    }
+
+    @JsonIgnore @Override public String getPath() {
+      return path;
+    }
+
+    @JsonIgnore @Override public Long getLength() {
+      return length;
+    }
+
+    @JsonIgnore @Override public List<? extends RowGroupMetadata> getRowGroups() {
+      return rowGroups;
+    }
+  }
+
+
+  /**
+   * A struct that contains the metadata for a parquet row group
+   */
+  public static class RowGroupMetadata_v1 extends RowGroupMetadata {
+    @JsonProperty
+    public Long start;
+    @JsonProperty
+    public Long length;
+    @JsonProperty
+    public Long rowCount;
+    @JsonProperty
+    public Map<String, Float> hostAffinity;
+    @JsonProperty
+    public List<ColumnMetadata_v1> columns;
+
+    public RowGroupMetadata_v1() {
+    }
+
+    public RowGroupMetadata_v1(Long start, Long length, Long rowCount, Map<String, Float> hostAffinity,
+                               List<ColumnMetadata_v1> columns) {
+      this.start = start;
+      this.length = length;
+      this.rowCount = rowCount;
+      this.hostAffinity = hostAffinity;
+      this.columns = columns;
+    }
+
+    @Override public Long getStart() {
+      return start;
+    }
+
+    @Override public Long getLength() {
+      return length;
+    }
+
+    @Override public Long getRowCount() {
+      return rowCount;
+    }
+
+    @Override public Map<String, Float> getHostAffinity() {
+      return hostAffinity;
+    }
+
+    @Override public List<? extends ColumnMetadata> getColumns() {
+      return columns;
+    }
+  }
+
+
+  /**
+   * A struct that contains the metadata for a column in a parquet file
+   */
+  public static class ColumnMetadata_v1 extends ColumnMetadata {
+    @JsonProperty
+    public SchemaPath name;
+    @JsonProperty
+    public PrimitiveType.PrimitiveTypeName primitiveType;
+    @JsonProperty
+    public OriginalType originalType;
+    @JsonProperty
+    public Long nulls;
+
+    // The @JsonProperty annotations for these fields are placed on the getters and setters
+    public Object max;
+    public Object min;
+
+
+    public ColumnMetadata_v1() {
+    }
+
+    public ColumnMetadata_v1(SchemaPath name, PrimitiveType.PrimitiveTypeName primitiveType, OriginalType originalType,
+                             Object max, Object min, Long nulls) {
+      this.name = name;
+      this.primitiveType = primitiveType;
+      this.originalType = originalType;
+      this.max = max;
+      this.min = min;
+      this.nulls = nulls;
+    }
+
+    @JsonProperty(value = "min")
+    public Object getMin() {
+      if (primitiveType == PrimitiveType.PrimitiveTypeName.BINARY && min != null) {
+        return new String(((Binary) min).getBytes());
+      }
+      return min;
+    }
+
+    @JsonProperty(value = "max")
+    public Object getMax() {
+      if (primitiveType == PrimitiveType.PrimitiveTypeName.BINARY && max != null) {
+        return new String(((Binary) max).getBytes());
+      }
+      return max;
+    }
+
+    @Override public PrimitiveType.PrimitiveTypeName getPrimitiveType() {
+      return primitiveType;
+    }
+
+    @Override public OriginalType getOriginalType() {
+      return originalType;
+    }
+
+    /**
+     * Setter used during deserialization of the 'min' field of the metadata cache file.
+     *
+     * @param min the minimum value for the column, as recorded in the parquet statistics
+     */
+    @JsonProperty(value = "min")
+    public void setMin(Object min) {
+      this.min = min;
+    }
+
+    /**
+     * Setter used during deserialization of the 'max' field of the metadata cache file.
+     *
+     * @param max the maximum value for the column, as recorded in the parquet statistics
+     */
+    @JsonProperty(value = "max")
+    public void setMax(Object max) {
+      this.max = max;
+    }
+
+    @Override public String[] getName() {
+      String[] s = new String[1];
+      String nameString = name.toString();
+      // Strip out the surrounding backticks.
+      s[0] = nameString.substring(1, nameString.length() - 1);
+      return s;
+    }
+
+    @Override public Long getNulls() {
+      return nulls;
+    }
+
+    /**
+     * Checks that the column chunk has a single value.
+     * Returns {@code true} if {@code min} and {@code max} are the same but not null,
+     * and the nulls count is 0 or equal to the row count.
+     * <p>
+     * Returns {@code true} if {@code min} and {@code max} are null and the number of null values
+     * in the column chunk is equal to the row count.
+     * <p>
+     * Comparing the nulls count with the row count is needed for these cases:
+     * <ul>
+     * <li>a column with a primitive type has a single value plus some null values</li>
+     *
+     * <li>a column <b>with a primitive type</b> has only null values; min/max could only be null,
+     * but the column still has a single value</li>
+     * </ul>
+     *
+     * @param rowCount row count in the column chunk
+     * @return true if the column has a single value
+     */
+    @Override
+    public boolean hasSingleValue(long rowCount) {
+      if (nulls != null) {
+        if (min != null) {
+          // Objects.deepEquals() is used here, since min and max may be byte arrays
+          return Objects.deepEquals(min, max) && (nulls == 0 || nulls == rowCount);
+        } else {
+          return nulls == rowCount && max == null;
+        }
+      }
+      return false;
+    }
+
+    @Override public Object getMinValue() {
+      return min;
+    }
+
+    @Override public Object getMaxValue() {
+      return max;
+    }
+
+  }
+
+}
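To make hasSingleValue() concrete, here is a small sketch exercising its three cases with hand-built v1 column metadata; the statistics values are illustrative only:

import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.store.parquet.metadata.Metadata_V1.ColumnMetadata_v1;
import org.apache.parquet.schema.PrimitiveType;

public class SingleValueDemo {
  public static void main(String[] args) {
    // min == max and no nulls: the whole chunk holds the single value 10.
    // Constructor argument order is (name, primitiveType, originalType, max, min, nulls).
    ColumnMetadata_v1 constant = new ColumnMetadata_v1(
        SchemaPath.getSimplePath("a"), PrimitiveType.PrimitiveTypeName.INT32, null, 10, 10, 0L);
    System.out.println(constant.hasSingleValue(100)); // true

    // min/max are null and every row is null: also a "single value" (null).
    ColumnMetadata_v1 allNulls = new ColumnMetadata_v1(
        SchemaPath.getSimplePath("a"), PrimitiveType.PrimitiveTypeName.INT32, null, null, null, 100L);
    System.out.println(allNulls.hasSingleValue(100)); // true

    // min != max: more than one distinct value, not prunable to a constant.
    ColumnMetadata_v1 varying = new ColumnMetadata_v1(
        SchemaPath.getSimplePath("a"), PrimitiveType.PrimitiveTypeName.INT32, null, 20, 10, 0L);
    System.out.println(varying.hasSingleValue(100)); // false
  }
}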
