paul-rogers commented on PR #2937: URL: https://github.com/apache/drill/pull/2937#issuecomment-2332920168
Thanks again for the many pointers and explanations. Much to my surprise, Drill does, indeed, seem to read the Parquet metadata at plan time. Your explanation makes sense: clearly we need the information for parallelization planning. The Parquet metadata cache simply bypasses the physical file reads if the cache is present.

I traced the whole flow. It all looks good except at the last step, writing data to a vector. That will prevent any direct attempt to have the Parquet reader write to a vector other than the one that matches the Parquet type. That may still be OK. As you can tell, my comments are based on the general schema change case: the problems we've encountered over the years. But I suspect this PR can address one very specific case: missing columns when the type is consistent across the files in which the column does appear.

### Recap of the Parquet Planning Process

The code is hard to follow statically, so I ran the unit tests in the PR to get the dynamic view. As described above, the `ParquetGroupScan` does read the files by way of a number of builders, providers and associated classes. The actual reading happens around `ParquetFileAndRowCountMetadata.getParquetFileMetadata_v4`.

Here I will insert a grumble: if Drill scans the Parquet files per query to get the schema, then it could certainly do so for all other file types and avoid a large number of problems. Files have a path and a timestamp: we could easily build up a per-Drillbit cache of metadata so we read it only once per session, and only if the file changes. Trying to distribute that data is the hard part. (Everyone wants to use ZK, but ZK is not a distributed DB.) End of grumble.

So, we do have a schema per row group. We can thus work out the common schema, as is done in this PR. In general, we have to handle the following:

* Missing columns
* Different types
* Different cardinalities (nullable, non-nullable, repeated)

The code here handles missing columns and the nullable/non-nullable modes. We can worry about the scalar/array issue later, as only a novice would change a column from one to the other. Because of the use of the common-type code, I assumed that this PR might also handle the differing-types issue, but maybe I misunderstood.

As explained above, the _right_ way to handle columns is to run them through Calcite. That way, Calcite can check for something like `VARCHAR * INT`, which SQL doesn't support. I can't tell if the original author did that work. Without the Calcite change, the readers will know the type, but not the SQL planner. Not great, but we can live with it since that's the way it's worked until now.

Thanks for adding tests that show that the type inference works as intended. It would be handy if `DESCRIBE <table>` were to work in this case; I didn't check whether that is wired up for Parquet files.

There are three cases that work prior to this PR. Assume we have a column `c`:

* `c` appears in no files. The type is untyped NULL. (Nullable `INT` in other readers originally, changed with EVF.)
* `c` appears in all files with the same type and mode. Works for all operators. The user sees a consistent column type.
* `c` changes type or mode, or is missing in some files. Works only if the query does not contain a SORT, JOIN or other type-sensitive operator. Presents the user with results with differing types.

This PR generalizes the third case. As it is, the "sensitive" operators will raise a schema change exception.
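To make the "work out the common schema" step concrete, here is a minimal sketch of the kind of merge rule involved. This is illustrative only: it is not the PR's code, the `Column`, `Mode` and `commonSchema` names are made up, and it assumes we reconcile only modes (a missing or mode-conflicting column becomes nullable) while treating a type conflict as unresolvable.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of merging per-row-group schemas into one common schema.
// Not Drill code: Column, Type and Mode are stand-ins for Drill's metadata classes.
public class CommonSchemaSketch {

  enum Type { INT, BIGINT, VARCHAR }
  enum Mode { REQUIRED, OPTIONAL }
  record Column(Type type, Mode mode) {}

  static Map<String, Column> commonSchema(List<Map<String, Column>> rowGroups) {
    Map<String, Column> common = new LinkedHashMap<>();
    for (Map<String, Column> rg : rowGroups) {
      for (Map.Entry<String, Column> e : rg.entrySet()) {
        Column prior = common.putIfAbsent(e.getKey(), e.getValue());
        if (prior == null) {
          continue;                                 // first sighting of this column
        }
        if (prior.type() != e.getValue().type()) {
          // Differing types: the hard case discussed below; punt for now.
          throw new IllegalStateException("Type conflict on column " + e.getKey());
        }
        if (prior.mode() != e.getValue().mode()) {
          common.put(e.getKey(), new Column(prior.type(), Mode.OPTIONAL));
        }
      }
    }
    // Any column absent from at least one row group must become nullable,
    // since readers for those row groups will have to "fill in the blanks".
    for (Map<String, Column> rg : rowGroups) {
      for (String name : common.keySet()) {
        if (!rg.containsKey(name)) {
          common.put(name, new Column(common.get(name).type(), Mode.OPTIONAL));
        }
      }
    }
    return common;
  }

  public static void main(String[] args) {
    Map<String, Column> merged = commonSchema(List.of(
        Map.of("a", new Column(Type.INT, Mode.REQUIRED)),
        Map.of("a", new Column(Type.INT, Mode.REQUIRED),
               "b", new Column(Type.VARCHAR, Mode.REQUIRED))));
    System.out.println(merged);  // a stays REQUIRED; b becomes OPTIONAL
  }
}
```

The real code would of course work with Drill's own metadata classes (the `TupleSchema` mentioned below) rather than these stand-ins.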
To fix the SCEs, we need to handle all the cases in which the schema can change, so that all readers return the same schema. Cases:

1. `c` appears in all files with the same type but differing modes.
2. `c` appears in all files, but with differing types.
3. `c` appears in all files, but with differing types and modes.
4. `c` is missing in some files, but when it appears, it always has the same type. The mode must be set to nullable. Handled by this PR.
5. `c` is missing in some files. In the files in which `c` does appear, it has differing types. Handled by this PR, maybe.

A review response comment suggests that this PR fixes case 4, which seems like the most reasonable form of schema change. Changing column types is just asking for trouble with tools such as Presto or Impala. Still, let's think through all the cases.

There is one additional case, just to make this more fun. The query can be of the form `SELECT * FROM myParquet`. In this case, the user might expect the union of all columns from all files and row groups, with the types reconciled for all of them. If we focus just on case 4, then the user would expect us to "fill in the blanks" if some of the columns are missing in some files.

Once we have the common schema, we need to ship it to the distributed readers. It looks like `AbstractParquetRowGroupScan.schema` provides the "physical plan" support. The `AbstractParquetScanBatchCreator` class creates a set of `ParquetRecordReader` instances from the `AbstractParquetRowGroupScan` and wraps them in the `ScanBatch` operator. The schema is placed in a `ColumnExplorer`.

As it turns out, Drill has not just one, but two different Parquet readers: `DrillParquetReader` and `ParquetRecordReader`. Both receive the list of columns, but only as a `List<SchemaPath>`, which has no type information. `ParquetRecordReader` also receives the type information. It seems that the names are redundant: they appear in both the schema path and the `TupleSchema`. This may be the source of one of the bugs fixed in this PR.

So far so good.

### The Problem

But now all heck breaks loose. What we want is for each row group to honor the planner-provided schema, but this won't work. What we want is that the row group either has column `c` or not; if not, we make up a column of the type provided by the `TupleSchema`. There is much code to work out the Drill type for each Parquet type. That code should no longer be used by the _reader_, but rather by the planner. At read time, we just create the vector specified by the schema to ensure consistency.

But the Parquet column readers assume that the type of the Drill vector is the same as that of the Parquet column. As it turns out, there are two sets of Parquet column writers: the original ones and a second set of "bulk" readers. I don't know if both are still used. Eventually, the Parquet column readers call down to the following line from `FixedByteAlignedReader`:

```java
protected void writeData() {
  vectorData.writeBytes(bytebuf, (int) readStartInBytes, (int) readLength);
}
```

That is, the Drill vectors have been carefully chosen so that the byte layout of the vector matches the byte layout of the Parquet data. We read, say, `INT32` values into `INT` vectors where the values are also 32 bits long. We must also assume the same endian layout, IEEE floating-point layout, etc. Writing to a different Drill type requires a per-value conversion.

The class `ColumnReaderFactory` chooses the column reader. It has no code to handle type conversions.
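As a standalone illustration (plain Java, not Drill's vector code) of why that block copy only works when layouts match: three little-endian `INT32` values can be copied byte-for-byte into a destination that also uses 4 bytes per value, but widening them into an 8-byte-per-value destination (say, for a `BIGINT` vector) forces a per-value loop, which is exactly the kind of conversion the current column readers don't have.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Standalone illustration (not Drill code) of the layout assumption behind
// FixedByteAlignedReader.writeData(): a raw byte copy is only valid when the
// source and destination use the same per-value layout.
public class LayoutSketch {
  public static void main(String[] args) {
    // Pretend this is a Parquet page holding three INT32 values: 1, 2, 3.
    ByteBuffer page = ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN);
    page.putInt(1).putInt(2).putInt(3);
    page.flip();

    // Same 4-byte layout on both sides: a block copy (like writeBytes) is enough.
    ByteBuffer intVectorData = ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN);
    intVectorData.put(page.duplicate());

    // An 8-byte-per-value destination (think BIGINT): a block copy would scramble
    // the values, so each one must be read and widened individually.
    ByteBuffer bigIntVectorData = ByteBuffer.allocate(24).order(ByteOrder.LITTLE_ENDIAN);
    for (int i = 0; i < 3; i++) {
      bigIntVectorData.putLong(page.getInt(i * 4));  // per-value conversion
    }
  }
}
```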
To do a type conversion, we'd need a new set of readers: one for each (from type, to type) pair. EVF provides a large set of these so that it can perform this type coercion automatically for the common cases. For example, reading a CSV file (all `VARCHAR`) into an `INT` column requires a `VARCHAR`-to-`INT` conversion.

This code is complex, so I might be missing something; more testing would reveal the facts. It could be that writing a non-nullable column into a nullable vector works: I'm just not sure what sets the null bits to the correct values. Maybe they take on the correct values by default, if we zero out the null vector at allocation.

### Handling Only Missing Columns

Since the Parquet reader is not set up to handle type changes, we can choose to restrict this fix to handle only missing columns. That is, _if_ a column is missing, _and_ it has the same type when it does appear, _only then_ change the mode to nullable. Otherwise, let the schema change happen and the query will fail as it does today. Such a fix solves the issue for your case, but does not solve the general problem. It is still an improvement and worth doing.

We just need to ensure that if Parquet thinks it is writing to a non-nullable vector, but the vector is nullable, the code "does the right thing." Nullable vectors are a composite: they have a null vector and a data vector. Parquet should use the data vector. We just need to ensure that we somehow set the null vector bits correctly. It may be that vector allocation zeros the vectors and so the null vector starts in the proper state. Since the tests pass, this may be the case; the sketch below shows the spot to verify.
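To make that null-bit question concrete, here is a simplified, standalone model of the composite described above: a "bits" vector plus a data vector. This is not Drill's vector code, the names are made up, and it assumes the convention that a bit value of 1 means "not null" (worth confirming against `NullableIntVector`).

```java
// Hypothetical model of a nullable vector (not Drill code). It shows why the
// bits vector matters when a reader that believes the column is required
// writes only the data portion.
public class NullableVectorSketch {

  static class NullableIntModel {
    final byte[] bits;    // assumed convention: 1 = value set (not null), 0 = null
    final int[] values;   // the data vector

    NullableIntModel(int capacity) {
      bits = new byte[capacity];   // Java zero-fills, so every slot starts as "null"
      values = new int[capacity];
    }

    boolean isNull(int i) { return bits[i] == 0; }
  }

  public static void main(String[] args) {
    NullableIntModel v = new NullableIntModel(3);

    // A "required column" style writer fills only the data portion...
    v.values[0] = 10;
    v.values[1] = 20;
    v.values[2] = 30;

    // ...so under this convention, nothing is visible until the bits are set.
    System.out.println(v.isNull(0));   // true, despite values[0] == 10

    // Something must flip them: the reader, the mutator, or a post-read fix-up.
    for (int i = 0; i < v.bits.length; i++) {
      v.bits[i] = 1;
    }
    System.out.println(v.isNull(0));   // false
  }
}
```

If the convention is the reverse (zero meaning not-null), then zero-filled allocation alone would indeed leave the vector in the right state; either way, this is the behavior the passing tests implicitly depend on.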
### Alternative Solution for Type Changes

Suppose we wanted to handle type changes as well. From the above description, we can't solve the problem by forcing Parquet to choose a different type of vector than the one for the column type. (I mentioned that this is possible when using EVF and suggested we could do the same for Parquet. It turns out that Parquet will not allow this solution.) There are two possible solutions, one very hard, the other only hard. (The third, of course, is to skip this task.)

The first option is to create a type conversion mechanism as described above. These converters are tedious to write (we'd use code generation) and just as tedious to test. This seems an excessive amount of work to solve a problem which occurs infrequently.

The second option is simply to alter the execution plan to insert a PROJECT operator on top of the SCAN operator. We could set a flag that says that the PROJECT is needed; this flag would indicate that we found a type inconsistency. The PROJECT operator (`ProjectRecordBatch`) will handle type conversions: it is what implements `CAST()` operations. The trick is that adding the PROJECT is, itself, complex. Basically, there needs to be a Calcite rule that rewrites a scan with our flag set into a project with the needed `CAST` operations. The Parquet reader should then ignore the `TupleSchema` and create vectors based on the Parquet schema. We would want to verify that the PROJECT does what it is supposed to do: emit the same vectors even when it receives a schema change from its upstream (SCAN) operator.

### Suggestion

Given the above facts (which you should certainly verify), I suggest we choose the "Handling Only Missing Columns" solution and not attempt to handle column type changes. That is, don't attempt to fix the differing-type cases (2, 3 and 5) in this PR; leave those as a future exercise. If you found this bug due to an actual use case, ensure that the use case does not change column data types. If the type of a column is to change, just create a new column with a new name instead. For example, if `IP_addr` is an `INT32`, don't try to make it an `INT64` for IPv6 addresses; instead, write those to a new `IP6_addr` column.

With this fix, adding and removing columns should work as expected. Does this seem a reasonable approach?