[ 
https://issues.apache.org/jira/browse/DRILL-7233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17010933#comment-17010933
 ] 

ASF GitHub Bot commented on DRILL-7233:
---------------------------------------

cgivre commented on pull request #1778: DRILL-7233: Format Plugin for HDF5
URL: https://github.com/apache/drill/pull/1778#discussion_r364389461
 
 

 ##########
 File path: 
contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5BatchReader.java
 ##########
 @@ -0,0 +1,1122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.hdf5;
+
+import ch.systemsx.cisd.hdf5.HDF5CompoundMemberInformation;
+import ch.systemsx.cisd.hdf5.HDF5DataSetInformation;
+import ch.systemsx.cisd.hdf5.HDF5FactoryProvider;
+import ch.systemsx.cisd.hdf5.HDF5LinkInformation;
+import ch.systemsx.cisd.hdf5.IHDF5Factory;
+import ch.systemsx.cisd.hdf5.IHDF5Reader;
+import org.apache.commons.io.IOUtils;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MapBuilder;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.hdf5.writers.HDF5DataWriter;
+import org.apache.drill.exec.store.hdf5.writers.HDF5DoubleDataWriter;
+import org.apache.drill.exec.store.hdf5.writers.HDF5EnumDataWriter;
+import org.apache.drill.exec.store.hdf5.writers.HDF5FloatDataWriter;
+import org.apache.drill.exec.store.hdf5.writers.HDF5IntDataWriter;
+import org.apache.drill.exec.store.hdf5.writers.HDF5LongDataWriter;
+import org.apache.drill.exec.store.hdf5.writers.HDF5MapDataWriter;
+import org.apache.drill.exec.store.hdf5.writers.HDF5StringDataWriter;
+import org.apache.drill.exec.store.hdf5.writers.HDF5TimestampDataWriter;
+import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+
+import org.apache.hadoop.mapred.FileSplit;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.BitSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public class HDF5BatchReader implements ManagedReader<FileSchemaNegotiator> {
+  private static final Logger logger = 
LoggerFactory.getLogger(HDF5BatchReader.class);
+
+  // Column names of the fixed schema produced by a metadata query
+  // (i.e. when no default path is configured).
+  private static final String PATH_COLUMN_NAME = "path";
+
+  private static final String DATA_TYPE_COLUMN_NAME = "data_type";
+
+  private static final String FILE_NAME_COLUMN_NAME = "file_name";
+
+  // Name prefixes for the per-column fields created when a multi-dimensional
+  // dataset is flattened into Drill columns; the column index is appended.
+  private static final String INT_COLUMN_PREFIX = "int_col_";
+
+  private static final String LONG_COLUMN_PREFIX = "long_col_";
+
+  private static final String FLOAT_COLUMN_PREFIX = "float_col_";
+
+  private static final String DOUBLE_COLUMN_PREFIX = "double_col_";
+
+  // Column names used when a matrix is projected as a list of lists
+  // inside a metadata query.
+  private static final String INT_COLUMN_NAME = "int_data";
+
+  private static final String FLOAT_COLUMN_NAME = "float_data";
+
+  private static final String DOUBLE_COLUMN_NAME = "double_data";
+
+  private static final String LONG_COLUMN_NAME = "long_data";
+
+  // Settings handed to the constructor (default path, temp directory, ...).
+  private final HDF5ReaderConfig readerConfig;
+
+  // Writers created by the buildSchemaFor*DimensionalDataset methods when a
+  // default path is set.
+  private final List<HDF5DataWriter> dataWriters;
+
+  // The file split being read; assigned in open().
+  private FileSplit split;
+
+  // HDF5 library reader opened on the local temp copy; assigned in openFile().
+  private IHDF5Reader hdf5Reader;
+
+  // Local temp copy of the input file; assigned in openFile().
+  private File inFile;
+
+  // Reader wrapped around the input stream at the end of openFile().
+  private BufferedReader reader;
+
+  // Row writer obtained from the result set loader in open().
+  private RowSetLoader rowWriter;
+
+  // Iterates over the file metadata rows; only assigned for metadata queries.
+  private Iterator<HDF5DrillMetadata> metadataIterator;
+
+  // Writers for the three fixed metadata columns; only assigned for metadata queries.
+  private ScalarWriter pathWriter;
+
+  private ScalarWriter dataTypeWriter;
+
+  private ScalarWriter fileNameWriter;
+
+  // Dimensions of the dataset at the default path; empty for metadata queries.
+  private long[] dimensions;
+
+  /**
+   * Holder for the settings a reader needs: the owning plugin, its format
+   * config, the default dataset path taken from that config, and the
+   * plugin's temporary directory.
+   */
+  public static class HDF5ReaderConfig {
+    final HDF5FormatPlugin plugin;
+    final HDF5FormatConfig formatConfig;
+    final String defaultPath;
+    final File tempDirectory;
+
+    public HDF5ReaderConfig(HDF5FormatPlugin plugin, HDF5FormatConfig formatConfig) {
+      this.plugin = plugin;
+      this.formatConfig = formatConfig;
+      this.defaultPath = formatConfig.getDefaultPath();
+      this.tempDirectory = plugin.getTmpDir();
+    }
+  }
+
+  /**
+   * Creates a reader for the given configuration. Only stores the config and
+   * prepares an empty writer list.
+   *
+   * @param readerConfig The settings this reader will use
+   */
+  public HDF5BatchReader(HDF5ReaderConfig readerConfig) {
+    this.dataWriters = new ArrayList<>();
+    this.readerConfig = readerConfig;
+  }
+
+  /**
+   * Opens the HDF5 file behind the current split and prepares the row writer.
+   * <p>
+   * Without a default path this is a metadata query: the file's link
+   * information is collected starting at the root group and a fixed
+   * three-column schema (path, data type, file name) is declared. With a
+   * default path, the dimensions of that dataset are read and one data
+   * writer per output column is created.
+   *
+   * @param negotiator Drill's schema negotiator for this scan
+   * @return always true
+   */
+  @Override
+  public boolean open(FileSchemaNegotiator negotiator) {
+    split = negotiator.split();
+    try {
+      openFile(negotiator);
+    } catch (IOException e) {
+      // One message naming the file; the cause text goes in as context so
+      // that it does not replace the message.
+      throw UserException
+        .dataReadError(e)
+        .message("Failed to open input file: %s", split.getPath())
+        .addContext(e.getMessage())
+        .build(logger);
+    }
+
+    if (readerConfig.defaultPath == null) {
+      // Metadata query: gather the link information of the whole file
+      List<HDF5DrillMetadata> metadata = getFileMetadata(
+        hdf5Reader.object().getGroupMemberInformation("/", true), new ArrayList<>());
+      metadataIterator = metadata.iterator();
+
+      // Schema for the metadata query
+      SchemaBuilder builder = new SchemaBuilder()
+        .addNullable(PATH_COLUMN_NAME, TypeProtos.MinorType.VARCHAR)
+        .addNullable(DATA_TYPE_COLUMN_NAME, TypeProtos.MinorType.VARCHAR)
+        .addNullable(FILE_NAME_COLUMN_NAME, TypeProtos.MinorType.VARCHAR);
+
+      negotiator.setTableSchema(builder.buildSchema(), false);
+
+      ResultSetLoader loader = negotiator.build();
+      dimensions = new long[0];
+      rowWriter = loader.writer();
+
+      pathWriter = rowWriter.scalar(PATH_COLUMN_NAME);
+      dataTypeWriter = rowWriter.scalar(DATA_TYPE_COLUMN_NAME);
+      fileNameWriter = rowWriter.scalar(FILE_NAME_COLUMN_NAME);
+    } else {
+      // The user asked for a specific dataset, so the schema is derived from
+      // that dataset's type and dimensions.
+      HDF5DataSetInformation dsInfo = hdf5Reader.object().getDataSetInformation(readerConfig.defaultPath);
+      dimensions = dsInfo.getDimensions();
+
+      ResultSetLoader loader = negotiator.build();
+      rowWriter = loader.writer();
+      if (dimensions.length <= 1) {
+        buildSchemaFor1DimensionalDataset(dsInfo);
+      } else {
+        // Datasets with more than two dimensions take the same path as 2D ones
+        buildSchemaFor2DimensionalDataset(dsInfo);
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Called when the default path is set and the dataset has at most one
+   * dimension. Adds a single data writer of the matching type to
+   * {@code dataWriters}. If the type is unknown or unsupported, a warning is
+   * logged and no writer is added.
+   *
+   * @param dsInfo The HDF5 dataset information
+   */
+  private void buildSchemaFor1DimensionalDataset(HDF5DataSetInformation dsInfo) {
+    TypeProtos.MinorType currentDataType = HDF5Utils.getDataType(dsInfo);
+
+    // Case for null or unknown data types:
+    if (currentDataType == null) {
+      // Log the Java type as-is: tryGetJavaType() can itself be null for an
+      // unknown type, so it must not be dereferenced here.
+      logger.warn("Couldn't add {}", dsInfo.getTypeInformation().tryGetJavaType());
+      return;
+    }
+
+    String path = readerConfig.defaultPath;
+    switch (currentDataType) {
+      case GENERIC_OBJECT:
+        dataWriters.add(new HDF5EnumDataWriter(hdf5Reader, rowWriter, path));
+        break;
+      case VARCHAR:
+        dataWriters.add(new HDF5StringDataWriter(hdf5Reader, rowWriter, path));
+        break;
+      case TIMESTAMP:
+        dataWriters.add(new HDF5TimestampDataWriter(hdf5Reader, rowWriter, path));
+        break;
+      case INT:
+        dataWriters.add(new HDF5IntDataWriter(hdf5Reader, rowWriter, path));
+        break;
+      case BIGINT:
+        dataWriters.add(new HDF5LongDataWriter(hdf5Reader, rowWriter, path));
+        break;
+      case FLOAT8:
+        dataWriters.add(new HDF5DoubleDataWriter(hdf5Reader, rowWriter, path));
+        break;
+      case FLOAT4:
+        dataWriters.add(new HDF5FloatDataWriter(hdf5Reader, rowWriter, path));
+        break;
+      case MAP:
+        dataWriters.add(new HDF5MapDataWriter(hdf5Reader, rowWriter, path));
+        break;
+      default:
+        // Make the omission visible instead of silently creating no writer
+        logger.warn("Data type {} is not supported for dataset {}", currentDataType, path);
+    }
+  }
+
+  /**
+   * Builds the writers for a dataset with two or more dimensions. Only INT,
+   * BIGINT, FLOAT4 and FLOAT8 are handled here, so this method is not as
+   * inclusive as the 1D variant. One data writer is added to
+   * {@code dataWriters} per column, where the column count is the size of
+   * the second dimension. If the type is unknown or unsupported, a warning
+   * is logged and no writer is added.
+   *
+   * @param dsInfo The dataset which Drill will use to build a schema
+   */
+  private void buildSchemaFor2DimensionalDataset(HDF5DataSetInformation dsInfo) {
+    TypeProtos.MinorType currentDataType = HDF5Utils.getDataType(dsInfo);
+    // Case for null or unknown data types:
+    if (currentDataType == null) {
+      // Log the Java type as-is: tryGetJavaType() can itself be null for an
+      // unknown type, so it must not be dereferenced here.
+      logger.warn("Couldn't add {}", dsInfo.getTypeInformation().tryGetJavaType());
+      return;
+    }
+
+    String path = readerConfig.defaultPath;
+    long cols = dimensions[1];
+    for (int i = 0; i < cols; i++) {
+      switch (currentDataType) {
+        case INT:
+          dataWriters.add(new HDF5IntDataWriter(hdf5Reader, rowWriter, path, INT_COLUMN_PREFIX + i, i));
+          break;
+        case BIGINT:
+          dataWriters.add(new HDF5LongDataWriter(hdf5Reader, rowWriter, path, LONG_COLUMN_PREFIX + i, i));
+          break;
+        case FLOAT8:
+          dataWriters.add(new HDF5DoubleDataWriter(hdf5Reader, rowWriter, path, DOUBLE_COLUMN_PREFIX + i, i));
+          break;
+        case FLOAT4:
+          dataWriters.add(new HDF5FloatDataWriter(hdf5Reader, rowWriter, path, FLOAT_COLUMN_PREFIX + i, i));
+          break;
+        default:
+          // The type is the same for every column, so report once and stop
+          logger.warn("Data type {} is not supported for multi-dimensional dataset {}", currentDataType, path);
+          return;
+      }
+    }
+  }
+  /**
+   * Opens the HDF5 file for the current split: the (possibly compressed)
+   * input stream is copied to a local temp file, which is then opened with
+   * the HDF5 library.
+   *
+   * @param negotiator The negotiator represents Drill's interface with the file system
+   * @throws IOException declared for callers; failures inside the method are
+   *         reported as a data-read UserException
+   */
+  private void openFile(FileSchemaNegotiator negotiator) throws IOException {
+    InputStream in = null;
+    try {
+      in = negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+      IHDF5Factory factory = HDF5FactoryProvider.get();
+      inFile = convertInputStreamToFile(in);
+      hdf5Reader = factory.openForReading(inFile);
+    } catch (Exception e) {
+      // Close quietly (null-safe): an IOException thrown by close() must not
+      // replace the failure that is being reported.
+      IOUtils.closeQuietly(in);
+      throw UserException
+        .dataReadError(e)
+        .message("Failed to open input file: %s", split.getPath())
+        .build(logger);
+    }
+    // NOTE(review): convertInputStreamToFile() closes the stream after a
+    // successful copy, so this reader wraps an already-closed stream; it is
+    // kept because the field may be used outside this section - confirm.
+    reader = new BufferedReader(new InputStreamReader(in));
+  }
+
+  /**
+   * Copies the Drill InputStream into a file in the configured temp
+   * directory, because the HDF5 library cannot parse HDF5 directly from an
+   * input stream. The temp file is named after the split's file name with a
+   * leading "~", and an existing file of that name is replaced. The stream
+   * is always closed, whether or not the copy succeeds.
+   *
+   * @param stream The input stream to be converted to a File
+   * @return File The file which was converted from an InputStream
+   * @throws IOException declared for callers; a failed copy is reported as a
+   *         data-write UserException
+   */
+  private File convertInputStreamToFile(InputStream stream) throws IOException {
+    File targetFile = new File(readerConfig.tempDirectory, "~" + split.getPath().getName());
+
+    try {
+      java.nio.file.Files.copy(stream, targetFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
+    } catch (Exception e) {
+      // Do not leave a partial copy behind
+      if (targetFile.exists() && !targetFile.delete()) {
+        logger.warn("Could not delete partial temp file {}", targetFile);
+      }
+      throw UserException
+        .dataWriteError(e)
+        .message("Failed to create temp HDF5 file: %s", split.getPath())
+        .addContext(e.getMessage())
+        .build(logger);
+    } finally {
+      // Release the stream on the failure path as well
+      IOUtils.closeQuietly(stream);
+    }
+    return targetFile;
+  }
+
+  /**
+   * Fills the current batch until the row writer is full or the data runs out.
+   * <ul>
+   *   <li>No default path: one metadata row per entry of the metadata iterator.</li>
+   *   <li>Dataset with at most one dimension: the single data writer is asked
+   *   for one value per row; a compound writer manages its own rows.</li>
+   *   <li>Dataset with two or more dimensions: every column writer writes once per row.</li>
+   * </ul>
+   *
+   * @return true if the batch is full and more data may follow, false when
+   *         there is nothing more to read
+   */
+  @Override
+  public boolean next() {
+    boolean metadataQuery = readerConfig.defaultPath == null || readerConfig.defaultPath.isEmpty();
+
+    // Schema building adds no writer for unknown or unsupported types. There
+    // is nothing to read then, and indexing into the empty list would throw.
+    if (!metadataQuery && dataWriters.isEmpty()) {
+      return false;
+    }
+
+    while (!rowWriter.isFull()) {
+      if (metadataQuery) {
+        if (!metadataIterator.hasNext()) {
+          return false;
+        }
+        projectMetadataRow(rowWriter);
+      } else if (dimensions.length <= 1) {
+        HDF5DataWriter writer = dataWriters.get(0);
+        boolean compound = writer.isCompound();
+        if (!writer.hasNext()) {
+          return false;
+        }
+        if (compound) {
+          // Case for Compound Data Type: the row is not started or saved here
+          writer.write();
+        } else {
+          rowWriter.start();
+          writer.write();
+          rowWriter.save();
+        }
+      } else {
+        int currentRowCount = 0;
+        rowWriter.start();
+        for (HDF5DataWriter columnWriter : dataWriters) {
+          columnWriter.write();
+          currentRowCount = columnWriter.currentRowCount();
+        }
+        rowWriter.save();
+        if (currentRowCount >= dimensions[0]) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Writes one row of HDF5 metadata: path, data type and file name, followed
+   * by the entry's attributes (if any) and, for entries of type DATASET, the
+   * dataset contents.
+   *
+   * @param rowWriter The input rowWriter object
+   */
+  private void projectMetadataRow(RowSetLoader rowWriter) {
+    // Take the name straight from the split. The temp copy is named "~" +
+    // this name, and stripping every "~" from the temp name would also
+    // remove tildes that belong to the real file name.
+    String realFileName = split.getPath().getName();
+    HDF5DrillMetadata metadataRow = metadataIterator.next();
+    rowWriter.start();
+
+    pathWriter.setString(metadataRow.getPath());
+    dataTypeWriter.setString(metadataRow.getDataType());
+    fileNameWriter.setString(realFileName);
+
+    // Write attributes if present. Soft links never get attributes assigned,
+    // so guard against a missing map as well as an empty one.
+    if (metadataRow.getAttributes() != null && !metadataRow.getAttributes().isEmpty()) {
+      writeAttributes(rowWriter, metadataRow);
+    }
+
+    if (metadataRow.getDataType().equalsIgnoreCase("DATASET")) {
+      projectDataset(rowWriter, metadataRow.getPath());
+    }
+    rowWriter.save();
+  }
+
+  /**
+   * Walks the given group members and appends one metadata row per dataset,
+   * soft link and group to the supplied list, descending into groups
+   * recursively. Each row carries the member's path and type; datasets and
+   * groups also carry their attributes.
+   *
+   * @param members The link information of the members to inspect
+   * @param metadata The list to which the metadata rows are appended
+   * @return The list passed in as {@code metadata}, with the new rows appended
+   */
+  private List<HDF5DrillMetadata> getFileMetadata(List<HDF5LinkInformation> members, List<HDF5DrillMetadata> metadata) {
+    for (HDF5LinkInformation member : members) {
+      HDF5DrillMetadata row = new HDF5DrillMetadata();
+      row.setPath(member.getPath());
+      row.setDataType(member.getType().toString());
+
+      switch (member.getType()) {
+        case DATASET:
+          row.setAttributes(getAttributes(member.getPath()));
+          metadata.add(row);
+          break;
+        case SOFT_LINK:
+          // Soft links cannot have attributes
+          metadata.add(row);
+          break;
+        case GROUP:
+          row.setAttributes(getAttributes(member.getPath()));
+          metadata.add(row);
+          // The recursive call appends to, and returns, this same list
+          getFileMetadata(hdf5Reader.object().getGroupMemberInformation(member.getPath(), true), metadata);
+          break;
+        default:
+          logger.warn("Unknown data type: {}", member.getType());
+      }
+    }
+    return metadata;
+  }
+
+  /**
+   * Gets the attributes of an HDF5 object and returns them in a map keyed by
+   * attribute key. An attribute that cannot be read is skipped and logged
+   * with a warning.
+   *
+   * @param path The path for which you wish to retrieve attributes
+   * @return Map The attributes for the given path.  Empty Map if no attributes present
+   */
+  private Map<String, HDF5Attribute> getAttributes(String path) {
+    Map<String, HDF5Attribute> attributes = new HashMap<>();
+
+    long attrCount = hdf5Reader.object().getObjectInformation(path).getNumberOfAttributes();
+    if (attrCount > 0) {
+      List<String> attrNames = hdf5Reader.object().getAllAttributeNames(path);
+      for (String name : attrNames) {
+        try {
+          HDF5Attribute attribute = HDF5Utils.getAttribute(path, name, hdf5Reader);
+          attributes.put(attribute.getKey(), attribute);
+        } catch (Exception e) {
+          // Best effort: keep going, but pass the exception to the logger so
+          // the reason is not lost.
+          logger.warn("Couldn't add attribute: {} - {}", path, name, e);
+        }
+      }
+    }
+    return attributes;
+  }
+
+  /**
+   * Writes the contents of one dataset into the current row of a metadata
+   * query. This is only used in metadata queries, where the schema is not
+   * known in advance, so columns are added on the fly. A dataset with at
+   * most one dimension is written as a list column named by
+   * {@code HDF5Utils.getNameFromPath(datapath)}; datasets with two or more
+   * dimensions are passed to the map*MatrixField helpers. Unknown or
+   * unsupported types are skipped with a warning.
+   *
+   * @param rowWriter The rowWriter to which the data will be written
+   * @param datapath The datapath from which the data will be read
+   * @return always false
+   */
+  private boolean projectDataset(RowSetLoader rowWriter, String datapath) {
+    String fieldName = HDF5Utils.getNameFromPath(datapath);
+    HDF5DataSetInformation dsInfo = hdf5Reader.object().getDataSetInformation(datapath);
+    long[] dsDimensions = dsInfo.getDimensions();
+    TypeProtos.MinorType currentDataType = HDF5Utils.getDataType(dsInfo);
+
+    // Case for null or unknown data types: nothing can be projected, and
+    // switching on null would throw.
+    if (currentDataType == null) {
+      logger.warn("Couldn't add {}", dsInfo.getTypeInformation().tryGetJavaType());
+      return false;
+    }
+
+    if (dsDimensions.length > 1) {
+      projectMultiDimensionalDataset(rowWriter, datapath, currentDataType, dsDimensions);
+      return false;
+    }
+
+    // Case for single dimensional data
+    switch (currentDataType) {
+      case GENERIC_OBJECT:
+        logger.warn("Couldn't read {}", datapath );
+        break;
+      case VARCHAR:
+        writeStringListColumn(rowWriter, fieldName, hdf5Reader.readStringArray(datapath));
+        break;
+      case TIMESTAMP:
+        writeTimestampListColumn(rowWriter, fieldName, hdf5Reader.time().readTimeStampArray(datapath));
+        break;
+      case INT:
+        if (dsInfo.getTypeInformation().isSigned()) {
+          writeIntListColumn(rowWriter, fieldName, hdf5Reader.readIntArray(datapath));
+        } else if (dsInfo.getTypeInformation().getElementSize() > 4) {
+          writeLongListColumn(rowWriter, fieldName, hdf5Reader.uint64().readArray(datapath));
+        } else {
+          // These values were previously skipped without any trace
+          logger.warn("Unsigned integers of {} bytes are not supported, skipping {}",
+            dsInfo.getTypeInformation().getElementSize(), datapath);
+        }
+        break;
+      case FLOAT4:
+        writeFloat4ListColumn(rowWriter, fieldName, hdf5Reader.readFloatArray(datapath));
+        break;
+      case FLOAT8:
+        writeFloat8ListColumn(rowWriter, fieldName, hdf5Reader.readDoubleArray(datapath));
+        break;
+      case BIGINT:
+        if (!dsInfo.getTypeInformation().isSigned()) {
+          logger.warn("Drill does not support unsigned 64bit integers.");
+          break;
+        }
+        writeLongListColumn(rowWriter, fieldName, hdf5Reader.readLongArray(datapath));
+        break;
+      case MAP:
+        try {
+          getAndMapCompoundData(datapath, new ArrayList<>(), hdf5Reader, rowWriter);
+        } catch (Exception e) {
+          // Keep the cause attached to the reported error
+          throw UserException
+            .dataReadError(e)
+            .addContext("Error writing Compound Field: ")
+            .addContext(e.getMessage())
+            .build(logger);
+        }
+        break;
+      default:
+        // Case for data types that cannot be read
+        logger.warn("{} not implemented yet.", dsInfo.getTypeInformation().tryGetJavaType());
+    }
+    return false;
+  }
+
+  /**
+   * Reads a dataset with two or more dimensions as a matrix and passes it to
+   * the matching map*MatrixField helper. 2D datasets are read with the
+   * read*Matrix calls; datasets with more dimensions are read as an MD array
+   * and converted with toMatrix().
+   *
+   * @param rowWriter The rowWriter to which the data will be written
+   * @param datapath The datapath from which the data will be read
+   * @param dataType The (non-null) Drill type of the dataset
+   * @param dsDimensions The dimensions of the dataset; at least two entries
+   */
+  private void projectMultiDimensionalDataset(RowSetLoader rowWriter, String datapath,
+      TypeProtos.MinorType dataType, long[] dsDimensions) {
+    int rows = (int) dsDimensions[0];
+    int cols = (int) dsDimensions[1];
+    boolean twoDimensional = dsDimensions.length == 2;
+
+    switch (dataType) {
+      case INT:
+        int[][] intData = twoDimensional
+          ? hdf5Reader.readIntMatrix(datapath)
+          : hdf5Reader.int32().readMDArray(datapath).toMatrix();
+        mapIntMatrixField(intData, cols, rows, rowWriter);
+        break;
+      case FLOAT4:
+        float[][] floatData = twoDimensional
+          ? hdf5Reader.readFloatMatrix(datapath)
+          : hdf5Reader.float32().readMDArray(datapath).toMatrix();
+        mapFloatMatrixField(floatData, cols, rows, rowWriter);
+        break;
+      case FLOAT8:
+        double[][] doubleData = twoDimensional
+          ? hdf5Reader.readDoubleMatrix(datapath)
+          : hdf5Reader.float64().readMDArray(datapath).toMatrix();
+        mapDoubleMatrixField(doubleData, cols, rows, rowWriter);
+        break;
+      case BIGINT:
+        long[][] longData = twoDimensional
+          ? hdf5Reader.readLongMatrix(datapath)
+          : hdf5Reader.int64().readMDArray(datapath).toMatrix();
+        mapBigintMatrixField(longData, cols, rows, rowWriter);
+        break;
+      default:
+        logger.warn("{} not implemented.", dataType);
+    }
+  }
+
+  /**
+   * Writes an int as a boolean column value: zero becomes false, any other
+   * value becomes true.
+   *
+   * @param rowWriter The row to which the data will be written
+   * @param name The column name
+   * @param value The int value to be interpreted as a boolean
+   */
+  private void writeBooleanColumn(TupleWriter rowWriter, String name, int value) {
+    boolean flag = value != 0;
+    writeBooleanColumn(rowWriter, name, flag);
+  }
+
+  /**
+   * Writes a boolean into the optional BIT column of the given name, using
+   * the writer obtained from getColWriter.
+   *
+   * @param rowWriter The row to which the data will be written
+   * @param name The column name
+   * @param value The value to be written
+   */
+  private void writeBooleanColumn(TupleWriter rowWriter, String name, boolean value) {
+    getColWriter(rowWriter, name, TypeProtos.MinorType.BIT, TypeProtos.DataMode.OPTIONAL)
+      .setBoolean(value);
+  }
+
+  /**
+   * Writes an int into the optional INT column of the given name, using the
+   * writer obtained from getColWriter.
+   *
+   * @param rowWriter The row to which the data will be written
+   * @param name The column name
+   * @param value The value to be written
+   */
+  private void writeIntColumn(TupleWriter rowWriter, String name, int value) {
+    getColWriter(rowWriter, name, TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL)
+      .setInt(value);
+  }
+
+
+  /**
+   * Writes an int array as a repeated INT column, adding the column to the
+   * row's schema first if it does not exist yet.
+   *
+   * @param rowWriter the row to which the data will be written
+   * @param name the name of the list column
+   * @param list the values to write, in order
+   */
+  private void writeIntListColumn(TupleWriter rowWriter, String name, int[] list) {
+    int colIndex = rowWriter.tupleSchema().index(name);
+    if (colIndex == -1) {
+      colIndex = rowWriter.addColumn(
+        MetadataUtils.newScalar(name, TypeProtos.MinorType.INT, TypeProtos.DataMode.REPEATED));
+    }
+
+    ScalarWriter elementWriter = rowWriter.column(colIndex).array().scalar();
+    for (int i = 0; i < list.length; i++) {
+      elementWriter.setInt(list[i]);
+    }
+  }
+
+
+  /**
+   * Writes an int matrix. With a default path set, the matrix is flattened:
+   * each matrix row becomes one Drill row whose columns are named with the
+   * int column prefix plus the column index. Otherwise the matrix is written
+   * as a list of lists by intMatrixHelper.
+   *
+   * @param colData The matrix, indexed as [row][column]
+   * @param cols The number of columns to write
+   * @param rows The number of rows to write
+   * @param rowWriter The row writer to which the data will be written
+   */
+  private void mapIntMatrixField(int[][] colData, int cols, int rows, RowSetLoader rowWriter) {
+    if (readerConfig.defaultPath == null) {
+      intMatrixHelper(colData, cols, rows, rowWriter);
+      return;
+    }
+
+    // Default path present: one Drill row per matrix row
+    for (int row = 0; row < rows; row++) {
+      rowWriter.start();
+      for (int col = 0; col < cols; col++) {
+        writeIntColumn(rowWriter, INT_COLUMN_PREFIX + col, colData[row][col]);
+      }
+      rowWriter.save();
+    }
+  }
+
+  /**
+   * Writes an int matrix into the current row as a list of lists, for a
+   * dataset projected in a metadata query. The repeated-list column is added
+   * to the row's schema if it does not exist yet.
+   *
+   * @param colData The matrix, indexed as [row][column]
+   * @param cols The number of columns to write
+   * @param rows The number of rows to write
+   * @param rowWriter The row writer to which the data will be written
+   */
+  private void intMatrixHelper(int[][] colData, int cols, int rows, RowSetLoader rowWriter) {
+    int colIndex = rowWriter.tupleSchema().index(INT_COLUMN_NAME);
+    if (colIndex == -1) {
+      TupleMetadata listOfLists = new SchemaBuilder()
+        .addRepeatedList(INT_COLUMN_NAME)
+        .addArray(TypeProtos.MinorType.INT)
+        .resumeSchema()
+        .buildSchema();
+      colIndex = rowWriter.addColumn(listOfLists.column(INT_COLUMN_NAME));
+    }
+
+    // Outer list, and the writer for the ints of the inner list
+    ArrayWriter outerWriter = rowWriter.column(colIndex).array();
+    ScalarWriter valueWriter = outerWriter.array().scalar();
+
+    for (int row = 0; row < rows; row++) {
+      for (int col = 0; col < cols; col++) {
+        valueWriter.setInt(colData[row][col]);
+      }
+      // Saved once per matrix row
+      outerWriter.save();
+    }
+  }
+
+  /**
+   * Writes a long into the optional BIGINT column of the given name, using
+   * the writer obtained from getColWriter.
+   *
+   * @param rowWriter The row to which the data will be written
+   * @param name The column name
+   * @param value The value to be written
+   */
+  private void writeLongColumn(TupleWriter rowWriter, String name, long value) {
+    getColWriter(rowWriter, name, TypeProtos.MinorType.BIGINT, TypeProtos.DataMode.OPTIONAL)
+      .setLong(value);
+  }
+
+  /**
+   * Writes a long array as a repeated BIGINT column, adding the column to
+   * the row's schema first if it does not exist yet.
+   *
+   * @param rowWriter the row to which the data will be written
+   * @param name the name of the list column
+   * @param list the values to write, in order
+   */
+  private void writeLongListColumn(TupleWriter rowWriter, String name, long[] list) {
+    int colIndex = rowWriter.tupleSchema().index(name);
+    if (colIndex == -1) {
+      colIndex = rowWriter.addColumn(
+        MetadataUtils.newScalar(name, TypeProtos.MinorType.BIGINT, TypeProtos.DataMode.REPEATED));
+    }
+
+    ScalarWriter elementWriter = rowWriter.column(colIndex).array().scalar();
+    for (int i = 0; i < list.length; i++) {
+      elementWriter.setLong(list[i]);
+    }
+  }
+
+  /**
+   * Writes a string into the optional VARCHAR column of the given name,
+   * using the writer obtained from getColWriter.
+   *
+   * @param rowWriter The row to which the data will be written
+   * @param name The column name
+   * @param value The value to be written
+   */
+  private void writeStringColumn(TupleWriter rowWriter, String name, String value) {
+    getColWriter(rowWriter, name, TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .setString(value);
+  }
+
+  /**
+   * Writes a string array as a repeated VARCHAR column, adding the column to
+   * the row's schema first if it does not exist yet.
+   *
+   * @param rowWriter the row to which the data will be written
+   * @param name the name of the list column
+   * @param list the values to write, in order
+   */
+  private void writeStringListColumn(TupleWriter rowWriter, String name, String[] list) {
+    int colIndex = rowWriter.tupleSchema().index(name);
+    if (colIndex == -1) {
+      colIndex = rowWriter.addColumn(
+        MetadataUtils.newScalar(name, TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.REPEATED));
+    }
+
+    ScalarWriter elementWriter = rowWriter.column(colIndex).array().scalar();
+    for (int i = 0; i < list.length; i++) {
+      elementWriter.setString(list[i]);
+    }
+  }
+
+  /**
+   * Writes a double into the optional FLOAT8 column of the given name, using
+   * the writer obtained from getColWriter.
+   *
+   * @param rowWriter The row to which the data will be written
+   * @param name The column name
+   * @param value The value to be written
+   */
+  private void writeFloat8Column(TupleWriter rowWriter, String name, double value) {
+    getColWriter(rowWriter, name, TypeProtos.MinorType.FLOAT8, TypeProtos.DataMode.OPTIONAL)
+      .setDouble(value);
+  }
+
+  /**
+   * Writes a double array as a repeated FLOAT8 column, adding the column to
+   * the row's schema first if it does not exist yet.
+   *
+   * @param rowWriter the row to which the data will be written
+   * @param name the name of the list column
+   * @param list the values to write, in order
+   */
+  private void writeFloat8ListColumn(TupleWriter rowWriter, String name, double[] list) {
+    int colIndex = rowWriter.tupleSchema().index(name);
+    if (colIndex == -1) {
+      colIndex = rowWriter.addColumn(
+        MetadataUtils.newScalar(name, TypeProtos.MinorType.FLOAT8, TypeProtos.DataMode.REPEATED));
+    }
+
+    ScalarWriter elementWriter = rowWriter.column(colIndex).array().scalar();
+    for (int i = 0; i < list.length; i++) {
+      elementWriter.setDouble(list[i]);
+    }
+  }
+
+  /**
+   * Writes a float into the optional FLOAT4 column of the given name, using
+   * the writer obtained from getColWriter. The value is passed through the
+   * writer's setDouble method.
+   *
+   * @param rowWriter The row to which the data will be written
+   * @param name The column name
+   * @param value The value to be written
+   */
+  private void writeFloat4Column(TupleWriter rowWriter, String name, float value) {
+    getColWriter(rowWriter, name, TypeProtos.MinorType.FLOAT4, TypeProtos.DataMode.OPTIONAL)
+      .setDouble(value);
+  }
+
+  /**
+   * Writes a float array as a repeated FLOAT4 column, adding the column to
+   * the row's schema first if it does not exist yet. Each value is passed
+   * through the writer's setDouble method.
+   *
+   * @param rowWriter the row to which the data will be written
+   * @param name the name of the list column
+   * @param list the values to write, in order
+   */
+  private void writeFloat4ListColumn(TupleWriter rowWriter, String name, float[] list) {
+    int colIndex = rowWriter.tupleSchema().index(name);
+    if (colIndex == -1) {
+      colIndex = rowWriter.addColumn(
+        MetadataUtils.newScalar(name, TypeProtos.MinorType.FLOAT4, TypeProtos.DataMode.REPEATED));
+    }
+
+    ScalarWriter elementWriter = rowWriter.column(colIndex).array().scalar();
+    for (int i = 0; i < list.length; i++) {
+      elementWriter.setDouble(list[i]);
+    }
+  }
+
+  /**
+   * Writes a float matrix. With a default path set, the matrix is flattened:
+   * each matrix row becomes one Drill row whose columns are named with the
+   * float column prefix plus the column index. Otherwise the matrix is
+   * written as a list of lists by floatMatrixHelper.
+   *
+   * @param colData The matrix, indexed as [row][column]
+   * @param cols The number of columns to write
+   * @param rows The number of rows to write
+   * @param rowWriter The row writer to which the data will be written
+   */
+  private void mapFloatMatrixField(float[][] colData, int cols, int rows, RowSetLoader rowWriter) {
+    if (readerConfig.defaultPath == null) {
+      floatMatrixHelper(colData, cols, rows, rowWriter);
+      return;
+    }
+
+    // Default path present: one Drill row per matrix row
+    for (int row = 0; row < rows; row++) {
+      rowWriter.start();
+      for (int col = 0; col < cols; col++) {
+        writeFloat4Column(rowWriter, FLOAT_COLUMN_PREFIX + col, colData[row][col]);
+      }
+      rowWriter.save();
+    }
+  }
+
+  /**
+   * Writes a float matrix into the current row as a list of lists, for a
+   * dataset projected in a metadata query. The repeated-list column is added
+   * to the row's schema if it does not exist yet.
+   *
+   * @param colData The matrix, indexed as [row][column]
+   * @param cols The number of columns to write
+   * @param rows The number of rows to write
+   * @param rowWriter The row writer to which the data will be written
+   */
+  private void floatMatrixHelper(float[][] colData, int cols, int rows, RowSetLoader rowWriter) {
+    int colIndex = rowWriter.tupleSchema().index(FLOAT_COLUMN_NAME);
+    if (colIndex == -1) {
+      TupleMetadata listOfLists = new SchemaBuilder()
+        .addRepeatedList(FLOAT_COLUMN_NAME)
+        .addArray(TypeProtos.MinorType.FLOAT4)
+        .resumeSchema()
+        .buildSchema();
+      colIndex = rowWriter.addColumn(listOfLists.column(FLOAT_COLUMN_NAME));
+    }
+
+    // Outer list, and the writer for the floats of the inner list
+    ArrayWriter outerWriter = rowWriter.column(colIndex).array();
+    ScalarWriter valueWriter = outerWriter.array().scalar();
+
+    for (int row = 0; row < rows; row++) {
+      for (int col = 0; col < cols; col++) {
+        valueWriter.setDouble(colData[row][col]);
+      }
+      // Saved once per matrix row
+      outerWriter.save();
+    }
+  }
+
+  /**
+   * Writes a double matrix. With a default path set, the matrix is
+   * flattened: each matrix row becomes one Drill row whose columns are named
+   * with the double column prefix plus the column index. Otherwise the
+   * matrix is written as a list of lists by doubleMatrixHelper.
+   *
+   * @param colData The matrix, indexed as [row][column]
+   * @param cols The number of columns to write
+   * @param rows The number of rows to write
+   * @param rowWriter The row writer to which the data will be written
+   */
+  private void mapDoubleMatrixField(double[][] colData, int cols, int rows, RowSetLoader rowWriter) {
+    if (readerConfig.defaultPath == null) {
+      doubleMatrixHelper(colData, cols, rows, rowWriter);
+      return;
+    }
+
+    // Default path present: one Drill row per matrix row
+    for (int row = 0; row < rows; row++) {
+      rowWriter.start();
+      for (int col = 0; col < cols; col++) {
+        writeFloat8Column(rowWriter, DOUBLE_COLUMN_PREFIX + col, colData[row][col]);
+      }
+      rowWriter.save();
+    }
+  }
+
+  /**
+   * Writes a two-dimensional double dataset into a single nested column named
+   * {@code DOUBLE_COLUMN_NAME}, declared as a repeated list of FLOAT8 arrays. This is the
+   * case where a dataset is projected in a metadata query, so the result is a list of lists.
+   *
+   * @param colData the matrix values, indexed as {@code colData[row][column]}
+   * @param cols the number of columns to read from each matrix row
+   * @param rows the number of matrix rows to read
+   * @param rowWriter the writer which receives the data
+   */
+  private void doubleMatrixHelper(double[][] colData, int cols, int rows, RowSetLoader rowWriter) {
+    int index = rowWriter.tupleSchema().index(DOUBLE_COLUMN_NAME);
+    if (index == -1) {
+      // Only build the nested schema when the column still has to be added
+      TupleMetadata nestedSchema = new SchemaBuilder()
+        .addRepeatedList(DOUBLE_COLUMN_NAME)
+        .addArray(TypeProtos.MinorType.FLOAT8)
+        .resumeSchema()
+        .buildSchema();
+      index = rowWriter.addColumn(nestedSchema.column(DOUBLE_COLUMN_NAME));
+    }
+
+    // The outer array
+    ArrayWriter listWriter = rowWriter.column(index).array();
+    // The inner array
+    ArrayWriter innerWriter = listWriter.array();
+    // The double values within the inner array
+    ScalarWriter doubleWriter = innerWriter.scalar();
+
+    for (int i = 0; i < rows; i++) {
+      for (int k = 0; k < cols; k++) {
+        doubleWriter.setDouble(colData[i][k]);
+      }
+      // Save the outer-array entry once per matrix row
+      listWriter.save();
+    }
+  }
+
+  /**
+   * Maps a two-dimensional long dataset onto the row writer. When a default path is
+   * configured, the matrix is auto-flattened: every matrix row is written as its own row,
+   * with one column per matrix column named {@code LONG_COLUMN_PREFIX + columnIndex}.
+   * Otherwise the work is delegated to {@code bigintMatrixHelper}.
+   *
+   * @param colData the matrix values, indexed as {@code colData[row][column]}
+   * @param cols the number of columns to read from each matrix row
+   * @param rows the number of matrix rows to read
+   * @param rowWriter the writer which receives the data
+   */
+  private void mapBigintMatrixField(long[][] colData, int cols, int rows, RowSetLoader rowWriter) {
+    // No default path: hand the whole matrix to the nested-column helper
+    if (readerConfig.defaultPath == null) {
+      bigintMatrixHelper(colData, cols, rows, rowWriter);
+      return;
+    }
+
+    // Default path present: the end result is that the 2D array gets mapped to Drill columns
+    for (int rowIdx = 0; rowIdx < rows; rowIdx++) {
+      rowWriter.start();
+      for (int colIdx = 0; colIdx < cols; colIdx++) {
+        writeLongColumn(rowWriter, LONG_COLUMN_PREFIX + colIdx, colData[rowIdx][colIdx]);
+      }
+      rowWriter.save();
+    }
+  }
+
+  /**
+   * Writes a two-dimensional long dataset into a single nested column named
+   * {@code LONG_COLUMN_NAME}, declared as a repeated list of BIGINT arrays. This is the
+   * case where a dataset is projected in a metadata query, so the result is a list of lists.
+   *
+   * @param colData the matrix values, indexed as {@code colData[row][column]}
+   * @param cols the number of columns to read from each matrix row
+   * @param rows the number of matrix rows to read
+   * @param rowWriter the writer which receives the data
+   */
+  private void bigintMatrixHelper(long[][] colData, int cols, int rows, RowSetLoader rowWriter) {
+    int index = rowWriter.tupleSchema().index(LONG_COLUMN_NAME);
+    if (index == -1) {
+      // Only build the nested schema when the column still has to be added
+      TupleMetadata nestedSchema = new SchemaBuilder()
+        .addRepeatedList(LONG_COLUMN_NAME)
+        .addArray(TypeProtos.MinorType.BIGINT)
+        .resumeSchema()
+        .buildSchema();
+      index = rowWriter.addColumn(nestedSchema.column(LONG_COLUMN_NAME));
+    }
+
+    // The outer array
+    ArrayWriter listWriter = rowWriter.column(index).array();
+    // The inner array
+    ArrayWriter innerWriter = listWriter.array();
+    // The long values within the inner array
+    ScalarWriter bigintWriter = innerWriter.scalar();
+
+    for (int i = 0; i < rows; i++) {
+      for (int k = 0; k < cols; k++) {
+        bigintWriter.setLong(colData[i][k]);
+      }
+      // Save the outer-array entry once per matrix row
+      listWriter.save();
+    }
+  }
+
+  /**
+   * Writes a single timestamp value to the named column, which is obtained through
+   * {@code getColWriter} as an OPTIONAL TIMESTAMP column.
+   *
+   * @param rowWriter the tuple to which the value will be written
+   * @param name the name of the column
+   * @param timestamp the raw long value wrapped in an {@code Instant} before writing
+   */
+  private void writeTimestampColumn(TupleWriter rowWriter, String name, long timestamp) {
+    getColWriter(rowWriter, name, TypeProtos.MinorType.TIMESTAMP, TypeProtos.DataMode.OPTIONAL)
+      .setTimestamp(new Instant(timestamp));
+  }
+
+  /**
+   * Writes an array of timestamp values to the named column, adding the column as a
+   * REPEATED TIMESTAMP column if the tuple schema does not contain it yet.
+   *
+   * @param rowWriter the tuple to which the values will be written
+   * @param name the name of the column
+   * @param list the raw long values, each wrapped in an {@code Instant} before writing
+   */
+  private void writeTimestampListColumn(TupleWriter rowWriter, String name, long[] list) {
+    int position = rowWriter.tupleSchema().index(name);
+    if (position == -1) {
+      // Column is not in the schema yet, so define it before writing
+      position = rowWriter.addColumn(
+        MetadataUtils.newScalar(name, TypeProtos.MinorType.TIMESTAMP, TypeProtos.DataMode.REPEATED));
+    }
+
+    ScalarWriter elementWriter = rowWriter.column(position).array().scalar();
+    for (int i = 0; i < list.length; i++) {
+      elementWriter.setTimestamp(new Instant(list[i]));
+    }
+  }
+
+  /**
+   * Returns the scalar writer for the named column, first adding the column to the tuple
+   * with the given type and mode if the tuple schema does not contain it yet.
+   *
+   * @param tupleWriter the tuple which owns the column
+   * @param fieldName the name of the column
+   * @param type the minor type used if the column has to be created
+   * @param mode the data mode used if the column has to be created
+   * @return the scalar writer for the column
+   */
+  private ScalarWriter getColWriter(TupleWriter tupleWriter, String fieldName, TypeProtos.MinorType type, TypeProtos.DataMode mode) {
+    int position = tupleWriter.tupleSchema().index(fieldName);
+    if (position != -1) {
+      return tupleWriter.scalar(position);
+    }
+
+    // Unknown column: define it, then return the writer at its new position
+    ColumnMetadata newColumn = MetadataUtils.newScalar(fieldName, type, mode);
+    return tupleWriter.scalar(tupleWriter.addColumn(newColumn));
+  }
+
+  /**
+   * This helper function gets the attributes for an HDF5 datapath.  These 
attributes are projected as a map in select * queries when the defaultPath is 
null.
+   * @param rowWriter the row to which the data will be written
+   * @param record the record for the attributes
+   */
+  private void writeAttributes(TupleWriter rowWriter, HDF5DrillMetadata 
record) {
+    Map attribs = getAttributes(record.getPath());
+    Iterator entries = attribs.entrySet().iterator();
+
+    int index = rowWriter.tupleSchema().index("attributes");
+    if (index == -1) {
+      index = rowWriter
+        .addColumn(SchemaBuilder.columnSchema("attributes", 
TypeProtos.MinorType.MAP, TypeProtos.DataMode.REQUIRED));
+    }
+    TupleWriter mapWriter = rowWriter.tuple(index);
+
+    while (entries.hasNext()) {
+      Map.Entry entry = (Map.Entry) entries.next();
+      String key = (String) entry.getKey();
 
 Review comment:
   Fixed.  Now using generics.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Format Plugin for HDF5
> ----------------------
>
>                 Key: DRILL-7233
>                 URL: https://issues.apache.org/jira/browse/DRILL-7233
>             Project: Apache Drill
>          Issue Type: New Feature
>    Affects Versions: 1.17.0
>            Reporter: Charles Givre
>            Assignee: Charles Givre
>            Priority: Major
>              Labels: doc-impacting
>             Fix For: 1.18.0
>
>
> h2. Drill HDF5 Format Plugin
> Per Wikipedia, Hierarchical Data Format (HDF) is a set of file formats 
> designed to store and organize large amounts of data. Originally developed at 
> the National Center for Supercomputing Applications, it is supported by The 
> HDF Group, a non-profit corporation whose mission is to ensure continued 
> development of HDF5 technologies and the continued accessibility of data 
> stored in HDF.
> This plugin enables Apache Drill to query HDF5 files.
> h3. Configuration
> There are three configuration variables in this plugin:
> type: This should be set to hdf5.
> extensions: This is a list of the file extensions used to identify HDF5 
> files. Typically HDF5 uses .h5 or .hdf5 as file extensions. This defaults to 
> .h5.
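> defaultPath: The path within the HDF5 file of the dataset to query. When 
> this is set, Drill returns only the data at that path rather than the file 
> metadata (see Usage below).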
> h3. Example Configuration
> For most uses, the configuration below will suffice to enable Drill to query 
> HDF5 files.
> {{"hdf5": {
>       "type": "hdf5",
>       "extensions": [
>         "h5"
>       ],
>       "defaultPath": null
>     }}}
> h3. Usage
> Since HDF5 can be viewed as a file system within a file, a single file can 
> contain many datasets. For instance, if you have a simple HDF5 file, a star 
> query will produce the following result:
> {{apache drill> select * from dfs.test.`dset.h5`;
> +-------+-----------+-----------+--------------------------------------------------------------------------+
> | path  | data_type | file_name |                                 int_data    
>                              |
> +-------+-----------+-----------+--------------------------------------------------------------------------+
> | /dset | DATASET   | dset.h5   | 
> [[1,2,3,4,5,6],[7,8,9,10,11,12],[13,14,15,16,17,18],[19,20,21,22,23,24]] |
> +-------+-----------+-----------+--------------------------------------------------------------------------+}}
> The actual data in this file is mapped to a column called int_data. In order 
> to effectively access the data, you should use Drill's FLATTEN() function on 
> the int_data column, which produces the following result.
> {{apache drill> select flatten(int_data) as int_data from dfs.test.`dset.h5`;
> +---------------------+
> |      int_data       |
> +---------------------+
> | [1,2,3,4,5,6]       |
> | [7,8,9,10,11,12]    |
> | [13,14,15,16,17,18] |
> | [19,20,21,22,23,24] |
> +---------------------+}}
> Once you have the data in this form, you can access it similarly to how you 
> might access nested data in JSON or other files.
> {{apache drill> SELECT int_data[0] as col_0,
> . .semicolon> int_data[1] as col_1,
> . .semicolon> int_data[2] as col_2
> . .semicolon> FROM ( SELECT flatten(int_data) AS int_data
> . . . . . .)> FROM dfs.test.`dset.h5`
> . . . . . .)> );
> +-------+-------+-------+
> | col_0 | col_1 | col_2 |
> +-------+-------+-------+
> | 1     | 2     | 3     |
> | 7     | 8     | 9     |
> | 13    | 14    | 15    |
> | 19    | 20    | 21    |
> +-------+-------+-------+}}
> Alternatively, a better way to query the actual data in an HDF5 file is to 
> use the defaultPath field in your query. If the defaultPath field is defined 
> in the query, or via the plugin configuration, Drill will only return the 
> data, rather than the file metadata.
> ** Note: Once you have determined which data set you are querying, it is 
> advisable to use this method to query HDF5 data. **
> You can set the defaultPath variable in either the plugin configuration, or 
> at query time using the table() function as shown in the example below:
> {{SELECT * 
> FROM table(dfs.test.`dset.h5` (type => 'hdf5', defaultPath => '/dset'))}}
> This query will return the result below:
> {{apache drill> SELECT * FROM table(dfs.test.`dset.h5` (type => 'hdf5', 
> defaultPath => '/dset'));
> +-----------+-----------+-----------+-----------+-----------+-----------+
> | int_col_0 | int_col_1 | int_col_2 | int_col_3 | int_col_4 | int_col_5 |
> +-----------+-----------+-----------+-----------+-----------+-----------+
> | 1         | 2         | 3         | 4         | 5         | 6         |
> | 7         | 8         | 9         | 10        | 11        | 12        |
> | 13        | 14        | 15        | 16        | 17        | 18        |
> | 19        | 20        | 21        | 22        | 23        | 24        |
> +-----------+-----------+-----------+-----------+-----------+-----------+
> 4 rows selected (0.223 seconds)}}
> If the data in defaultPath is a column, the column name will be the last part 
> of the path. If the data is multidimensional, the columns will be named 
> <data_type>_col_n, where n is the zero-based column index. For example, the 
> second column of an integer dataset will be called int_col_1.
> h3. Attributes
> Occasionally, HDF5 paths will contain attributes. Drill will map these to a 
> map data structure called attributes, as shown in the query below.
> {{apache drill> SELECT attributes FROM dfs.test.`browsing.h5`;
> +----------------------------------------------------------------------------------+
> |                                    attributes                               
>      |
> +----------------------------------------------------------------------------------+
> | {}                                                                          
>      |
> | {"__TYPE_VARIANT__":"TIMESTAMP_MILLISECONDS_SINCE_START_OF_THE_EPOCH"}      
>      |
> | {}                                                                          
>      |
> | {}                                                                          
>      |
> | 
> {"important":false,"__TYPE_VARIANT__timestamp__":"TIMESTAMP_MILLISECONDS_SINCE_START_OF_THE_EPOCH","timestamp":1550033296762}
>  |
> | {}                                                                          
>      |
> | {}                                                                          
>      |
> | {}                                                                          
>      |
> +----------------------------------------------------------------------------------+
> 8 rows selected (0.292 seconds)}}
> You can access the individual fields within the attributes map by using the 
> structure table.map.key. Note that you will have to give the table an alias 
> for this to work properly.
> {{apache drill> SELECT path, data_type, file_name  
> FROM dfs.test.`browsing.h5` AS t1 WHERE t1.attributes.important = false;
> +---------+-----------+-------------+
> |  path   | data_type |  file_name  |
> +---------+-----------+-------------+
> | /groupB | GROUP     | browsing.h5 |
> +---------+-----------+-------------+}}
> h3. Known Limitations
> There are several limitations with the HDF5 format plugin in Drill.
> * Drill cannot read unsigned 64-bit integers. When the plugin encounters this 
> data type, it will write an INFO message to the log.
> * Drill cannot read compressed fields in HDF5 files.
> * HDF5 files can contain nested data sets of up to n dimensions. Since Drill 
> works best with two dimensional data, datasets with more than two dimensions 
> are flattened.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to