cgivre commented on a change in pull request #1778: DRILL-7233: Format Plugin for HDF5
URL: https://github.com/apache/drill/pull/1778#discussion_r345949293
##########
File path: contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5BatchReader.java
##########
@@ -0,0 +1,1210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.hdf5;
+
+import ch.systemsx.cisd.hdf5.HDF5CompoundMemberInformation;
+import ch.systemsx.cisd.hdf5.HDF5DataClass;
+import ch.systemsx.cisd.hdf5.HDF5DataSetInformation;
+import ch.systemsx.cisd.hdf5.HDF5FactoryProvider;
+import ch.systemsx.cisd.hdf5.HDF5LinkInformation;
+import ch.systemsx.cisd.hdf5.IHDF5Factory;
+import ch.systemsx.cisd.hdf5.IHDF5Reader;
+import org.apache.commons.io.IOUtils;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.expr.holders.BigIntHolder;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MapBuilder;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.hdf5.writers.HDF5DataWriter;
+import org.apache.drill.exec.store.hdf5.writers.HDF5DoubleDataWriter;
+import org.apache.drill.exec.store.hdf5.writers.HDF5EnumDataWriter;
+import org.apache.drill.exec.store.hdf5.writers.HDF5FloatDataWriter;
+import org.apache.drill.exec.store.hdf5.writers.HDF5IntDataWriter;
+import org.apache.drill.exec.store.hdf5.writers.HDF5LongDataWriter;
+import org.apache.drill.exec.store.hdf5.writers.HDF5MapDataWriter;
+import org.apache.drill.exec.store.hdf5.writers.HDF5StringDataWriter;
+import org.apache.drill.exec.store.hdf5.writers.HDF5TimestampDataWriter;
+import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+import org.apache.drill.shaded.guava.com.google.common.io.Files;
+import org.apache.hadoop.mapred.FileSplit;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.BitSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class HDF5BatchReader implements ManagedReader<FileSchemaNegotiator> {
+  private static final Logger logger = LoggerFactory.getLogger(HDF5BatchReader.class);
+
+  private static final String PATH_COLUMN_NAME = "path";
+
+  private static final String DATA_TYPE_COLUMN_NAME = "data_type";
+
+  private static final String FILE_NAME_COLUMN_NAME = "file_name";
+
+  private static final String INT_COLUMN_PREFIX = "int_col_";
+
+  private static final String LONG_COLUMN_PREFIX = "long_col_";
+
+  private static final String FLOAT_COLUMN_PREFIX = "float_col_";
+
+  private static final String DOUBLE_COLUMN_PREFIX = "double_col_";
+
+  private static final String INT_COLUMN_NAME = "int_data";
+
+  private static final String FLOAT_COLUMN_NAME = "float_data";
+
+  private static final String DOUBLE_COLUMN_NAME = "double_data";
+
+  private static final String LONG_COLUMN_NAME = "long_data";
+
+  private static final String STRING_COLUMN_NAME = "string_data";
+
+  private static final String BOOLEAN_COLUMN_NAME = "boolean_data";
+
+  private FileSplit split;
+
+  private HDF5FormatConfig formatConfig;
+
+  private ResultSetLoader loader;
+
+  private String tempFileName;
+
+  private IHDF5Reader hdf5Reader;
+
+  private File infile;
+
+  private SchemaBuilder builder;
+
+  private BufferedReader reader;
+
+  private RowSetLoader rowWriter;
+
+  private Iterator<HDF5DrillMetadata> metadataIterator;
+
+  private final HDF5ReaderConfig readerConfig;
+
+  private List<HDF5DrillMetadata> metadata;
+
+  private boolean deleteTmpDir;
+
+  private ScalarWriter pathWriter;
+
+  private ScalarWriter dataTypeWriter;
+
+  private ScalarWriter fileNameWriter;
+
+  private List<HDF5DataWriter> dataWriters;
+
+  private HDF5DataSetInformation dsInfo;
+
+  private HDF5DataClass dataType;
+
+  private long[] dimensions;
+
+  public static class HDF5ReaderConfig {
+    final HDF5FormatPlugin plugin;
+
+    final String defaultPath;
+
+    final HDF5FormatConfig formatConfig;
+
+    public HDF5ReaderConfig(HDF5FormatPlugin plugin, HDF5FormatConfig formatConfig) {
+      this.plugin = plugin;
+      this.formatConfig = formatConfig;
+      defaultPath = formatConfig.getDefaultPath();
+    }
+  }
+
+  public HDF5BatchReader(HDF5ReaderConfig readerConfig) {
+    this.readerConfig = readerConfig;
+    formatConfig = readerConfig.formatConfig;
+    dataWriters = new ArrayList<>();
+  }
+
+  @Override
+  public boolean open(FileSchemaNegotiator negotiator) {
+    split = negotiator.split();
+    openFile(negotiator);
+
+    boolean schemaComplete = false;
+    if (readerConfig.defaultPath == null) {
+      // Get file metadata
+      metadata = getFileMetadata(hdf5Reader.object().getGroupMemberInformation("/", true), new ArrayList<>());
+      metadataIterator = metadata.iterator();
+
+      // Schema for Metadata query
+      builder = new SchemaBuilder()
+        .addNullable(PATH_COLUMN_NAME, TypeProtos.MinorType.VARCHAR)
+        .addNullable(DATA_TYPE_COLUMN_NAME, TypeProtos.MinorType.VARCHAR)
+        .addNullable(FILE_NAME_COLUMN_NAME, TypeProtos.MinorType.VARCHAR);
+      negotiator.setTableSchema(builder.buildSchema(), schemaComplete);

Review comment:
   Fixed
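For context on the snippet under discussion: setTableSchema(schema, isComplete) registers the reader's schema with Drill's scan framework, and passing schemaComplete = false marks the schema as open to further columns. A minimal sketch of that pattern follows, assuming the EVF SchemaBuilder and FileSchemaNegotiator APIs already imported in the diff above; it is an illustration of the call, not code from the PR:

    // Sketch only (assumes the imports shown in the diff above).
    // Build the three nullable VARCHAR columns used for metadata queries.
    TupleMetadata metadataSchema = new SchemaBuilder()
        .addNullable("path", TypeProtos.MinorType.VARCHAR)
        .addNullable("data_type", TypeProtos.MinorType.VARCHAR)
        .addNullable("file_name", TypeProtos.MinorType.VARCHAR)
        .buildSchema();

    // false = the schema is not final; the reader may still add columns for
    // datasets it discovers while scanning the HDF5 file.
    negotiator.setTableSchema(metadataSchema, false);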
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
