paul-rogers commented on PR #2937:
URL: https://github.com/apache/drill/pull/2937#issuecomment-2332920168

   Thanks again for the many pointers and explanations. Much to my surprise, 
Drill does, indeed, seem to read the Parquet metadata at plan time. Your 
explanation makes sense: clearly we need the information for parallelization 
planning. The Parquet metadata cache simply bypasses the physical file reads if 
the cache is present.
   
   I traced the whole flow. It all looks good except at the last step, writing 
data to a vector. That will prevent any direct attempt to have the Parquet 
reader write to a vector other than the one that matches the Parquet type.
   
   That may still be OK. As you can tell, my comments are based on the general 
schema change case: the problems we've encountered over the years. But, I 
suspect this PR can address one very specific case: missing columns when the 
type is consistent across files where it does appear.
   
   ### Recap of the Parquet Planning Process
   
   The code is hard to follow statically. Instead, I ran the unit tests in the 
PR to get the dynamic view. As described above, the `ParquetGroupScan` does 
read the files by way of a number of builders, providers and associated 
classes. The actual reading is about here: 
`ParquetFileAndRowCountMetadata.getParquetFileMetadata_v4`.
   
   Here I will insert a grumble that if Drill scans the Parquet files per query 
to get schema, then it could certainly do so for all other file types and avoid 
a large number of problems. Files have a path and a timestamp: we could easily 
build up a per-Drillbit cache of metadata so we read it only once per session, 
and only if the file changes. Trying to distribute that data is the hard part. 
(Everyone wants to use ZK, but ZK is not a distributed DB.) End of grumble.
   
   So, we do have schema per row group. We can thus work out the common schema 
as is done in this PR. In general, we have to handle the following:
   
   * Missing columns
   * Different types
   * Different cardinalities (nullable, non-nullable, repeated)
   
   The code here handles missing columns and the nullable/non-nullable modes. 
We can worry about the scalar/array issue later as only a novice would change a 
column from one to the other. Because of the use of the common type code, I 
assumed that this PR might also handle the differing types issue, but maybe I 
misunderstood.
   
   As explained above, the _right_ way to handle columns is to run them through 
Calcite. That way, Calcite can check for something like `VARCHAR * INT`, which 
SQL doesn't support. I can't tell if the original author did that work. Without 
the Calcite change, the readers will know the type, but not the SQL planner. 
Not great, but we can live with it since that's the way its worked until now.
   
   Thanks for adding tests that show that the type inference works as intended. 
It would be handy if  `DESCRIBE <table>` were to work in this case. I didn't 
check if that is wired up for Parquet files.
   
   There are three cases that work prior to this PR. Assume we have a column 
`c`:
   
   * `c` appears in no files. The type is untyped NULL. (Nullable `INT` in 
other readers originally, changed with EVF.)
   * `c` appears in all files with the same type and mode. Works for all 
operators. The user sees a consistent column type.
   * `c`' changes type or mode, or is missing in some files. Works only if the 
query does not contain a SORT, JOIN or other type-sensitive operator. Presents 
the user with results with diffing types.
   
   This PR generalizes the third case. As it is, the "sensitive" operators will 
raise a schema change exception. To fix the SCEs we need to handle all the 
cases in which the schema can change so that all readers return the same 
schema. Cases:
   
   1. `c` appears in all files with the same type but differing modes.
   2. `c` appears in all files, but with differing types.
   3. `c` appears in all files, but with differing types and modes.
   4. `c` is missing in some files, but when it appears, it always has the same 
type. Mode must be set to nullable. Handled by this PR.
   5. `c` is missing in some files. In the files in which `c` does appear, it 
has differing types. Handled by this PR, maybe.
   
   A review response comment suggests that this PR fixes cases 4, which seems 
like the most reasonable form of schema change. Changing column types is just 
asking for trouble with tools such as Presto or Impala. Still, let's think 
through all the cases.
   
   There is one additional case, just to make this more fun. The query can be 
of the form `SELECT * FROM myParquet`. In this case, the user might expect the 
union of all columns from all files and row groups, with the types reconciled 
for all of them. If we focus just on case 4, then the user would expect us to 
"fill in the blanks" if some of the columns are missing in some files.
   
   Once we have the common schema, we need to ship it to the distributed 
readers. It looks like `AbstractParquetRowGroupScan.schema` provides the 
"physical plan" support. The `AbstractParquetScanBatchCreator` class creates a 
set of `ParquetRecordReader` instances from the `AbstractParquetRowGroupScan` 
and wraps them in the `ScanBatch` operator. The schema is placed in a 
`ColumnExplorer`.
   
   As it turns out, Drill has not just one, but two different Parquet readers: 
`DrillParquetReader` and `ParquetRecordReader`. Both receive the list of 
columns, but only as a `List<SchemaPath>` which has no type information. 
`ParquetRecordReader` also receives the type information. It seems that the 
names are redundant: they appear in both the schema path and the `TupleSchema`. 
This may be the source of one of the bugs fixed in this PR.
   
   So far so good.
   
   ### The Problem
   
   But, now all heck breaks loose. What we want is for each row group to honor 
