paul-rogers commented on PR #2937:
URL: https://github.com/apache/drill/pull/2937#issuecomment-2332920168
Thanks again for the many pointers and explanations. Much to my surprise,
Drill does, indeed, seem to read the Parquet metadata at plan time. Your
explanation makes sense: clearly we need the information for parallelization
planning. The Parquet metadata cache simply bypasses the physical file reads if
the cache is present.
I traced the whole flow. It all looks good except at the last step, writing
data to a vector. That will prevent any direct attempt to have the Parquet
reader write to a vector other than the one that matches the Parquet type.
That may still be OK. As you can tell, my comments are based on the general
schema change case: the problems we've encountered over the years. But, I
suspect this PR can address one very specific case: missing columns when the
type is consistent across files where it does appear.
### Recap of the Parquet Planning Process
The code is hard to follow statically. Instead, I ran the unit tests in the
PR to get the dynamic view. As described above, the `ParquetGroupScan` does
read the files by way of a number of builders, providers and associated
classes. The actual reading is about here:
`ParquetFileAndRowCountMetadata.getParquetFileMetadata_v4`.
Here I will insert a grumble that if Drill scans the Parquet files per query
to get schema, then it could certainly do so for all other file types and avoid
a large number of problems. Files have a path and a timestamp: we could easily
build up a per-Drillbit cache of metadata so we read it only once per session,
and only if the file changes. Trying to distribute that data is the hard part.
(Everyone wants to use ZK, but ZK is not a distributed DB.) End of grumble.
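To make the grumble concrete, the cache I have in mind is little more than a map keyed by file path plus modification time. A minimal sketch, assuming nothing about Drill internals (the class and method names below are hypothetical, not existing Drill code):
```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical per-Drillbit metadata cache: re-read a file's metadata only when
// the file is new or its modification time changes.
public class FileMetadataCache<M> {

  // Keyed by path plus modification time: a changed file gets a new key and is re-read.
  // (A real cache would also evict entries left behind by stale timestamps.)
  private final ConcurrentHashMap<String, M> cache = new ConcurrentHashMap<>();

  /** Returns cached metadata, invoking {@code reader} only for new or changed files. */
  public M get(String path, long modificationTime, Function<String, M> reader) {
    return cache.computeIfAbsent(path + "#" + modificationTime, key -> reader.apply(path));
  }
}
```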
So, we do have schema per row group. We can thus work out the common schema
as is done in this PR. In general, we have to handle the following:
* Missing columns
* Different types
* Different cardinalities (nullable, non-nullable, repeated)
The code here handles missing columns and the nullable/non-nullable modes.
We can worry about the scalar/array issue later as only a novice would change a
column from one to the other. Because of the use of the common type code, I
assumed that this PR might also handle the differing types issue, but maybe I
misunderstood.
As explained above, the _right_ way to handle columns is to run them through
Calcite. That way, Calcite can check for something like `VARCHAR * INT`, which
SQL doesn't support. I can't tell if the original author did that work. Without
the Calcite change, the readers will know the type, but not the SQL planner.
Not great, but we can live with it since that's the way it's worked until now.
Thanks for adding tests that show that the type inference works as intended.
It would be handy if `DESCRIBE <table>` were to work in this case. I didn't
check if that is wired up for Parquet files.
There are three cases that work prior to this PR. Assume we have a column
`c`:
* `c` appears in no files. The type is untyped NULL. (Nullable `INT` in
other readers originally, changed with EVF.)
* `c` appears in all files with the same type and mode. Works for all
operators. The user sees a consistent column type.
* `c` changes type or mode, or is missing in some files. Works only if the
query does not contain a SORT, JOIN or other type-sensitive operator. Presents
the user with results having differing types.
This PR generalizes the third case. As it is, the "sensitive" operators will
raise a schema change exception. To fix the SCEs we need to handle all the
cases in which the schema can change so that all readers return the same
schema. Cases:
1. `c` appears in all files with the same type but differing modes.
2. `c` appears in all files, but with differing types.
3. `c` appears in all files, but with differing types and modes.
4. `c` is missing in some files, but when it appears, it always has the same
type. Mode must be set to nullable. Handled by this PR.
5. `c` is missing in some files. In the files in which `c` does appear, it
has differing types. Handled by this PR, maybe.
A review response comment suggests that this PR fixes case 4, which seems
like the most reasonable form of schema change. Changing column types is just
asking for trouble with tools such as Presto or Impala. Still, let's think
through all the cases.
There is one additional case, just to make this more fun. The query can be
of the form `SELECT * FROM myParquet`. In this case, the user might expect the
union of all columns from all files and row groups, with the types reconciled
for all of them. If we focus just on case 4, then the user would expect us to
"fill in the blanks" if some of the columns are missing in some files.
Once we have the common schema, we need to ship it to the distributed
readers. It looks like `AbstractParquetRowGroupScan.schema` provides the
"physical plan" support. The `AbstractParquetScanBatchCreator` class creates a
set of `ParquetRecordReader` instances from the `AbstractParquetRowGroupScan`
and wraps them in the `ScanBatch` operator. The schema is placed in a
`ColumnExplorer`.
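As an aside, shipping that schema is presumably just more Jackson-serialized physical-plan state. A toy illustration of the round trip (the class and field names below are made up for illustration; the real carrier appears to be `AbstractParquetRowGroupScan.schema`):
```java
import java.util.List;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

// Toy stand-in for a row-group scan definition that carries a reconciled schema.
public class RowGroupScanStub {
  private final List<String> columns;   // projected column names
  private final String schemaJson;      // reconciled schema, chosen by the planner

  @JsonCreator
  public RowGroupScanStub(@JsonProperty("columns") List<String> columns,
                          @JsonProperty("schema") String schemaJson) {
    this.columns = columns;
    this.schemaJson = schemaJson;
  }

  @JsonProperty("columns") public List<String> getColumns() { return columns; }
  @JsonProperty("schema")  public String getSchema()  { return schemaJson; }

  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    RowGroupScanStub scan = new RowGroupScanStub(
        List.of("a", "c"), "{\"c\":\"BIGINT NOT NULL\"}");
    // Round-trip: roughly what happens when the plan is shipped to the Drillbits.
    String json = mapper.writeValueAsString(scan);
    RowGroupScanStub received = mapper.readValue(json, RowGroupScanStub.class);
    System.out.println(received.getSchema());
  }
}
```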
As it turns out, Drill has not just one, but two different Parquet readers:
`DrillParquetReader` and `ParquetRecordReader`. Both receive the list of
columns, but only as a `List<SchemaPath>`, which carries no type information.
`ParquetRecordReader` also receives the type information separately. It seems
that the column names are redundant: they appear in both the schema path list
and the `TupleSchema`.
This may be the source of one of the bugs fixed in this PR.
So far so good.
### The Problem
But, now all heck breaks loose. What we want is for each row group to honor
the planner-provided schema. But, this won't work.
What we want is that the row group either has column `c` or not. If not, we
make up a column of the type provided by the `TupleSchema`. There is much code
to work out the Drill type for each Parquet type. That code should no longer be
used by the _reader_, but rather by the planner. At read time, we just create
the vector specified by the schema to ensure consistency.
But, the Parquet column readers assume that the type of the Drill vector is
the same as that of the Parquet columns. As it turns out, there are two sets of
Parquet column readers: the original ones and a second set of "bulk" readers. I
don't know if both are still used. Eventually, the Parquet column readers call
down to the following line from `FixedByteAlignedReader`:
```java
protected void writeData() {
  vectorData.writeBytes(bytebuf, (int) readStartInBytes, (int) readLength);
}
```
That is, the Drill vectors have been carefully chosen so that the byte layout
of each vector matches the byte layout of the corresponding Parquet column. We
read, say, `INT32` values into `INT` vectors where the values are also 32 bits
long. We must also assume the same endianness, IEEE floating-point layout, etc.
Writing to a different Drill type would require a per-value conversion.
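To illustrate why that byte copy is so restrictive, here is a self-contained (non-Drill) sketch contrasting the bulk copy, which works only when the layouts match, with the per-value loop a type change would force:
```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Illustration only: bulk copy versus per-value conversion.
public class LayoutSketch {

  /** Works only because source and destination share the same 4-byte layout. */
  static void bulkCopy(ByteBuffer parquetPage, ByteBuffer int32Vector) {
    int32Vector.put(parquetPage);                // straight byte-for-byte copy
  }

  /** A type change forces a per-value loop: read INT32, write a 64-bit value. */
  static void widenInt32ToInt64(ByteBuffer parquetPage, ByteBuffer int64Vector) {
    parquetPage.order(ByteOrder.LITTLE_ENDIAN);  // Parquet plain encoding is little-endian
    int64Vector.order(ByteOrder.LITTLE_ENDIAN);
    while (parquetPage.remaining() >= Integer.BYTES) {
      int64Vector.putLong(parquetPage.getInt()); // one conversion per value
    }
  }
}
```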
The class `ColumnReaderFactory` chooses the column reader. It has no code to
handle type conversions. To do a type conversion, we'd need a new set of
readers: one for each (from type, to type) pair. EVF provides a large set of
these so that EVF can perform this type coercion automatically for the common
cases. For example, reading a CSV file (all `VARCHAR`) into an `INT` column
requires a `VARCHAR`-to-`INT` conversion.
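For illustration only, the shape of such a mechanism might be a table keyed by the (from type, to type) pair. The registry below is a made-up sketch, not Drill's `ColumnReaderFactory` or the EVF API:
```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical converter table: one converter per (from type, to type) pair.
public class ConversionRegistry {

  private final Map<String, Function<Object, Object>> converters = new HashMap<>();

  private static String key(String from, String to) { return from + "->" + to; }

  public void register(String from, String to, Function<Object, Object> fn) {
    converters.put(key(from, to), fn);
  }

  /** Identity when no conversion is registered, i.e. the types already match. */
  public Function<Object, Object> lookup(String from, String to) {
    return converters.getOrDefault(key(from, to), Function.identity());
  }

  public static void main(String[] args) {
    ConversionRegistry registry = new ConversionRegistry();
    // The VARCHAR-to-INT case mentioned above, one converter per type pair.
    registry.register("VARCHAR", "INT", v -> Integer.parseInt((String) v));
    System.out.println(registry.lookup("VARCHAR", "INT").apply("42"));   // prints 42
  }
}
```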
The Parquet reader code is complex, so I might be missing something. More testing would
reveal the facts. It could be that writing a non-nullable column into a
nullable vector works: I'm just not sure what sets the null bits to the correct
values. Maybe they take on the correct values by default, if we zero out the
null vector at allocation.
### Handling Only Missing Columns
Since the Parquet reader is not set up to handle type changes, we can choose
to restrict this fix to handle only missing columns. That is, _if_ a column is
missing, _and_ it has the same type when it does appear, _only then_ change the
mode to nullable. Otherwise, let the schema change happen and the query will
fail as today. Such a fix solves the issue for your case, but does not solve
the general problem. This is still an improvement and worth doing.
We just need to ensure that if Parquet thinks it is writing to a
non-nullable vector, but the vector is nullable, that the code "does the right
thing." Nullable vectors are a composite: they have a null vector and a data
vector. Parquet should use the data vector. We just need to ensure that we
somehow set the null vector bits correctly. It may be that vector allocation
zeros the vectors and so the null vector starts in the proper state. Since the
tests pass, this may be the case.
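A toy model of that composite structure, just to show the bookkeeping in question (this is not Drill's actual nullable vector code):
```java
// Toy stand-in for a nullable vector: a data buffer plus per-value "is set" flags.
public class NullableIntSketch {

  private final int[] data;
  private final byte[] isSet;   // one flag per value

  public NullableIntSketch(int capacity) {
    data = new int[capacity];
    isSet = new byte[capacity];  // Java zero-fills; what the real allocator does is the open question
  }

  /** The path a nullable-aware writer takes: data and flag set together. */
  public void set(int index, int value) {
    data[index] = value;
    isSet[index] = 1;
  }

  /** The path a non-nullable reader effectively takes: data only. */
  public void setDataOnly(int index, int value) {
    data[index] = value;
    // isSet[index] is left untouched -- correctness depends on its default value.
  }

  public boolean isNull(int index) {
    return isSet[index] == 0;
  }
}
```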
### Alternative Solution for Type Changes
Suppose we wanted to handle type changes as well. From the above
description, we can't solve the problem by forcing Parquet to choose a
different type of vector than the one for the column type. (I mentioned that
this is possible when using EVF and suggested we could do the same for Parquet.
It turns out that Parquet will not allow this solution.)
There are two possible solutions, one very hard, the other only hard. (The
third, of course, is to skip this task.)
The first option is to create a type conversion mechanism as described
above. These are tedious to write (we'd use code generation) and just as
tedious to test. This seems an excessive amount of work to solve a problem
which occurs infrequently.
The second option is simply to alter the execution plan to insert a PROJECT
operator on top of the SCAN operator. We could set a flag that says that the
PROJECT is needed. This flag would indicate that we found a type inconsistency.
The PROJECT operator (`ProjectRecordBatch`) will handle type conversions: it
is what implements `CAST()` operations.
The trick is that adding the PROJECT is, itself, complex. Basically, there
needs to be a Calcite rule that rewrites a scan batch with our flag set to a
project with the needed `CAST` operations. The Parquet reader should then
ignore the `TupleSchema` and create vectors based on the Parquet schema.
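Sketching the rewrite in Calcite terms (the helper itself is hypothetical; only the `RelBuilder` calls are real Calcite API), the rule's effect would be to wrap the scan in a Project that casts the flagged columns to the reconciled types:
```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.tools.RelBuilder;

// Hypothetical helper, not Drill planner code: wrap a scan in a Project that
// CASTs the columns whose types had to be reconciled, passing the rest through.
public class CastProjectSketch {

  /** {@code reconciledTypes} maps column name to the planner-chosen common type. */
  public static RelNode addCastProject(RelBuilder builder, TableScan scan,
      Map<String, RelDataType> reconciledTypes) {
    builder.push(scan);
    List<RexNode> exprs = new ArrayList<>();
    for (RelDataTypeField field : scan.getRowType().getFieldList()) {
      RexNode ref = builder.field(field.getIndex());
      RelDataType target = reconciledTypes.get(field.getName());
      // Only the inconsistent columns need a CAST; everything else passes through.
      exprs.add(target == null ? ref : builder.cast(ref, target.getSqlTypeName()));
    }
    return builder.project(exprs, scan.getRowType().getFieldNames()).build();
  }
}
```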
We would want to verify that the PROJECT does what it is supposed to do:
emit the same vectors even when it receives a schema change from its upstream
(SCAN) operator.
### Suggestion
Given the above facts (which you should certainly verify), I suggest we
choose the "Handling Only Missing Columns" solution and not attempt to handle
column type changes. That is, don't attempt to fix cases 2, 3 and 5 (differing
types) in this PR. Leave those as a future exercise.
If you found this bug due to an actual use case, ensure that the use case
does not change column data types. If the type of a column is to change, just
create a new column name instead. For example, if `IP_addr` is an `INT32`,
don't try to make it an `INT64` for IPV6 addresses. Instead, write those to a
new `IP6_addr` column. With this fix, adding and removing columns should work
as expected.
Does this seem a reasonable approach?