paul-rogers commented on a change in pull request #1778 ("Drill-7233: Format Plugin for HDF5"). URL: https://github.com/apache/drill/pull/1778#discussion_r331769363
########## File path: contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5BatchReader.java ########## @@ -0,0 +1,887 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.store.hdf5; + +import ch.systemsx.cisd.hdf5.HDF5CompoundMemberInformation; +import ch.systemsx.cisd.hdf5.HDF5DataClass; +import ch.systemsx.cisd.hdf5.HDF5DataSetInformation; +import ch.systemsx.cisd.hdf5.HDF5FactoryProvider; +import ch.systemsx.cisd.hdf5.HDF5LinkInformation; +import ch.systemsx.cisd.hdf5.IHDF5Factory; +import ch.systemsx.cisd.hdf5.IHDF5Reader; +import org.apache.commons.io.IOUtils; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.exec.expr.holders.BigIntHolder; +import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator; +import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader; +import org.apache.drill.exec.physical.resultSet.ResultSetLoader; +import org.apache.drill.exec.physical.resultSet.RowSetLoader; +import org.apache.drill.exec.record.metadata.ColumnMetadata; +import org.apache.drill.exec.record.metadata.MetadataUtils; +import 
org.apache.drill.exec.record.metadata.SchemaBuilder; +import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.exec.vector.accessor.ScalarWriter; +import org.apache.drill.exec.vector.accessor.TupleWriter; +import org.apache.drill.exec.vector.complex.writer.BaseWriter; +import org.apache.hadoop.mapred.FileSplit; +import org.joda.time.Instant; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.BitSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + + +public class HDF5BatchReader implements ManagedReader<FileSchemaNegotiator> { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HDF5BatchReader.class); + private FileSplit split; + private HDF5FormatConfig formatConfig; + private ResultSetLoader loader; + private String tempFileName; + private IHDF5Reader HDF5reader; + private File infile; + private BufferedReader reader; + protected HDF5ReaderConfig readerConfig; + private boolean finish; + + + public static class HDF5ReaderConfig { + protected final HDF5FormatPlugin plugin; + protected TupleMetadata schema; + protected String defaultPath; + protected HDF5FormatConfig formatConfig; + + public HDF5ReaderConfig(HDF5FormatPlugin plugin, HDF5FormatConfig formatConfig) { + this.plugin = plugin; + this.formatConfig = formatConfig; + this.defaultPath = formatConfig.getDefaultPath(); + } + } + + + public HDF5BatchReader(HDF5ReaderConfig readerConfig) { + this.readerConfig = readerConfig; + this.formatConfig = readerConfig.formatConfig; + } + + @Override + public boolean open(FileSchemaNegotiator negotiator) { + split = negotiator.split(); + loader = negotiator.build(); + 
openFile(negotiator); + this.loader = negotiator.build(); + return true; + } + + private void openFile(FileSchemaNegotiator negotiator) { + InputStream in; + try { + in = negotiator.fileSystem().open(split.getPath()); + IHDF5Factory factory = HDF5FactoryProvider.get(); + this.infile = convertInputStreamToFile(in); + this.HDF5reader = factory.openForReading(infile); + } catch (Exception e) { + throw UserException + .dataReadError(e) + .message("Failed to open open input file: %s", split.getPath()) + .build(logger); + } + reader = new BufferedReader(new InputStreamReader(in)); + } + + /** + * This function converts the Drill inputstream into a File object for the HDF5 library. This function + * exists due to a known limitation in the HDF5 library which cannot parse HDF5 directly from an input stream. A future + * release of the library will support this. + * + * @param stream + * @return + * @throws IOException + */ + private File convertInputStreamToFile(InputStream stream) throws IOException { + this.tempFileName = "./~" + split.getPath().getName(); + File targetFile = new File(tempFileName); + java.nio.file.Files.copy(stream, targetFile.toPath(), StandardCopyOption.REPLACE_EXISTING); + IOUtils.closeQuietly(stream); + return targetFile; + } + + @Override + public boolean next() { + RowSetLoader rowWriter = loader.writer(); + while (!rowWriter.isFull()) { + if (this.formatConfig.getDefaultPath() == null || this.formatConfig.getDefaultPath().isEmpty()) { + return projectFileMetadata(rowWriter); + } else { + return projectDataset(rowWriter, readerConfig.defaultPath, false); + } + + } + return false; + } + + private boolean projectFileMetadata(RowSetLoader rowWriter) { + List<HDF5DrillMetadata> metadata = getFileMetadata(this.HDF5reader.object().getGroupMemberInformation("/", true), new ArrayList<>()); + + for (HDF5DrillMetadata record : metadata) { + rowWriter.start(); + writeStringColumn(rowWriter, "path", record.getPath()); + writeStringColumn(rowWriter, 
"data_type", record.getDataType()); + writeStringColumn(rowWriter, "file_name",this.infile.getName().replace("~", "") ); + + //Write attributes if present + if (record.getAttributes().size() > 0) { + writeAttributes(rowWriter, record); + } + if (record.getDataType().equals("DATASET")) { + if (readerConfig.defaultPath != null) { + projectDataset(rowWriter, readerConfig.defaultPath, true); + } else { + projectDataset(rowWriter, record.getPath(), true); + } + } + rowWriter.save(); + } + return false; + } + + /** + * This helper function returns the name of a HDF5 record from a data path + * @param path Path to HDF5 data + * @return String name of data + */ + private String getNameFromPath(String path) { + String pattern = "/*.*/(.+?)$"; + Pattern r = Pattern.compile(pattern); + // Now create matcher object. + Matcher m = r.matcher(path); + if (m.find()) { + return m.group(1); + } else { + return ""; + } + } + + private List<HDF5DrillMetadata> getFileMetadata(List<HDF5LinkInformation> members, List<HDF5DrillMetadata> metadata) { + for (HDF5LinkInformation info : members) { + HDF5DrillMetadata metadataRow = new HDF5DrillMetadata(); + + metadataRow.setPath(info.getPath()); + metadataRow.setDataType(info.getType().toString()); + + switch (info.getType()) { + case DATASET: + metadataRow.setAttributes(getAttributes(HDF5reader, info.getPath())); + HDF5DataSetInformation dsInfo = HDF5reader.object().getDataSetInformation(info.getPath()); + metadata.add(metadataRow); + break; + case SOFT_LINK: + // Soft links cannot have attributes + metadata.add(metadataRow); + break; + case GROUP: + metadataRow.setAttributes(getAttributes(HDF5reader, info.getPath())); + metadata.add(metadataRow); + metadata = getFileMetadata(HDF5reader.object().getGroupMemberInformation(info.getPath(), true), metadata); + break; + default: + break; + } + } + return metadata; + } + + private HashMap getAttributes(IHDF5Reader reader, String path) { + HashMap<String, HDF5Attribute> attributes = new HashMap<>(); 
+ long attrCount = reader.object().getObjectInformation(path).getNumberOfAttributes(); + if (attrCount > 0) { + List<String> attrNames = reader.object().getAllAttributeNames(path); + for (String name : attrNames) { + try { + HDF5Attribute attribute = HDF5Utils.getAttribute(path, name, reader); + attributes.put(attribute.getKey(), attribute); + } catch (Exception e) { + logger.info("Couldn't add attribute: " + path + " " + name); + } + } + } + return attributes; + } + + + private boolean projectDataset(RowSetLoader rowWriter, String datapath, boolean isMetadataQuery) { + int resultCount = 0; + + String fieldName = getNameFromPath(datapath); + IHDF5Reader reader = this.HDF5reader; + HDF5DataSetInformation dsInfo = reader.object().getDataSetInformation(datapath); + HDF5DataClass dataType = dsInfo.getTypeInformation().getRawDataClass(); + long[] dimensions = dsInfo.getDimensions(); + //Case for single dimensional data + if (dimensions.length <= 1) { + TypeProtos.MinorType currentDataType = HDF5Utils.getDataType(dsInfo); + // Case for null or unknown data types: + if (currentDataType == null) { + System.out.println(dsInfo.getTypeInformation().tryGetJavaType()); + } + + switch (currentDataType) { + case GENERIC_OBJECT: + ArrayList enumData = new ArrayList(Arrays.asList(HDF5reader.readEnumArray(datapath).toStringArray())); + break; + case VARCHAR: + try { + if (readerConfig.defaultPath != null) { + String[] tempStringList = HDF5reader.readStringArray(datapath); + for (String value : tempStringList) { + rowWriter.start(); + writeStringColumn(rowWriter, fieldName, value); + rowWriter.save(); + } + } else { + String[] data = HDF5reader.readStringArray(datapath); + writeStringListColumn(rowWriter, fieldName, data); + } + } catch (Exception e) { + logger.warn("Unknown HDF5 data type: " + datapath); + } + break; + case TIMESTAMP: + long ts = HDF5reader.readTimeStamp(datapath); + writeTimestampColumn(rowWriter, fieldName, ts); + break; + case INT: + if 
(!dsInfo.getTypeInformation().isSigned()) { + if (dsInfo.getTypeInformation().getElementSize() > 4) { + if (readerConfig.defaultPath != null) { + long[] tempLongList = HDF5reader.readLongArray(datapath); + for (long i : tempLongList) { + rowWriter.start(); + writeLongColumn(rowWriter, fieldName, i); + rowWriter.save(); + } + } else { + long[] longList = HDF5reader.uint64().readArray(datapath); + if(!isMetadataQuery) { + rowWriter.start(); + } + writeLongListColumn(rowWriter, fieldName, longList); + if(!isMetadataQuery) { + rowWriter.save(); + } + } + } + } else { + if (readerConfig.defaultPath != null) { + int[] tempIntList = HDF5reader.readIntArray(datapath); + for (int i : tempIntList) { + rowWriter.start(); + writeIntColumn(rowWriter, fieldName, i); + rowWriter.save(); + } + + } else { + int[] intList = HDF5reader.readIntArray(datapath); + if(!isMetadataQuery) { + rowWriter.start(); + } + writeIntListColumn(rowWriter, fieldName, intList); + if(!isMetadataQuery) { + rowWriter.save(); + } + } + } + break; + case FLOAT4: + if (readerConfig.defaultPath != null) { + float[] tempFloatList = HDF5reader.readFloatArray(datapath); + for (float i : tempFloatList) { + rowWriter.start(); + writeFloat4Column(rowWriter, fieldName, i); + rowWriter.save(); + } + } else { + float[] tempFloatList = HDF5reader.readFloatArray(datapath); + if(!isMetadataQuery) { + rowWriter.start(); + } + writeFloat4ListColumn(rowWriter, fieldName, tempFloatList); + if(!isMetadataQuery) { + rowWriter.save(); + } + } + break; + case FLOAT8: // TODO Add auto-flatten here... 
+ if (readerConfig.defaultPath != null) { + double[] tempDoubleList = HDF5reader.readDoubleArray(datapath); + for (double i : tempDoubleList) { + rowWriter.start(); + writeFloat8Column(rowWriter, fieldName, i); + rowWriter.save(); + } + } else { + double[] tempFloatList = HDF5reader.readDoubleArray(datapath); + if(!isMetadataQuery) { + rowWriter.start(); + } + writeFloat8ListColumn(rowWriter, fieldName, tempFloatList); + if(!isMetadataQuery) { + rowWriter.save(); + } + } + break; + case BIGINT: + if (!dsInfo.getTypeInformation().isSigned()) { + logger.warn("Drill does not support unsigned 64bit integers."); + break; + } + if (readerConfig.defaultPath != null) { + long[] tempLongList = HDF5reader.readLongArray(datapath); + for (long i : tempLongList) { + rowWriter.start(); + writeLongColumn(rowWriter, fieldName, i); + rowWriter.save(); + } + + } else { + long[] tempLongList = HDF5reader.readLongArray(datapath); + if(!isMetadataQuery) { + rowWriter.start(); + } + writeLongListColumn(rowWriter, fieldName, tempLongList); + if(!isMetadataQuery) { + rowWriter.save(); + } + } + break; + case MAP: + //try { // TODO Auto flatten compound data type + getAndMapCompoundData(datapath, new ArrayList<>(), this.HDF5reader, rowWriter); + //} catch (Exception e) { + // throw UserException + // .dataWriteError() + // .addContext("Error writing Compound Field: ") + // .addContext(e.getMessage()) + // .build(logger); + //} + break; + default: + //Case for data types that cannot be read + logger.warn(dsInfo.getTypeInformation().tryGetJavaType() + " not implemented....yet."); + break; + } + } else if (dimensions.length == 2) { + long cols = dimensions[1]; + long rows = dimensions[0]; + switch (HDF5Utils.getDataType(dsInfo)) { + case INT: + int[][] colData = HDF5reader.readIntMatrix(datapath); + mapIntMatrixField(fieldName, colData, (int) cols, (int) rows, rowWriter); + break; + /*case FLOAT4: + float[][] floatData = HDF5reader.readFloatMatrix(datapath); + resultCount = 
mapFloatMatrixField(fieldName, floatData, (int) cols, (int) rows, map); + break; + case FLOAT8: + double[][] doubleData = HDF5reader.readDoubleMatrix(datapath); + resultCount = mapDoubleMatrixField(fieldName, doubleData, (int) cols, (int) rows, map); + break; + case BIGINT: + long[][] longData = HDF5reader.readLongMatrix(datapath); + resultCount = mapLongMatrixField(fieldName, longData, (int) cols, (int) rows, map); + break;*/ + default: + logger.info(HDF5Utils.getDataType(dsInfo) + " not implemented."); + break; + } + }/* else { + // Case for data sets with dimensions > 2 + long cols = dimensions[1]; + long rows = dimensions[0]; + switch (HDF5Utils.getDataType(dsInfo)) { + case INT: + int[][] colData = HDF5reader.int32().readMDArray(datapath).toMatrix(); + resultCount = mapIntMatrixField(fieldName, colData, (int) cols, (int) rows, map); + break; + case FLOAT4: + float[][] floatData = HDF5reader.float32().readMDArray(datapath).toMatrix(); + resultCount = mapFloatMatrixField(fieldName, floatData, (int) cols, (int) rows, map); + break; + case FLOAT8: + double[][] doubleData = HDF5reader.float64().readMDArray(datapath).toMatrix(); + resultCount = mapDoubleMatrixField(fieldName, doubleData, (int) cols, (int) rows, map); + break; + case BIGINT: + long[][] longData = HDF5reader.int64().readMDArray(datapath).toMatrix(); + resultCount = mapLongMatrixField(fieldName, longData, (int) cols, (int) rows, map); + break; + default: + logger.info(HDF5Utils.getDataType(dsInfo) + " not implemented."); + break; + }*/ + //} + //} + + return false; + } + + private void writeBooleanColumn(TupleWriter rowWriter, String name, int value) { + boolean bool_value = true; + if(value == 0) { + bool_value = false; + } + writeBooleanColumn(rowWriter, name, bool_value); Review comment: ``` writeBooleanColumn(rowWriter, name, value != 0); ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org. With regards, Apache Git Services