cgivre commented on a change in pull request #1778: Drill-7233: Format Plugin 
for HDF5
URL: https://github.com/apache/drill/pull/1778#discussion_r362265100
 
 

 ##########
 File path: 
contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5BatchReader.java
 ##########
 @@ -0,0 +1,1164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.hdf5;
+
+import ch.systemsx.cisd.hdf5.HDF5CompoundMemberInformation;
+import ch.systemsx.cisd.hdf5.HDF5DataSetInformation;
+import ch.systemsx.cisd.hdf5.HDF5FactoryProvider;
+import ch.systemsx.cisd.hdf5.HDF5LinkInformation;
+import ch.systemsx.cisd.hdf5.IHDF5Factory;
+import ch.systemsx.cisd.hdf5.IHDF5Reader;
+import org.apache.commons.io.IOUtils;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.ExecConstants;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MapBuilder;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.hdf5.writers.HDF5DataWriter;
+import org.apache.drill.exec.store.hdf5.writers.HDF5DoubleDataWriter;
+import org.apache.drill.exec.store.hdf5.writers.HDF5EnumDataWriter;
+import org.apache.drill.exec.store.hdf5.writers.HDF5FloatDataWriter;
+import org.apache.drill.exec.store.hdf5.writers.HDF5IntDataWriter;
+import org.apache.drill.exec.store.hdf5.writers.HDF5LongDataWriter;
+import org.apache.drill.exec.store.hdf5.writers.HDF5MapDataWriter;
+import org.apache.drill.exec.store.hdf5.writers.HDF5StringDataWriter;
+import org.apache.drill.exec.store.hdf5.writers.HDF5TimestampDataWriter;
+import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+
+import org.apache.drill.shaded.guava.com.google.common.io.Files;
+import org.apache.hadoop.mapred.FileSplit;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.BitSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public class HDF5BatchReader implements ManagedReader<FileSchemaNegotiator> {
+  private static final Logger logger = 
LoggerFactory.getLogger(HDF5BatchReader.class);
+
+  private static final String PATH_COLUMN_NAME = "path"; // metadata-query column: object path inside the file
+
+  private static final String DATA_TYPE_COLUMN_NAME = "data_type"; // metadata-query column: link type as a string
+
+  private static final String FILE_NAME_COLUMN_NAME = "file_name"; // metadata-query column: source file name
+
+  private static final String INT_COLUMN_PREFIX = "int_col_"; // column-name prefix for flattened INT datasets
+
+  private static final String LONG_COLUMN_PREFIX = "long_col_"; // column-name prefix for flattened BIGINT datasets
+
+  private static final String FLOAT_COLUMN_PREFIX = "float_col_"; // column-name prefix for flattened FLOAT4 datasets
+
+  private static final String DOUBLE_COLUMN_PREFIX = "double_col_"; // column-name prefix for flattened FLOAT8 datasets
+
+  private static final String INT_COLUMN_NAME = "int_data"; // repeated-list column name used by intMatrixHelper
+
+  private static final String FLOAT_COLUMN_NAME = "float_data"; // presumably the FLOAT4 counterpart of INT_COLUMN_NAME - TODO confirm
+
+  private static final String DOUBLE_COLUMN_NAME = "double_data"; // presumably the FLOAT8 counterpart of INT_COLUMN_NAME - TODO confirm
+
+  private static final String LONG_COLUMN_NAME = "long_data"; // presumably the BIGINT counterpart of INT_COLUMN_NAME - TODO confirm
+
+  private FileSplit split; // assigned in open() from negotiator.split()
+
+  private IHDF5Reader hdf5Reader; // opened in openFile() on the local temp copy of the input
+
+  private File infile; // temp file returned by convertInputStreamToFile()
+
+  private BufferedReader reader; // wrapped around the input stream in openFile()
+
+  private RowSetLoader rowWriter; // obtained from loader.writer() in open()
+
+  private Iterator<HDF5DrillMetadata> metadataIterator; // metadata rows still to be emitted; set only for metadata queries
+
+  private final HDF5ReaderConfig readerConfig; // supplied to the constructor
+
+  private boolean deleteTmpDir; // NOTE(review): not referenced in the visible code - presumably set by getTmpDir(); confirm
+
+  private ScalarWriter pathWriter; // writer for PATH_COLUMN_NAME; set only for metadata queries
+
+  private ScalarWriter dataTypeWriter; // writer for DATA_TYPE_COLUMN_NAME; set only for metadata queries
+
+  private ScalarWriter fileNameWriter; // writer for FILE_NAME_COLUMN_NAME; set only for metadata queries
+
+  private List<HDF5DataWriter> dataWriters; // filled by the buildSchemaFor*DimensionalDataset methods
+
+  private long[] dimensions; // dimensions of the default-path dataset; empty array for metadata queries
+
+  /**
+   * Holds the settings a reader is created with: the owning plugin, the
+   * format configuration, and the default dataset path read from that
+   * configuration.
+   */
+  public static class HDF5ReaderConfig {
+    final HDF5FormatPlugin plugin;
+
+    final String defaultPath;
+
+    final HDF5FormatConfig formatConfig;
+
+    /**
+     * @param plugin the format plugin that created this reader configuration
+     * @param formatConfig the format configuration; its default path is copied into {@code defaultPath}
+     */
+    public HDF5ReaderConfig(HDF5FormatPlugin plugin, HDF5FormatConfig formatConfig) {
+      this.defaultPath = formatConfig.getDefaultPath();
+      this.formatConfig = formatConfig;
+      this.plugin = plugin;
+    }
+  }
+
+  /**
+   * Creates a reader for the given configuration. The list of data writers
+   * starts out empty.
+   *
+   * @param readerConfig plugin, format configuration and default path for this reader
+   */
+  public HDF5BatchReader(HDF5ReaderConfig readerConfig) {
+    this.dataWriters = new ArrayList<>();
+    this.readerConfig = readerConfig;
+  }
+
+  @Override
+  public boolean open(FileSchemaNegotiator negotiator) {
+    split = negotiator.split();
+    try {
+      openFile(negotiator);
+    } catch (IOException e) {
+      throw UserException
+        .dataReadError(e)
+        .message("Failed to close input file: {}", split.getPath())
+        .message(e.getMessage())
+        .build(logger);
+    }
+
+    ResultSetLoader loader;
+    if (readerConfig.defaultPath == null) {
+      // Get file metadata
+      List<HDF5DrillMetadata> metadata = 
getFileMetadata(hdf5Reader.object().getGroupMemberInformation("/", true), new 
ArrayList<>());
+      metadataIterator = metadata.iterator();
+
+      // Schema for Metadata query
+      SchemaBuilder builder = new 
SchemaBuilder().addNullable(PATH_COLUMN_NAME, 
TypeProtos.MinorType.VARCHAR).addNullable(DATA_TYPE_COLUMN_NAME, 
TypeProtos.MinorType.VARCHAR).addNullable(FILE_NAME_COLUMN_NAME, 
TypeProtos.MinorType.VARCHAR);
+      negotiator.setTableSchema(builder.buildSchema(), false);
+
+      loader = negotiator.build();
+      dimensions = new long[0];
+      rowWriter = loader.writer();
+
+    } else {
+      // This is the case when the default path is specified. Since the user 
is explicitly asking for a dataset
+      // Drill can obtain the schema by getting the datatypes below and 
ultimately mapping that schema to columns
+      HDF5DataSetInformation dsInfo = 
hdf5Reader.object().getDataSetInformation(readerConfig.defaultPath);
+      dimensions = dsInfo.getDimensions();
+
+      loader = negotiator.build();
+      rowWriter = loader.writer();
+      if (dimensions.length <= 1) {
+        buildSchemaFor1DimensionalDataset(dsInfo);
+      } else if (dimensions.length == 2) {
+        buildSchemaFor2DimensionalDataset(dsInfo);
+      } else {
+        // Case for datasets of greater than 2D
+        // These are automatically flattened
+        buildSchemaFor2DimensionalDataset(dsInfo);
+      }
+    }
+    if (readerConfig.defaultPath == null) {
+      pathWriter = rowWriter.scalar(PATH_COLUMN_NAME);
+      dataTypeWriter = rowWriter.scalar(DATA_TYPE_COLUMN_NAME);
+      fileNameWriter = rowWriter.scalar(FILE_NAME_COLUMN_NAME);
+    }
+    return true;
+  }
+
+  /**
+   * Called when the default path is set and the dataset has at most one
+   * dimension. Adds a single data writer of the matching type to
+   * {@code dataWriters}. If the data type is unknown or has no writer, a
+   * warning is logged and no writer is added.
+   *
+   * @param dsInfo The HDF5 dataset information
+   */
+  private void buildSchemaFor1DimensionalDataset(HDF5DataSetInformation dsInfo) {
+    TypeProtos.MinorType currentDataType = HDF5Utils.getDataType(dsInfo);
+
+    // Case for null or unknown data types.  The type information itself is logged:
+    // dereferencing tryGetJavaType() here could fail if it yields null.
+    if (currentDataType == null) {
+      logger.warn("Couldn't add {}", dsInfo.getTypeInformation());
+      return;
+    }
+
+    switch (currentDataType) {
+      case GENERIC_OBJECT:
+        dataWriters.add(new HDF5EnumDataWriter(hdf5Reader, rowWriter, readerConfig.defaultPath));
+        break;
+      case VARCHAR:
+        dataWriters.add(new HDF5StringDataWriter(hdf5Reader, rowWriter, readerConfig.defaultPath));
+        break;
+      case TIMESTAMP:
+        dataWriters.add(new HDF5TimestampDataWriter(hdf5Reader, rowWriter, readerConfig.defaultPath));
+        break;
+      case INT:
+        dataWriters.add(new HDF5IntDataWriter(hdf5Reader, rowWriter, readerConfig.defaultPath));
+        break;
+      case BIGINT:
+        dataWriters.add(new HDF5LongDataWriter(hdf5Reader, rowWriter, readerConfig.defaultPath));
+        break;
+      case FLOAT8:
+        dataWriters.add(new HDF5DoubleDataWriter(hdf5Reader, rowWriter, readerConfig.defaultPath));
+        break;
+      case FLOAT4:
+        dataWriters.add(new HDF5FloatDataWriter(hdf5Reader, rowWriter, readerConfig.defaultPath));
+        break;
+      case MAP:
+        dataWriters.add(new HDF5MapDataWriter(hdf5Reader, rowWriter, readerConfig.defaultPath));
+        break;
+      default:
+        // Make the skipped dataset visible instead of silently producing no writer
+        logger.warn("No data writer available for {} of type {}", readerConfig.defaultPath, currentDataType);
+        break;
+    }
+  }
+
+  /**
+   * Builds the writers for a dataset with two or more dimensions. Only INT,
+   * BIGINT, FLOAT4 and FLOAT8 datasets are handled here, so this method is not
+   * as inclusive as the 1D variant. One data writer is added to
+   * {@code dataWriters} per column, where the column count is the second entry
+   * of {@code dimensions}. For any other data type a warning is logged and no
+   * writer is added.
+   *
+   * @param dsInfo The dataset which Drill will use to build a schema
+   */
+  private void buildSchemaFor2DimensionalDataset(HDF5DataSetInformation dsInfo) {
+    TypeProtos.MinorType currentDataType = HDF5Utils.getDataType(dsInfo);
+    // Case for null or unknown data types.  The type information itself is logged:
+    // dereferencing tryGetJavaType() here could fail if it yields null.
+    if (currentDataType == null) {
+      logger.warn("Couldn't add {}", dsInfo.getTypeInformation());
+      return;
+    }
+    long cols = dimensions[1];
+
+    for (int i = 0; i < cols; i++) {
+      switch (currentDataType) {
+        case INT:
+          dataWriters.add(new HDF5IntDataWriter(hdf5Reader, rowWriter, readerConfig.defaultPath, INT_COLUMN_PREFIX + i, i));
+          break;
+        case BIGINT:
+          dataWriters.add(new HDF5LongDataWriter(hdf5Reader, rowWriter, readerConfig.defaultPath, LONG_COLUMN_PREFIX + i, i));
+          break;
+        case FLOAT8:
+          dataWriters.add(new HDF5DoubleDataWriter(hdf5Reader, rowWriter, readerConfig.defaultPath, DOUBLE_COLUMN_PREFIX + i, i));
+          break;
+        case FLOAT4:
+          dataWriters.add(new HDF5FloatDataWriter(hdf5Reader, rowWriter, readerConfig.defaultPath, FLOAT_COLUMN_PREFIX + i, i));
+          break;
+        default:
+          // The type is the same for every column, so warn once and stop
+          logger.warn("No data writer available for {} of type {}", readerConfig.defaultPath, currentDataType);
+          return;
+      }
+    }
+  }
+  /**
+   * Opens the HDF5 file for the current split. The input stream is copied to a
+   * local temp file, which is then opened with the HDF5 library.
+   *
+   * @param negotiator The negotiator represents Drill's interface with the file system
+   * @throws IOException declared for callers; failures while opening are reported as a UserException
+   */
+  private void openFile(FileSchemaNegotiator negotiator) throws IOException {
+    InputStream in = null;
+    try {
+      in = negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+      IHDF5Factory factory = HDF5FactoryProvider.get();
+      infile = convertInputStreamToFile(in);
+      hdf5Reader = factory.openForReading(infile);
+    } catch (Exception e) {
+      if (in != null) {
+        try {
+          in.close();
+        } catch (IOException closeError) {
+          // Keep the original failure as the primary error; a close failure must not mask it
+          e.addSuppressed(closeError);
+        }
+      }
+      // UserException.Builder.message() takes String.format syntax, hence %s
+      throw UserException
+        .dataReadError(e)
+        .message("Failed to open input file: %s", split.getPath())
+        .build(logger);
+    }
+    assert in != null;
+    // NOTE(review): convertInputStreamToFile() has already closed this stream, so the
+    // reader wraps a closed stream - confirm whether it is still needed.
+    reader = new BufferedReader(new InputStreamReader(in));
+  }
+
+  /**
+   * Copies the Drill InputStream into a temp file for the HDF5 library. This
+   * exists because of a known limitation in the HDF5 library, which cannot
+   * parse HDF5 directly from an input stream. The temp file is named "~"
+   * followed by the name of the split's file and replaces any existing file of
+   * that name. The stream is closed whether or not the copy succeeds.
+   *
+   * @param stream The input stream to be converted to a File
+   * @return File The file which was converted from an InputStream
+   * @throws IOException declared for callers; a failed copy is reported as a UserException
+   */
+  private File convertInputStreamToFile(InputStream stream) throws IOException {
+    File tmpDir = getTmpDir();
+    File targetFile = new File(tmpDir, "~" + split.getPath().getName());
+
+    try {
+      java.nio.file.Files.copy(stream, targetFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
+    } catch (Exception e) {
+      // Do not leave a partially written temp file behind
+      if (targetFile.exists()) {
+        targetFile.delete();
+      }
+
+      // UserException.Builder.message() uses String.format syntax and a second message()
+      // call would replace the first, so the detail goes into the context instead.
+      throw UserException
+        .dataWriteError(e)
+        .message("Failed to create temp HDF5 file: %s", split.getPath())
+        .addContext(e.getMessage())
+        .build(logger);
+    } finally {
+      // Close on the failure path too, not only after a successful copy
+      IOUtils.closeQuietly(stream);
+    }
+    return targetFile;
+  }
+
+  /**
+   * Writes rows until the row writer is full or the data is exhausted.
+   * <p>
+   * A metadata query (no default path) emits one row per metadata entry. With a
+   * default path, datasets of at most one dimension are read through the first
+   * data writer, and datasets with more dimensions are read one row at a time
+   * with one data writer per column.
+   *
+   * @return true if the row writer filled up, false once there is nothing more to read
+   */
+  @Override
+  public boolean next() {
+    while (!rowWriter.isFull()) {
+      if (readerConfig.defaultPath == null || readerConfig.defaultPath.isEmpty()) {
+        if (!metadataIterator.hasNext()) {
+          return false;
+        }
+        projectMetadataRow(rowWriter);
+      } else if (dataWriters.isEmpty()) {
+        // The schema builders add no writer for unknown or unsupported data types;
+        // report "no data" rather than failing on dataWriters.get(0).
+        return false;
+      } else if (dimensions.length <= 1) {
+        HDF5DataWriter writer = dataWriters.get(0);
+        if (!writer.hasNext()) {
+          return false;
+        }
+        if (writer.isCompound()) {
+          // Case for Compound Data Type: the row is not started or saved here
+          // (presumably the compound writer does that itself - confirm)
+          writer.write();
+        } else {
+          rowWriter.start();
+          writer.write();
+          rowWriter.save();
+        }
+      } else {
+        int currentRowCount = 0;
+        rowWriter.start();
+
+        // One writer per column; each write() fills that column for the current row
+        for (HDF5DataWriter currentDataWriter : dataWriters) {
+          currentDataWriter.write();
+          currentRowCount = currentDataWriter.currentRowCount();
+        }
+        rowWriter.save();
+        if (currentRowCount >= dimensions[0]) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Emits a single metadata row: path, data type and file name, followed by
+   * the object's attributes when it has any and, for datasets, the dataset
+   * contents.
+   *
+   * @param rowWriter The row writer that receives the row
+   */
+  private void projectMetadataRow(RowSetLoader rowWriter) {
+    // The temp copy is named "~" + original name, so strip the marker again
+    String sourceFileName = infile.getName().replace("~", "");
+    HDF5DrillMetadata current = metadataIterator.next();
+
+    rowWriter.start();
+    pathWriter.setString(current.getPath());
+    dataTypeWriter.setString(current.getDataType());
+    fileNameWriter.setString(sourceFileName);
+
+    // Attributes, when there are any
+    boolean hasAttributes = current.getAttributes().size() > 0;
+    if (hasAttributes) {
+      writeAttributes(rowWriter, current);
+    }
+
+    boolean isDataset = current.getDataType().equalsIgnoreCase("DATASET");
+    if (isDataset) {
+      projectDataset(rowWriter, current.getPath());
+    }
+    rowWriter.save();
+  }
+
+  /**
+   * Collects metadata for the given links and, recursively, for the members of
+   * every group among them. Datasets and groups are recorded with their
+   * attributes, soft links without. Links of any other type are ignored.
+   *
+   * @param members The links to describe
+   * @param metadata The list that the metadata rows are appended to
+   * @return The list with the rows for the given links and their descendants appended
+   */
+  private List<HDF5DrillMetadata> getFileMetadata(List<HDF5LinkInformation> members, List<HDF5DrillMetadata> metadata) {
+    for (HDF5LinkInformation link : members) {
+      HDF5DrillMetadata row = new HDF5DrillMetadata();
+      row.setPath(link.getPath());
+      row.setDataType(link.getType().toString());
+
+      switch (link.getType()) {
+        case SOFT_LINK:
+          // Soft links cannot have attributes
+          metadata.add(row);
+          break;
+        case DATASET:
+          row.setAttributes(getAttributes(link.getPath()));
+          metadata.add(row);
+          break;
+        case GROUP:
+          row.setAttributes(getAttributes(link.getPath()));
+          metadata.add(row);
+          // Descend into the group so its members follow it in the list
+          List<HDF5LinkInformation> children = hdf5Reader.object().getGroupMemberInformation(link.getPath(), true);
+          metadata = getFileMetadata(children, metadata);
+          break;
+        default:
+          break;
+      }
+    }
+    return metadata;
+  }
+
+  /**
+   * Gets the attributes of an HDF5 object and returns them in a HashMap keyed
+   * by attribute key. Reading is best effort: an attribute that cannot be read
+   * is logged and skipped.
+   *
+   * @param path The path for which you wish to retrieve attributes
+   * @return HashMap The attributes for the given path.  Empty HashMap if no attributes present
+   */
+  private HashMap getAttributes(String path) {
+    HashMap<String, HDF5Attribute> attributes = new HashMap<>();
+
+    long attrCount = hdf5Reader.object().getObjectInformation(path).getNumberOfAttributes();
+    if (attrCount <= 0) {
+      return attributes;
+    }
+
+    for (String name : hdf5Reader.object().getAllAttributeNames(path)) {
+      try {
+        HDF5Attribute attribute = HDF5Utils.getAttribute(path, name, hdf5Reader);
+        if (attribute == null) {
+          // NOTE(review): assumes getAttribute() may return null for unreadable attributes - confirm
+          logger.warn("Couldn't add attribute: {} {}", path, name);
+          continue;
+        }
+        attributes.put(attribute.getKey(), attribute);
+      } catch (Exception e) {
+        // Keep going, but pass the exception to the logger so the cause is not lost
+        logger.warn("Couldn't add attribute: {} {}", path, name, e);
+      }
+    }
+    return attributes;
+  }
+
+  /**
+   * Writes the contents of one dataset into the current row of a metadata
+   * query. The number of dimensions here is n+1, so a 1D dataset is written as
+   * a list. This function is only called in metadata queries, as the schema is
+   * not known in advance. Datasets whose type is unknown or unsupported are
+   * logged and skipped.
+   *
+   * @param rowWriter The rowWriter to which the data will be written
+   * @param datapath The datapath from which the data will be read
+   * @return always false; the caller in projectMetadataRow ignores the value
+   */
+  private boolean projectDataset(RowSetLoader rowWriter, String datapath) {
+    String fieldName = HDF5Utils.getNameFromPath(datapath);
+    HDF5DataSetInformation dsInfo = hdf5Reader.object().getDataSetInformation(datapath);
+    // Named differently from the dimensions field, which belongs to the default-path dataset
+    long[] datasetDimensions = dsInfo.getDimensions();
+    TypeProtos.MinorType currentDataType = HDF5Utils.getDataType(dsInfo);
+
+    // Case for null or unknown data types: skip the dataset instead of failing in the switch
+    if (currentDataType == null) {
+      logger.warn("Couldn't add {}: unknown data type {}", datapath, dsInfo.getTypeInformation());
+      return false;
+    }
+
+    if (datasetDimensions.length <= 1) {
+      // Case for single dimensional data
+      switch (currentDataType) {
+        case GENERIC_OBJECT:
+          logger.warn("Couldn't read {}", datapath);
+          break;
+        case VARCHAR:
+          String[] data = hdf5Reader.readStringArray(datapath);
+          writeStringListColumn(rowWriter, fieldName, data);
+          break;
+        case TIMESTAMP:
+          long[] timestamps = hdf5Reader.time().readTimeStampArray(datapath);
+          writeTimestampListColumn(rowWriter, fieldName, timestamps);
+          break;
+        case INT:
+          if (!dsInfo.getTypeInformation().isSigned()) {
+            if (dsInfo.getTypeInformation().getElementSize() > 4) {
+              long[] unsignedValues = hdf5Reader.uint64().readArray(datapath);
+              writeLongListColumn(rowWriter, fieldName, unsignedValues);
+            } else {
+              // Nothing is written for these; say so rather than dropping them silently
+              logger.warn("Couldn't read {}: unsigned integers of 4 bytes or fewer are not implemented", datapath);
+            }
+          } else {
+            int[] intList = hdf5Reader.readIntArray(datapath);
+            writeIntListColumn(rowWriter, fieldName, intList);
+          }
+          break;
+        case FLOAT4:
+          float[] tempFloatList = hdf5Reader.readFloatArray(datapath);
+          writeFloat4ListColumn(rowWriter, fieldName, tempFloatList);
+          break;
+        case FLOAT8:
+          double[] tempDoubleList = hdf5Reader.readDoubleArray(datapath);
+          writeFloat8ListColumn(rowWriter, fieldName, tempDoubleList);
+          break;
+        case BIGINT:
+          if (!dsInfo.getTypeInformation().isSigned()) {
+            logger.warn("Drill does not support unsigned 64bit integers.");
+            break;
+          }
+          long[] tempBigIntList = hdf5Reader.readLongArray(datapath);
+          writeLongListColumn(rowWriter, fieldName, tempBigIntList);
+          break;
+        case MAP:
+          try {
+            getAndMapCompoundData(datapath, new ArrayList<>(), hdf5Reader, rowWriter);
+          } catch (Exception e) {
+            // Pass the exception along so the original cause and stack trace survive
+            throw UserException
+              .dataReadError(e)
+              .addContext("Error writing Compound Field: ")
+              .addContext(e.getMessage())
+              .build(logger);
+          }
+          break;
+
+        default:
+          // Case for data types that cannot be read
+          logger.warn("{} not implemented....yet.", dsInfo.getTypeInformation().tryGetJavaType());
+          break;
+      }
+    } else if (datasetDimensions.length == 2) {
+      // Case for 2D data sets.  These are projected as lists of lists or maps of maps
+      long cols = datasetDimensions[1];
+      long rows = datasetDimensions[0];
+      switch (currentDataType) {
+        case INT:
+          int[][] colData = hdf5Reader.readIntMatrix(datapath);
+          mapIntMatrixField(colData, (int) cols, (int) rows, rowWriter);
+          break;
+        case FLOAT4:
+          float[][] floatData = hdf5Reader.readFloatMatrix(datapath);
+          mapFloatMatrixField(floatData, (int) cols, (int) rows, rowWriter);
+          break;
+        case FLOAT8:
+          double[][] doubleData = hdf5Reader.readDoubleMatrix(datapath);
+          mapDoubleMatrixField(doubleData, (int) cols, (int) rows, rowWriter);
+          break;
+        case BIGINT:
+          long[][] longData = hdf5Reader.readLongMatrix(datapath);
+          mapBigintMatrixField(longData, (int) cols, (int) rows, rowWriter);
+          break;
+        default:
+          logger.info("{} not implemented.", currentDataType);
+          break;
+      }
+    } else {
+      // Case for data sets with dimensions > 2: read as a multi-dimensional array
+      // and converted to a matrix
+      long cols = datasetDimensions[1];
+      long rows = datasetDimensions[0];
+      switch (currentDataType) {
+        case INT:
+          int[][] colData = hdf5Reader.int32().readMDArray(datapath).toMatrix();
+          mapIntMatrixField(colData, (int) cols, (int) rows, rowWriter);
+          break;
+        case FLOAT4:
+          float[][] floatData = hdf5Reader.float32().readMDArray(datapath).toMatrix();
+          mapFloatMatrixField(floatData, (int) cols, (int) rows, rowWriter);
+          break;
+        case FLOAT8:
+          double[][] doubleData = hdf5Reader.float64().readMDArray(datapath).toMatrix();
+          mapDoubleMatrixField(doubleData, (int) cols, (int) rows, rowWriter);
+          break;
+        case BIGINT:
+          long[][] longData = hdf5Reader.int64().readMDArray(datapath).toMatrix();
+          mapBigintMatrixField(longData, (int) cols, (int) rows, rowWriter);
+          break;
+        default:
+          logger.info("{} not implemented.", currentDataType);
+          break;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Writes an int as a boolean column value: zero becomes false and any other
+   * value becomes true.
+   *
+   * @param rowWriter The row to which the data will be written
+   * @param name The column name
+   * @param value The int to be interpreted as a boolean
+   */
+  private void writeBooleanColumn(TupleWriter rowWriter, String name, int value) {
+    boolean asBoolean = value != 0;
+    writeBooleanColumn(rowWriter, name, asBoolean);
+  }
+
+  /**
+   * Writes a boolean into the named column. If the row has no such column yet,
+   * an optional BIT column is added first.
+   *
+   * @param rowWriter The row to which the data will be written
+   * @param name The column name
+   * @param value The value to be written
+   */
+  private void writeBooleanColumn(TupleWriter rowWriter, String name, boolean value) {
+    int position = rowWriter.tupleSchema().index(name);
+    if (position == -1) {
+      // Unknown column: extend the row schema on the fly
+      position = rowWriter.addColumn(
+        MetadataUtils.newScalar(name, TypeProtos.MinorType.BIT, TypeProtos.DataMode.OPTIONAL));
+    }
+    rowWriter.scalar(position).setBoolean(value);
+  }
+
+  /**
+   * Writes an int into the named column. If the row has no such column yet, an
+   * optional INT column is added first.
+   *
+   * @param rowWriter The row to which the data will be written
+   * @param name The column name
+   * @param value The value to be written
+   */
+  private void writeIntColumn(TupleWriter rowWriter, String name, int value) {
+    int position = rowWriter.tupleSchema().index(name);
+    if (position == -1) {
+      // Unknown column: extend the row schema on the fly
+      position = rowWriter.addColumn(
+        MetadataUtils.newScalar(name, TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL));
+    }
+    rowWriter.scalar(position).setInt(value);
+  }
+
+
+  /**
+   * Writes an int array into the named repeated INT column. If the row has no
+   * such column yet, it is added first.
+   *
+   * @param rowWriter the row to which the data will be written
+   * @param name the name of the list column
+   * @param list the values to write, in order
+   */
+  private void writeIntListColumn(TupleWriter rowWriter, String name, int[] list) {
+    int position = rowWriter.tupleSchema().index(name);
+    if (position == -1) {
+      // Unknown column: extend the row schema on the fly
+      position = rowWriter.addColumn(
+        MetadataUtils.newScalar(name, TypeProtos.MinorType.INT, TypeProtos.DataMode.REPEATED));
+    }
+
+    ScalarWriter elementWriter = rowWriter.column(position).array().scalar();
+    for (int i = 0; i < list.length; i++) {
+      elementWriter.setInt(list[i]);
+    }
+  }
+
+
+  /**
+   * Writes an int matrix to the row writer. With a default path the matrix is
+   * flattened: every matrix row becomes a Drill row whose columns are named
+   * with the int column prefix and the column index. Without a default path
+   * the work is delegated to intMatrixHelper.
+   *
+   * @param colData the matrix, indexed as [row][column]
+   * @param cols the number of columns to write
+   * @param rows the number of rows to write
+   * @param rowWriter the row writer that receives the data
+   */
+  private void mapIntMatrixField(int[][] colData, int cols, int rows, RowSetLoader rowWriter) {
+    if (readerConfig.defaultPath == null) {
+      intMatrixHelper(colData, cols, rows, rowWriter);
+      return;
+    }
+
+    // Default path given: auto flatten, so the 2D array maps onto Drill columns
+    for (int row = 0; row < rows; row++) {
+      rowWriter.start();
+      for (int col = 0; col < cols; col++) {
+        writeIntColumn(rowWriter, INT_COLUMN_PREFIX + col, colData[row][col]);
+      }
+      rowWriter.save();
+    }
+  }
+
+  private void intMatrixHelper(int[][] colData, int cols, int rows, 
RowSetLoader rowWriter) {
+    // This is the case where a dataset is projected in a metadata query.  The 
result should be a list of lists
+
+    final TupleMetadata nestedSchema = new 
SchemaBuilder().addRepeatedList(INT_COLUMN_NAME).addArray(TypeProtos.MinorType.INT).resumeSchema().buildSchema();
 
 Review comment:
   Removed final

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to