[jira] [Commented] (DRILL-8188) Convert HDF5 format to EVF2

2022-04-18 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-8188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17524062#comment-17524062
 ] 

ASF GitHub Bot commented on DRILL-8188:
---

paul-rogers commented on PR #2515:
URL: https://github.com/apache/drill/pull/2515#issuecomment-1102092604

   This PR is getting a bit complex with the bug or two that this PR uncovered. 
Iexplain a bit about how EVF2 works. There are two case: wildcard projection 
(SELECT *) and explicit projection (SELECT a, b, c). The way EVF2 works is 
different in these two cases.
   
   Then, for each reader, there are three other cases. The reader might know 
all its columns before the file is even opened. The PCAP reader is an example: 
all PCAP files have the same schema, so we don't need to look at the file to 
know the schema. The second case are files were we can learn the schema when 
opening the file. Parquet and CSV are examples: we can learn the Parquet schema 
from the file metadata, and CSV schema from the headers. The last case is where 
we don't know the schema until we read each row. JSON is the best example.
   
   So, now we have six cases to consider. This is why EVF2 is so complex!
   
   For the wildcard, EVF2 "discovers" columns as the reader creates them: 
either via the up-front schema, or as the reader reads data. In JSON, for 
example, we can discover a new column at any time. Once a column is added, EVF2 
will automatically fill in null values if values are missing. In the extreme 
case, it can fill in nulls for an entire batch. Because of the wildcard, all 
discovered columns are materialized and added to the result set. If reading 
JSON, and a column does not appear until the third batch, then the first two 
won't contain that column, but the third batch will have a schema change and 
will include the column. This can cause a problem for operators such as joins, 
sort or aggregation that have to store a collection of rows, not all can handle 
a schema change.
   
   Now, for the explicit schema case, EVF2 knows what columns the user wants: 
those in the list. EVF2 waits as long as it can, hoping the reader will provide 
the columns. Again, the reader can provide them up front, before the first 
record, or as the read proceeds (as in JSON.) As the reader provides each 
column, EVF2 has to decide: do we need that column? If so, we create a vector 
and a column writer: we materialize the column. If the column is not needed, 
EVF2 creates a dummy column writer.
   Now the interesting part. Suppose we get to the end of the first batch, the 
query wants column c, and the reader has never defined column c? What do we do? 
In this case, we have to make something up. Historically, Drill would make up a 
Nullable Int, with all-null values. EVF added the ability to specify the type 
for such columns, and we use that. If a provided schema is available, then the 
user tells us the type.
   
   Now we get to another interesting part. What if we guessed, say, Varchar, 
but the column later shows up as a JSON array? We're stuck: we can't go back 
and redo the old batches. We end up with a "hard" schema change. Bad things 
happen unless the query is really simple. This is the fun of Drill's schemaless 
system.
   
   With that background, we can try to answer your question. The answer is: it 
depends. If the reader says, "hey Mr. EVF2, here is the full schema I will 
read, I promise not to discover more columns", then EVF2 will throw an 
exception if later you say, "ha! just kidding. Actually, I discovered another 
one." I wonder if that's what is happening here.
   
   If, however, the reader left the schema open, and said, "here are the 
columns I know about now, but I might find more later", then EVF2 will expect 
more columns, and will handle them as above: materialize them if they are 
projected or if we have a wildcard, provide a dummy writer if we have explicit 
projection and the column is not projected.
   
   In this PR, we have two separate cases in the reader constructor.
   
   * In the `if `path, we define a "reader schema", and reserve the right to 
add more columns later. "That's what the `false` argument means to 
`tableSchema()`.
   * In the `else` path, we define no schema at all: we don't all 
`tableSchema()`.
   
   This means the reader is doing two entirely different things. In the `if` 
case, we define the schema and we just ask for column writers by name. In the 
`else` case, we don't define a schema, and we have to define the column when we 
ask for the column writers.
   
   This seems horribly complicated! I wonder, are we missing logic in the 
`then` case? Or, should there be two distinct readers, each of which implements 
one of the above cases?




> Convert HDF5 format to EVF2
> ---
>
> Key: DRILL-8188
> URL: https://issues.apache.org/jira/browse/DRILL-8188
> Project: 

[jira] [Commented] (DRILL-8188) Convert HDF5 format to EVF2

2022-04-18 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-8188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17524061#comment-17524061
 ] 

ASF GitHub Bot commented on DRILL-8188:
---

paul-rogers commented on code in PR #2515:
URL: https://github.com/apache/drill/pull/2515#discussion_r852601292


##
contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5BatchReader.java:
##
@@ -171,107 +164,104 @@ public HDF5ReaderConfig(HDF5FormatPlugin plugin, 
HDF5FormatConfig formatConfig)
 }
   }
 
