cgivre commented on a change in pull request #1778: Drill-7233: Format Plugin 
for HDF5
URL: https://github.com/apache/drill/pull/1778#discussion_r362264460
 
 

 ##########
 File path: 
contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5BatchReader.java
 ##########
 @@ -0,0 +1,1164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.hdf5;
+
+import ch.systemsx.cisd.hdf5.HDF5CompoundMemberInformation;
+import ch.systemsx.cisd.hdf5.HDF5DataSetInformation;
+import ch.systemsx.cisd.hdf5.HDF5FactoryProvider;
+import ch.systemsx.cisd.hdf5.HDF5LinkInformation;
+import ch.systemsx.cisd.hdf5.IHDF5Factory;
+import ch.systemsx.cisd.hdf5.IHDF5Reader;
+import org.apache.commons.io.IOUtils;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.ExecConstants;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MapBuilder;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.hdf5.writers.HDF5DataWriter;
+import org.apache.drill.exec.store.hdf5.writers.HDF5DoubleDataWriter;
+import org.apache.drill.exec.store.hdf5.writers.HDF5EnumDataWriter;
+import org.apache.drill.exec.store.hdf5.writers.HDF5FloatDataWriter;
+import org.apache.drill.exec.store.hdf5.writers.HDF5IntDataWriter;
+import org.apache.drill.exec.store.hdf5.writers.HDF5LongDataWriter;
+import org.apache.drill.exec.store.hdf5.writers.HDF5MapDataWriter;
+import org.apache.drill.exec.store.hdf5.writers.HDF5StringDataWriter;
+import org.apache.drill.exec.store.hdf5.writers.HDF5TimestampDataWriter;
+import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+
+import org.apache.drill.shaded.guava.com.google.common.io.Files;
+import org.apache.hadoop.mapred.FileSplit;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.BitSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public class HDF5BatchReader implements ManagedReader<FileSchemaNegotiator> {
+  private static final Logger logger = 
LoggerFactory.getLogger(HDF5BatchReader.class);
+
+  private static final String PATH_COLUMN_NAME = "path";
+
+  private static final String DATA_TYPE_COLUMN_NAME = "data_type";
+
+  private static final String FILE_NAME_COLUMN_NAME = "file_name";
+
+  private static final String INT_COLUMN_PREFIX = "int_col_";
+
+  private static final String LONG_COLUMN_PREFIX = "long_col_";
+
+  private static final String FLOAT_COLUMN_PREFIX = "float_col_";
+
+  private static final String DOUBLE_COLUMN_PREFIX = "double_col_";
+
+  private static final String INT_COLUMN_NAME = "int_data";
+
+  private static final String FLOAT_COLUMN_NAME = "float_data";
+
+  private static final String DOUBLE_COLUMN_NAME = "double_data";
+
+  private static final String LONG_COLUMN_NAME = "long_data";
+
+  private FileSplit split;
+
+  private IHDF5Reader hdf5Reader;
+
+  private File infile;
+
+  private BufferedReader reader;
+
+  private RowSetLoader rowWriter;
+
+  private Iterator<HDF5DrillMetadata> metadataIterator;
+
+  private final HDF5ReaderConfig readerConfig;
+
+  private boolean deleteTmpDir;
+
+  private ScalarWriter pathWriter;
+
+  private ScalarWriter dataTypeWriter;
+
+  private ScalarWriter fileNameWriter;
+
+  private List<HDF5DataWriter> dataWriters;
+
+  private long[] dimensions;
+
+  /**
+   * Holds the settings handed to an {@link HDF5BatchReader}: the owning format
+   * plugin, its format configuration, and the default path read from that
+   * configuration.
+   */
+  public static class HDF5ReaderConfig {
+    final HDF5FormatPlugin plugin;
+
+    final HDF5FormatConfig formatConfig;
+
+    final String defaultPath;
+
+    /**
+     * @param plugin the format plugin that owns this reader
+     * @param formatConfig the format configuration; its default path is captured here
+     */
+    public HDF5ReaderConfig(HDF5FormatPlugin plugin, HDF5FormatConfig formatConfig) {
+      this.plugin = plugin;
+      this.formatConfig = formatConfig;
+      this.defaultPath = formatConfig.getDefaultPath();
+    }
+  }
+
+  /**
+   * Creates a reader bound to the given configuration. The list of data writers
+   * starts out empty.
+   *
+   * @param readerConfig the plugin, format configuration and default path for this reader
+   */
+  public HDF5BatchReader(HDF5ReaderConfig readerConfig) {
+    this.dataWriters = new ArrayList<>();
+    this.readerConfig = readerConfig;
+  }
+
+  /**
+   * Opens the file for the negotiator's split and prepares the row writer.
+   * <p>
+   * When no default path is configured, the reader is set up for a metadata
+   * query: the schema consists of three nullable VARCHAR columns (path, data
+   * type and file name) and the rows come from the file's group member
+   * information. When a default path is configured, the dimensions of that
+   * dataset decide whether the 1D or the 2D schema builder is used; datasets
+   * with more than two dimensions go through the 2D builder as well.
+   *
+   * @param negotiator Drill's schema negotiator for this file
+   * @return always {@code true}
+   */
+  @Override
+  public boolean open(FileSchemaNegotiator negotiator) {
+    split = negotiator.split();
+    try {
+      openFile(negotiator);
+    } catch (IOException e) {
+      // UserException's builder formats its message with String.format, so the
+      // placeholder must be %s rather than {}. A second message() call would
+      // replace the first, so the cause text is attached as context instead.
+      throw UserException
+        .dataReadError(e)
+        .message("Failed to close input file: %s", split.getPath())
+        .addContext(e.getMessage())
+        .build(logger);
+    }
+
+    if (readerConfig.defaultPath == null) {
+      // Metadata query: list the objects found under the root group.
+      List<HDF5DrillMetadata> metadata = getFileMetadata(
+        hdf5Reader.object().getGroupMemberInformation("/", true), new ArrayList<>());
+      metadataIterator = metadata.iterator();
+
+      SchemaBuilder builder = new SchemaBuilder()
+        .addNullable(PATH_COLUMN_NAME, TypeProtos.MinorType.VARCHAR)
+        .addNullable(DATA_TYPE_COLUMN_NAME, TypeProtos.MinorType.VARCHAR)
+        .addNullable(FILE_NAME_COLUMN_NAME, TypeProtos.MinorType.VARCHAR);
+      negotiator.setTableSchema(builder.buildSchema(), false);
+
+      ResultSetLoader loader = negotiator.build();
+      dimensions = new long[0];
+      rowWriter = loader.writer();
+
+      pathWriter = rowWriter.scalar(PATH_COLUMN_NAME);
+      dataTypeWriter = rowWriter.scalar(DATA_TYPE_COLUMN_NAME);
+      fileNameWriter = rowWriter.scalar(FILE_NAME_COLUMN_NAME);
+    } else {
+      // The user asked for a specific dataset, so the schema is derived from
+      // that dataset's type and dimensions.
+      HDF5DataSetInformation dsInfo =
+        hdf5Reader.object().getDataSetInformation(readerConfig.defaultPath);
+      dimensions = dsInfo.getDimensions();
+
+      ResultSetLoader loader = negotiator.build();
+      rowWriter = loader.writer();
+      if (dimensions.length <= 1) {
+        buildSchemaFor1DimensionalDataset(dsInfo);
+      } else {
+        // Two dimensions, or more than two (handled by the same builder).
+        buildSchemaFor2DimensionalDataset(dsInfo);
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Called when the default path is set and the dataset has at most one
+   * dimension. Adds a single data writer of the matching type to
+   * {@code dataWriters}. If the data type cannot be determined or is not one
+   * of the handled types, a warning is logged and no writer is added.
+   *
+   * @param dsInfo The HDF5 dataset information
+   */
+  private void buildSchemaFor1DimensionalDataset(HDF5DataSetInformation dsInfo) {
+    TypeProtos.MinorType currentDataType = HDF5Utils.getDataType(dsInfo);
+
+    // Case for null or unknown data types:
+    if (currentDataType == null) {
+      // The try-prefixed accessor may have no Java type to offer, so guard
+      // against null before asking for its generic string.
+      Class<?> javaType = dsInfo.getTypeInformation().tryGetJavaType();
+      logger.warn("Couldn't add {}",
+        javaType == null ? dsInfo.getTypeInformation() : javaType.toGenericString());
+      return;
+    }
+
+    String path = readerConfig.defaultPath;
+    switch (currentDataType) {
+      case GENERIC_OBJECT:
+        dataWriters.add(new HDF5EnumDataWriter(hdf5Reader, rowWriter, path));
+        break;
+      case VARCHAR:
+        dataWriters.add(new HDF5StringDataWriter(hdf5Reader, rowWriter, path));
+        break;
+      case TIMESTAMP:
+        dataWriters.add(new HDF5TimestampDataWriter(hdf5Reader, rowWriter, path));
+        break;
+      case INT:
+        dataWriters.add(new HDF5IntDataWriter(hdf5Reader, rowWriter, path));
+        break;
+      case BIGINT:
+        dataWriters.add(new HDF5LongDataWriter(hdf5Reader, rowWriter, path));
+        break;
+      case FLOAT8:
+        dataWriters.add(new HDF5DoubleDataWriter(hdf5Reader, rowWriter, path));
+        break;
+      case FLOAT4:
+        dataWriters.add(new HDF5FloatDataWriter(hdf5Reader, rowWriter, path));
+        break;
+      case MAP:
+        dataWriters.add(new HDF5MapDataWriter(hdf5Reader, rowWriter, path));
+        break;
+      default:
+        // Make the gap visible instead of silently producing no columns.
+        logger.warn("Unsupported data type {} for dataset {}", currentDataType, path);
+        break;
+    }
+  }
+
+  /**
+   * Builds a Drill schema from a dataset with two or more dimensions. Only
+   * INT, BIGINT, FLOAT8 and FLOAT4 are handled here, so this function is not
+   * as inclusive as the 1D function. One data writer is added to
+   * {@code dataWriters} per column, where the column count is the size of the
+   * dataset's second dimension. If the data type cannot be determined or is
+   * not one of the handled types, a warning is logged and no writer is added.
+   *
+   * @param dsInfo The dataset which Drill will use to build a schema
+   */
+  private void buildSchemaFor2DimensionalDataset(HDF5DataSetInformation dsInfo) {
+    TypeProtos.MinorType currentDataType = HDF5Utils.getDataType(dsInfo);
+    // Case for null or unknown data types:
+    if (currentDataType == null) {
+      // The try-prefixed accessor may have no Java type to offer, so guard
+      // against null before asking for its generic string.
+      Class<?> javaType = dsInfo.getTypeInformation().tryGetJavaType();
+      logger.warn("Couldn't add {}",
+        javaType == null ? dsInfo.getTypeInformation() : javaType.toGenericString());
+      return;
+    }
+    long cols = dimensions[1];
+    String path = readerConfig.defaultPath;
+
+    String tempFieldName;
+    for (int i = 0; i < cols; i++) {
+      switch (currentDataType) {
+        case INT:
+          tempFieldName = INT_COLUMN_PREFIX + i;
+          dataWriters.add(new HDF5IntDataWriter(hdf5Reader, rowWriter, path, tempFieldName, i));
+          break;
+        case BIGINT:
+          tempFieldName = LONG_COLUMN_PREFIX + i;
+          dataWriters.add(new HDF5LongDataWriter(hdf5Reader, rowWriter, path, tempFieldName, i));
+          break;
+        case FLOAT8:
+          tempFieldName = DOUBLE_COLUMN_PREFIX + i;
+          dataWriters.add(new HDF5DoubleDataWriter(hdf5Reader, rowWriter, path, tempFieldName, i));
+          break;
+        case FLOAT4:
+          tempFieldName = FLOAT_COLUMN_PREFIX + i;
+          dataWriters.add(new HDF5FloatDataWriter(hdf5Reader, rowWriter, path, tempFieldName, i));
+          break;
+        default:
+          // The type is the same for every column, so report once and stop
+          // rather than looping over all columns without adding anything.
+          logger.warn("Unsupported data type {} for dataset {}", currentDataType, path);
+          return;
+      }
+    }
+  }
+  /**
+   * Opens the HDF5 file behind the current split. The input stream obtained
+   * from the file system is copied to a local file via
+   * {@code convertInputStreamToFile}, and that file is then opened with the
+   * HDF5 library. On success a buffered reader over the stream is kept in
+   * {@code reader}.
+   *
+   * @param negotiator The negotiator represents Drill's interface with the file system
+   * @throws IOException kept in the signature for callers; failures while
+   *         opening are reported as a {@link UserException}
+   */
+  private void openFile(FileSchemaNegotiator negotiator) throws IOException {
+    InputStream in = null;
+    try {
+      in = negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+      IHDF5Factory factory = HDF5FactoryProvider.get();
+      infile = convertInputStreamToFile(in);
+      hdf5Reader = factory.openForReading(infile);
+    } catch (Exception e) {
+      if (in != null) {
+        try {
+          in.close();
+        } catch (IOException closeError) {
+          // A failure to close must not hide the reason the open failed;
+          // record it on the original exception instead.
+          e.addSuppressed(closeError);
+        }
+      }
+      // UserException's builder formats its message with String.format, so
+      // the placeholder must be %s rather than {}.
+      throw UserException
+        .dataReadError(e)
+        .message("Failed to open input file: %s", split.getPath())
+        .build(logger);
+    }
+    assert in != null;
+    reader = new BufferedReader(new InputStreamReader(in));
+  }
+
+  /**
+   * This function converts the Drill InputStream into a File object for the 
HDF5 library.  This function
+   * exists due to a known limitation in the HDF5 library which cannot parse 
HDF5 directly from an input stream.  A future
+   * release of the library will support this.
+   *
+   * @param stream The input stream to be converted to a File
+   * @return File The file which was converted from an InputStream
+   * @throws IOException Throws IOException if the InputStream cannot be 
opened or read.
+   */
+  private File convertInputStreamToFile(InputStream stream) throws IOException 
{
+    File tmpDir = getTmpDir();
+    String tempFileName = tmpDir.getPath() + "/~" + split.getPath().getName();
+    File targetFile = new File(tempFileName);
+
+    try {
+      java.nio.file.Files.copy(stream, targetFile.toPath(), 
StandardCopyOption.REPLACE_EXISTING);
+    } catch (Exception e) {
+      if (targetFile.exists()) {
+        targetFile.delete();
+      }
+
+      throw UserException
+        .dataWriteError(e)
+        .message("Failed to create temp HDF5 file: {}", split.getPath())
 
 Review comment:
   Fixed

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to