the planner-provided schema. But, this won't work.
    
   What we want is that the row group either has column `c` or not. If not, we 
make up a column of the type provided by the `TupleSchema`. There is much code 
to work out the Drill type for each Parquet type. That code should no longer be 
used by the _reader_, but rather by the planner. At read time, we just create 
the vector specified by the schema to ensure consistency.
   
   But, the Parquet column readers assume that the type of the Drill vector is 
the same as that of Parquet columns. As it turns out, there are two of sets of 
Parquet column writers: the original ones and a second set of "bulk" readers. I 
don't know if both are still used. Eventually, the Parquet column readers call 
down to the following line from `FixedByteAlignedReader`:
   
   ```java
     protected void writeData() {
       vectorData.writeBytes(bytebuf, (int) readStartInBytes, (int) readLength);
     }
   ```
   
   That is, the Drill vectors have been carefully chosen so that their byte 
layout of the vector matches the byte layout of Parquet. We read, say, `INT32`  
values into `INT` vectors where the values are also 32 bits long. We must also 
assume the same endian layout, IEEE floating point layout, etc. To write to a 
different Drill type requires a per-value conversion.
   
   The class `ColumnReaderFactory` chooses the column reader. It has no code to 
handle type conversions. To do a type conversion, we'd need a new set of 
readers: one for each (from type, to type) pair. EVF provides a large set of 
these so that EVF can perform this type coercion automatically for the common 
cases. Examples. read a CSV file (all VARCHAR), but into an `INT` column 
(requires a `VARCHAR`-to-`INT` conversion).
   
   This code is complex, so I might be missing something. More testing would 
reveal the facts. It could be that writing a non-nullable column into a 
nullable vector works: I'm just not sure what sets the null bits to the correct 
values. Maybe they take on the correct values by default, if we zero out the 
null vector at allocation.
   
   ### Handling Only Missing Columns
   
   Since the Parquet reader is not set up to handle type changes, we can choose 
to restrict this fix to handle only missing columns. That is, _if_ a column is 
missing, _and_ it has the same type when it does appear, _only then_ change the 
mode to nullable. Otherwise, let the schema change happen and the query will 
fail as today. Such a fix solves the issue for your case, but does not solve 
the general problem. This is still an improvement and worth doing.
   
   We just need to ensure that if Parquet thinks it is writing to a 
non-nullable vector, but the vector is nullable, that the code "does the right 
thing." Nullable vectors are a composite: they have a null vector and a data 
vector. Parquet should use the data vector. We just need to ensure that we 
somehow set the null vector bits correctly. It may be that vector allocation 
zeros the vectors and so the null vector starts in the proper state. Since the 
tests pass, this may be the case.
   
   ### Alternative Solution for Type Changes
   
   Suppose we wanted to handle type changes as well. From the above 
description, we can't solve the problem by forcing Parquet to choose a 
different type of vector than the one for the column type. (I mentioned that 
this is possible when using EVF and suggested we could do the same for Parquet. 
It turns out that Parquet will not allow this solution.)
   
   There are two possible solutions, one very hard, the other only hard. (The 
third, of course, is to skip this task.)
   
   The first option is to create a type conversion mechanism as described 
above. These are tedious to write (we'd use code generation) and just as 
tedious to test. This seems an excessive amount of work to solve a problem 
which occurs infrequently.
   
   The second option is simply to alter the execution plan to insert a PROJECT 
operator on top of the SCAN operator. We could set a flag that says that the 
PROJECT is needed. This flag would indicate that we found a type inconsistency. 
The PROJECT operator (`ProjectRecordBatch`), will handle type conversions: it 
is what implements `CAST()` operations.
   
   The trick is that adding the PROJECT is, itself, complex. Basically, there 
needs to be a Calcite rule that rewrites a scan batch with our flag set to a 
project with the needed `CAST` operations. The Parquet reader should then 
ignore the `TupleSchema` and create vectors based on the Parquet schema.
   
   We would want to verify that the PROJECT does what it is supposed to do: 
emit the same vectors even when it receives a schema change from its upstream 
(SCAN) operator.
   
   ### Suggestion
   
   Given the above facts (which you should certainly) verify, I suggest we 
choose the "Handling Only Missing Columns" solution and not attempt to handle 
column type changes. That is, don't attempt to fix the cases 3-5 (differing 
types) in this PR. Leave that as a future exercise. 
   
   If you found this bug due to an actual use case. ensure that the use case 
does not change column data types. If the type of a column is to change, just 
create a new column name instead. For example, if `IP_addr` is an `INT32`, 
don't try to make it an `INT64` for IPV6 addresses. Instead, write those to a 
new `IP6_addr` column. With this fix, adding and removing columns should work 
as expected.
   
   Does this seem a reasonable approach?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@drill.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to