-  public HDF5BatchReader(HDF5ReaderConfig readerConfig, int maxRecords) {
-this.readerConfig = readerConfig;
-this.maxRecords = maxRecords;
+  public HDF5BatchReader(HDF5ReaderConfig config, EasySubScan scan, 
FileSchemaNegotiator negotiator) {
+errorContext = negotiator.parentErrorContext();
+file = negotiator.file();
+readerConfig = config;
 dataWriters = new ArrayList<>();
-this.showMetadataPreview = readerConfig.formatConfig.showPreview();
-  }
+showMetadataPreview = readerConfig.formatConfig.showPreview();
 
-  @Override
-  public boolean open(FileSchemaNegotiator negotiator) {
-split = negotiator.split();
-errorContext = negotiator.parentErrorContext();
 // Since the HDF file reader uses a stream to actually read the file, the 
file name from the
 // module is incorrect.
-fileName = split.getPath().getName();
-try {
-  openFile(negotiator);
-} catch (IOException e) {
-  throw UserException
-.dataReadError(e)
-.addContext("Failed to close input file: %s", split.getPath())
-.addContext(errorContext)
-.build(logger);
+fileName = file.split().getPath().getName();
+
+{ // Opens an HDF5 file
+  try (InputStream in = 
file.fileSystem().openPossiblyCompressedStream(file.split().getPath())) {
+/*
+ * As a possible future improvement, the jhdf reader has the ability 
to read hdf5 files from
+ * a byte array or byte buffer. This implementation is better in that 
it does not require creating
+ * a temporary file which must be deleted later.  However, it could 
result in memory issues in the
+ * event of large files.
+ */
+hdfFile = HdfFile.fromInputStream(in);
+  } catch (IOException e) {
+throw UserException
+  .dataReadError(e)
+  .message("Failed to open input file: %s", file.split().getPath())
+  .addContext(errorContext)
+  .build(logger);
+  }
 }
 
-ResultSetLoader loader;
-if (readerConfig.defaultPath == null) {
-  // Get file metadata
-  List metadata = getFileMetadata(hdfFile, new 
ArrayList<>());
-  metadataIterator = metadata.iterator();
-
-  // Schema for Metadata query
-  SchemaBuilder builder = new SchemaBuilder()
-.addNullable(PATH_COLUMN_NAME, MinorType.VARCHAR)
-.addNullable(DATA_TYPE_COLUMN_NAME, MinorType.VARCHAR)
-.addNullable(FILE_NAME_COLUMN_NAME, MinorType.VARCHAR)
-.addNullable(DATA_SIZE_COLUMN_NAME, MinorType.BIGINT)
-.addNullable(IS_LINK_COLUMN_NAME, MinorType.BIT)
-.addNullable(ELEMENT_COUNT_NAME, MinorType.BIGINT)
-.addNullable(DATASET_DATA_TYPE_NAME, MinorType.VARCHAR)
-.addNullable(DIMENSIONS_FIELD_NAME, MinorType.VARCHAR);
-
-  negotiator.tableSchema(builder.buildSchema(), false);
-
-  loader = negotiator.build();
-  dimensions = new int[0];
-  rowWriter = loader.writer();
-
-} else {
-  // This is the case when the default path is specified. Since the user 
is explicitly asking for a dataset
-  // Drill can obtain the schema by getting the datatypes below and 
ultimately mapping that schema to columns
-  Dataset dataSet = hdfFile.getDatasetByPath(readerConfig.defaultPath);
-  dimensions = dataSet.getDimensions();
-
-  loader = negotiator.build();
-  rowWriter = loader.writer();
-  writerSpec = new WriterSpec(rowWriter, negotiator.providedSchema(),
-  negotiator.parentErrorContext());
-  if (dimensions.length <= 1) {
-buildSchemaFor1DimensionalDataset(dataSet);
-  } else if (dimensions.length == 2) {
-buildSchemaFor2DimensionalDataset(dataSet);
+{ // Build the schema and initial the writer
+  ResultSetLoader loader;
+  if (readerConfig.defaultPath == null) {
+// Get file metadata
+List metadata = getFileMetadata(hdfFile, new 
ArrayList<>());
+metadataIterator = metadata.iterator();
+
+// Schema for Metadata query
+SchemaBuilder builder = new SchemaBuilder()
+  .addNullable(PATH_COLUMN_NAME, MinorType.VARCHAR)
+  .addNullable(DATA_TYPE_COLUMN_NAME, MinorType.VARCHAR)
+  .addNullable(FILE_NAME_COLUMN_NAME, MinorType.VARCHAR)
+  .addNullable(DATA_SIZE_COLUMN_NAME, MinorType.BIGINT)
+  .addNullable(IS_LINK_COLUMN_NAME, MinorType.BIT)
+