paul-rogers commented on a change in pull request #1778: Drill-7233: Format Plugin for HDF5 URL: https://github.com/apache/drill/pull/1778#discussion_r347115978
########## File path: contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5BatchReader.java ########## @@ -0,0 +1,1208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.store.hdf5; + +import ch.systemsx.cisd.hdf5.HDF5CompoundMemberInformation; +import ch.systemsx.cisd.hdf5.HDF5DataClass; +import ch.systemsx.cisd.hdf5.HDF5DataSetInformation; +import ch.systemsx.cisd.hdf5.HDF5FactoryProvider; +import ch.systemsx.cisd.hdf5.HDF5LinkInformation; +import ch.systemsx.cisd.hdf5.IHDF5Factory; +import ch.systemsx.cisd.hdf5.IHDF5Reader; +import org.apache.commons.io.IOUtils; +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.expr.holders.BigIntHolder; +import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator; +import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader; +import org.apache.drill.exec.physical.resultSet.ResultSetLoader; +import org.apache.drill.exec.physical.resultSet.RowSetLoader; +import 
org.apache.drill.exec.record.metadata.ColumnMetadata; +import org.apache.drill.exec.record.metadata.MapBuilder; +import org.apache.drill.exec.record.metadata.MetadataUtils; +import org.apache.drill.exec.record.metadata.SchemaBuilder; +import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.exec.store.hdf5.writers.HDF5DataWriter; +import org.apache.drill.exec.store.hdf5.writers.HDF5DoubleDataWriter; +import org.apache.drill.exec.store.hdf5.writers.HDF5EnumDataWriter; +import org.apache.drill.exec.store.hdf5.writers.HDF5FloatDataWriter; +import org.apache.drill.exec.store.hdf5.writers.HDF5IntDataWriter; +import org.apache.drill.exec.store.hdf5.writers.HDF5LongDataWriter; +import org.apache.drill.exec.store.hdf5.writers.HDF5MapDataWriter; +import org.apache.drill.exec.store.hdf5.writers.HDF5StringDataWriter; +import org.apache.drill.exec.store.hdf5.writers.HDF5TimestampDataWriter; +import org.apache.drill.exec.vector.accessor.ArrayWriter; +import org.apache.drill.exec.vector.accessor.ScalarWriter; +import org.apache.drill.exec.vector.accessor.TupleWriter; + +import org.apache.drill.exec.vector.complex.writer.BaseWriter; +import org.apache.drill.shaded.guava.com.google.common.io.Files; +import org.apache.hadoop.mapred.FileSplit; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.BitSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class HDF5BatchReader implements ManagedReader<FileSchemaNegotiator> { + private static final Logger logger = LoggerFactory.getLogger(HDF5BatchReader.class); + + private static final String PATH_COLUMN_NAME = 
"path"; + + private static final String DATA_TYPE_COLUMN_NAME = "data_type"; + + private static final String FILE_NAME_COLUMN_NAME = "file_name"; + + private static final String INT_COLUMN_PREFIX = "int_col_"; + + private static final String LONG_COLUMN_PREFIX = "long_col_"; + + private static final String FLOAT_COLUMN_PREFIX = "float_col_"; + + private static final String DOUBLE_COLUMN_PREFIX = "double_col_"; + + private static final String INT_COLUMN_NAME = "int_data"; + + private static final String FLOAT_COLUMN_NAME = "float_data"; + + private static final String DOUBLE_COLUMN_NAME = "double_data"; + + private static final String LONG_COLUMN_NAME = "long_data"; + + private static final String STRING_COLUMN_NAME = "string_data"; + + private static final String BOOLEAN_COLUMN_NAME = "boolean_data"; + + private FileSplit split; + + private HDF5FormatConfig formatConfig; + + private ResultSetLoader loader; + + private String tempFileName; + + private IHDF5Reader hdf5Reader; + + private File infile; + + private SchemaBuilder builder; + + private BufferedReader reader; + + private RowSetLoader rowWriter; + + private Iterator<HDF5DrillMetadata> metadataIterator; + + private final HDF5ReaderConfig readerConfig; + + private List<HDF5DrillMetadata> metadata; + + private boolean deleteTmpDir; + + private ScalarWriter pathWriter; + + private ScalarWriter dataTypeWriter; + + private ScalarWriter fileNameWriter; + + private List<HDF5DataWriter> dataWriters; + + private HDF5DataSetInformation dsInfo; + + private HDF5DataClass dataType; + + private long[] dimensions; + + public static class HDF5ReaderConfig { + final HDF5FormatPlugin plugin; + + final String defaultPath; + + final HDF5FormatConfig formatConfig; + + public HDF5ReaderConfig(HDF5FormatPlugin plugin, HDF5FormatConfig formatConfig) { + this.plugin = plugin; + this.formatConfig = formatConfig; + defaultPath = formatConfig.getDefaultPath(); + } + } + + + public HDF5BatchReader(HDF5ReaderConfig readerConfig) { + 
this.readerConfig = readerConfig; + formatConfig = readerConfig.formatConfig; + dataWriters = new ArrayList<>(); + } + + @Override + public boolean open(FileSchemaNegotiator negotiator) { + split = negotiator.split(); + openFile(negotiator); + + if (readerConfig.defaultPath == null) { + // Get file metadata + metadata = getFileMetadata(hdf5Reader.object().getGroupMemberInformation("/", true), new ArrayList<>()); + metadataIterator = metadata.iterator(); + + // Schema for Metadata query + builder = new SchemaBuilder() + .addNullable(PATH_COLUMN_NAME, TypeProtos.MinorType.VARCHAR) + .addNullable(DATA_TYPE_COLUMN_NAME, TypeProtos.MinorType.VARCHAR) + .addNullable(FILE_NAME_COLUMN_NAME, TypeProtos.MinorType.VARCHAR); + negotiator.setTableSchema(builder.buildSchema(), false); + + loader = negotiator.build(); + dimensions = new long[0]; + rowWriter = loader.writer(); + + } else { + // This is the case when the default path is specified. Since the user is explicitly asking for a dataset + // Drill can obtain the schema by getting the datatypes below and ultimately mapping that schema to columns + dsInfo = hdf5Reader.object().getDataSetInformation(readerConfig.defaultPath); + dataType = dsInfo.getTypeInformation().getRawDataClass(); + dimensions = dsInfo.getDimensions(); + + loader = negotiator.build(); + rowWriter = loader.writer(); + if (dimensions.length <= 1) { + buildSchemaFor1DimensionalDataset(dsInfo); + } else if (dimensions.length == 2) { + buildSchemaFor2DimensionalDataset(dsInfo); + } else { + // Case for datasets of greater than 2D + // These are automatically flattened + buildSchemaFor2DimensionalDataset(dsInfo); + } + } + if (readerConfig.defaultPath == null) { + pathWriter = rowWriter.scalar(PATH_COLUMN_NAME); + dataTypeWriter = rowWriter.scalar(DATA_TYPE_COLUMN_NAME); + fileNameWriter = rowWriter.scalar(FILE_NAME_COLUMN_NAME); + } + return true; + } + + /** + * This function is called when the default path is set and the data set is a single dimension. 
This function will create an array of one dataWriter of the + * correct datatype + * @param dsInfo The HDF5 dataset information + */ + private void buildSchemaFor1DimensionalDataset(HDF5DataSetInformation dsInfo) { + TypeProtos.MinorType currentDataType = HDF5Utils.getDataType(dsInfo); + + // Case for null or unknown data types: + if (currentDataType == null) { + logger.warn("Couldn't add " + dsInfo.getTypeInformation().tryGetJavaType().toGenericString()); + return; + } + + switch (currentDataType) { + case GENERIC_OBJECT: + dataWriters.add(new HDF5EnumDataWriter(hdf5Reader, rowWriter, readerConfig.defaultPath)); + break; + case VARCHAR: + dataWriters.add(new HDF5StringDataWriter(hdf5Reader, rowWriter, readerConfig.defaultPath)); + break; + case TIMESTAMP: + dataWriters.add(new HDF5TimestampDataWriter(hdf5Reader, rowWriter, readerConfig.defaultPath)); + break; + case INT: + dataWriters.add(new HDF5IntDataWriter(hdf5Reader, rowWriter, readerConfig.defaultPath)); + break; + case BIGINT: + dataWriters.add(new HDF5LongDataWriter(hdf5Reader, rowWriter, readerConfig.defaultPath)); + break; + case FLOAT8: + dataWriters.add(new HDF5DoubleDataWriter(hdf5Reader, rowWriter, readerConfig.defaultPath)); + break; + case FLOAT4: + dataWriters.add(new HDF5FloatDataWriter(hdf5Reader, rowWriter, readerConfig.defaultPath)); + break; + case MAP: + dataWriters.add(new HDF5MapDataWriter(hdf5Reader, rowWriter, readerConfig.defaultPath)); + break; + } + } + + /** + * This function builds a Drill schema from a dataset with 2 or more dimensions. HDF5 only supports INT, LONG, DOUBLE and FLOAT for >2 data types so this function is + * not as inclusive as the 1D function. This function will build the schema by adding DataWriters to the dataWriters array. 
+ * @param dsInfo The dataset which Drill will use to build a schema + */ + + private void buildSchemaFor2DimensionalDataset(HDF5DataSetInformation dsInfo) { + TypeProtos.MinorType currentDataType = HDF5Utils.getDataType(dsInfo); + // Case for null or unknown data types: + if (currentDataType == null) { + logger.warn("Couldn't add " + dsInfo.getTypeInformation().tryGetJavaType().toGenericString()); + return; + } + long cols = dimensions[1]; + long rows = dimensions[0]; + + String tempFieldName; + for (int i = 0; i < cols; i++) { + switch (currentDataType) { + case INT: + tempFieldName = INT_COLUMN_PREFIX + i; + dataWriters.add(new HDF5IntDataWriter(hdf5Reader, rowWriter, readerConfig.defaultPath, tempFieldName, i)); + break; + case BIGINT: + tempFieldName = LONG_COLUMN_PREFIX + i; + dataWriters.add(new HDF5LongDataWriter(hdf5Reader, rowWriter, readerConfig.defaultPath, tempFieldName, i)); + break; + case FLOAT8: + tempFieldName = DOUBLE_COLUMN_PREFIX + i; + dataWriters.add(new HDF5DoubleDataWriter(hdf5Reader, rowWriter, readerConfig.defaultPath, tempFieldName, i)); + break; + case FLOAT4: + tempFieldName = FLOAT_COLUMN_PREFIX + i; + dataWriters.add(new HDF5FloatDataWriter(hdf5Reader, rowWriter, readerConfig.defaultPath, tempFieldName, i)); + break; + } + } + } + /** + * This function contains the logic to open an HDF5 file. 
+ * @param negotiator The negotiator represents Drill's interface with the file system + */ + private void openFile(FileSchemaNegotiator negotiator) { + InputStream in = null; + try { + in = negotiator.fileSystem().openPossiblyCompressedStream(split.getPath()); + IHDF5Factory factory = HDF5FactoryProvider.get(); + infile = convertInputStreamToFile(in); + hdf5Reader = factory.openForReading(infile); + } catch (Exception e) { + throw UserException + .dataReadError(e) + .message("Failed to open open input file: %s", split.getPath()) + .build(logger); + } + reader = new BufferedReader(new InputStreamReader(in)); + } + + /** + * This function converts the Drill InputStream into a File object for the HDF5 library. This function + * exists due to a known limitation in the HDF5 library which cannot parse HDF5 directly from an input stream. A future + * release of the library will support this. + * + * @param stream The input stream to be converted to a File + * @return File The file which was converted from an InputStream + * @throws IOException Throws IOException if the InputStream cannot be opened or read. 
+ */ + private File convertInputStreamToFile(InputStream stream) throws IOException { + //this.tempFileName = "./~" + split.getPath().getName(); + File tmpDir = getTmpDir(); + this.tempFileName = tmpDir.getPath() + "/~" + split.getPath().getName(); + File targetFile = new File(tempFileName); + java.nio.file.Files.copy(stream, targetFile.toPath(), StandardCopyOption.REPLACE_EXISTING); + IOUtils.closeQuietly(stream); + return targetFile; + } + + @Override + public boolean next() { + while (!rowWriter.isFull()) { + if (readerConfig.defaultPath == null || readerConfig.defaultPath.isEmpty()) { + if (!metadataIterator.hasNext()){ + return false; + } + projectMetadataRow(rowWriter); + } else if (dimensions.length <= 1 && dataWriters.get(0).isCompound()) { + if (!dataWriters.get(0).hasNext()) { + return false; + } + dataWriters.get(0).write(); + } else if (dimensions.length <= 1) { + // Case for Compound Data Type + if (!dataWriters.get(0).hasNext()) { + return false; + } + rowWriter.start(); + dataWriters.get(0).write(); + rowWriter.save(); + } else { + int currentRowCount = 0; + HDF5DataWriter currentDataWriter; + rowWriter.start(); + + for (int i = 0; i < dimensions[1]; i++) { + currentDataWriter = dataWriters.get(i); + currentDataWriter.write(); + currentRowCount = currentDataWriter.currentRowCount(); + } + rowWriter.save(); + if (currentRowCount >= dimensions[0]) { + return false; + } + } + } + return true; + } + + /** + * This function writes one row of HDF5 metadata. + * @param rowWriter The input rowWriter object + * @return boolean Returns true when the file has more data, false when not. 
+ */ + private boolean projectMetadataRow(RowSetLoader rowWriter) { + String realFileName = infile.getName().replace("~", ""); + HDF5DrillMetadata metadataRow = metadataIterator.next(); + rowWriter.start(); + + pathWriter.setString(metadataRow.getPath()); + dataTypeWriter.setString(metadataRow.getDataType()); + fileNameWriter.setString(realFileName); + + //Write attributes if present + if (metadataRow.getAttributes().size() > 0) { + writeAttributes(rowWriter, metadataRow); + } + + if (metadataRow.getDataType().equalsIgnoreCase("DATASET")) { + projectDataset(rowWriter, metadataRow.getPath()); + } + rowWriter.save(); + return !metadataIterator.hasNext(); + } + + /** + * This helper function returns the name of a HDF5 record from a data path + * + * @param path Path to HDF5 data + * @return String name of data + */ + private String getNameFromPath(String path) { + String pattern = "/*.*/(.+?)$"; + Pattern r = Pattern.compile(pattern); Review comment: Nit: if this is done frequently, you can create a `static final` that holds the pattern. I believe that patterns are thread safe (though matchers are not). ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services